Files
Django-Proxmox-Mikrotik/lib/mikrotik.py
Holger Sielaff 90c0ff61ed initial
2025-08-27 09:55:55 +02:00

380 lines
13 KiB
Python

import collections
import logging
from functools import cached_property
import routeros_api
from django.db import models
from django.db.utils import IntegrityError
from django.forms import model_to_dict
from django.forms.models import ValidationError
from routeros_api.api_structure import StringField
from routeros_api.exceptions import RouterOsApiCommunicationError
from django_proxmox_mikrotik.settings import MikrotikConfig
from lib import FactoryMixin
from lib.db import BOOLEAN_CHOICES_CHAR
from lib.router_abstract import RoutedModelAbstract, RouterAbstract
# Taken verbatim from MikrotikConfig.IP_8; is_local_ip() checks whether the
# first three characters of an address are contained in this value.
# NOTE(review): presumably the prefix(es) of the local network - confirm in settings.
ip_8 = MikrotikConfig.IP_8
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
def is_local_ip(ip):
    """Tell whether ``ip`` counts as local.

    Only the first three characters of ``ip`` are looked at; the result is
    whether that slice is contained in the module-level ``ip_8`` value.
    """
    leading_chars = ip[:3]
    return leading_chars in ip_8
class MikrotikApi(FactoryMixin):
    """One RouterOS API connection together with its API handle.

    A connection is opened as soon as an instance is constructed; instances
    do not share any connection state with each other.
    """

    def __init__(self):
        # Begin with empty handles, then connect immediately.
        self.connection = None
        self.api = None
        self._connect()

    def _connect(self):
        """Open a new connection pool and fetch its API object.

        Raises:
            Exception: whatever the pool creation or ``get_api()`` raised.
                The failure is logged and both handles are reset to ``None``
                before the exception is passed on.
        """
        try:
            host = MikrotikConfig.HOST
            pool_options = {
                'username': MikrotikConfig.USER,
                'password': MikrotikConfig.PASS,
                'port': 8728,
                'plaintext_login': True,
                'use_ssl': False,
                'ssl_verify': True,
                'ssl_verify_hostname': True,
                'ssl_context': None,
            }
            self.connection = routeros_api.RouterOsApiPool(host, **pool_options)
            self.api = self.connection.get_api()
        except Exception as e:
            _logger.error(f"Failed to create Mikrotik connection: {e}")
            self.connection = None
            self.api = None
            raise

    def disconnect(self):
        """Close the connection if one is open; a no-op otherwise.

        ``OSError``, ``AttributeError`` and ``BrokenPipeError`` raised while
        closing are only logged at debug level. Both handles are cleared
        afterwards in every case.
        """
        if not self.connection:
            return
        try:
            self.connection.disconnect()
        except (OSError, AttributeError, BrokenPipeError) as e:
            _logger.debug(f"Error during disconnect (expected): {e}")
        finally:
            self.connection = None
            self.api = None

    @property
    def _default_structure(self):
        """Fresh mapping that yields a windows-1250 ``StringField`` for any key."""

        def _string_field():
            return StringField(encoding='windows-1250')

        return collections.defaultdict(_string_field)

    def resource(self, route):
        """Return the API resource for ``route``.

        Raises:
            ConnectionError: if there is no API handle.
        """
        if self.api:
            return self.api.get_resource(route, self._default_structure)
        raise ConnectionError("No active Mikrotik connection")
class Mikrotik(RouterAbstract):
    """Router access object bound to a single RouterOS resource path.

    Instances are cached per route string in ``_instances`` (see ``pool``).
    Write operations (``set``/``add``/``remove``) are simulated instead of
    executed when ``MikrotikConfig.READONLY`` is truthy.
    """

    # Cache of Mikrotik objects, keyed by the route string.
    _instances = {}

    class _resource:
        """Proxy that performs every API call on its own short-lived connection."""

        def __init__(self, route):
            self._route = route

        def __getattr__(self, item):
            """Build a callable that forwards ``item`` to the router resource.

            The returned wrapper opens a new ``MikrotikApi`` per attempt,
            calls ``item`` on the resource for this route and always
            disconnects afterwards. Connection-type errors are retried up to
            three times; the error of the last attempt is re-raised.
            """

            def method_wrapper(*args, **kwargs):
                max_retries = 3
                for attempt in range(max_retries):
                    # Reset per attempt so ``finally`` never sees the handle
                    # of an earlier, already closed connection.
                    api = None
                    try:
                        api = MikrotikApi()
                        resource = api.resource(self._route)
                        method = getattr(resource, item)
                        return method(*args, **kwargs)
                    except (OSError, BrokenPipeError, ConnectionError, AttributeError) as e:
                        _logger.warning(f"Connection error in _resource.{item}() attempt {attempt + 1}/{max_retries}: {e}")
                        if attempt == max_retries - 1:
                            raise
                    finally:
                        if api:
                            api.disconnect()

            return method_wrapper

        # Former explicit implementation of ``call``; it was disabled in the
        # original source and ``__getattr__`` handles ``call`` now:
        #
        # def call(self, *args, **kwargs):
        #     api = MikrotikApi()
        #     try:
        #         resource = api.resource(self._route)
        #         return resource.call(*args, **kwargs)
        #     finally:
        #         api.connection.disconnect()

    def __init__(self, route):
        self._route = route

    @staticmethod
    def pool(route):
        """Return the cached ``Mikrotik`` for ``route``, creating it on first use.

        ``route`` may be a route string or a ``MikrotikModelMixin`` instance,
        in which case its ``router_base`` is used.
        """
        if isinstance(route, MikrotikModelMixin):
            route = route.router_base
        try:
            return Mikrotik._instances[route]
        except KeyError:
            # Only construct a new object on a cache miss.
            return Mikrotik._instances.setdefault(route, Mikrotik(route))

    def initialize(self, route):
        """Bind this object to ``route`` and register it in the instance cache.

        ``route`` may be a route string or a ``MikrotikModelMixin`` instance.
        """
        if isinstance(route, MikrotikModelMixin):
            self._route = route.router_base
        else:
            self._route = route
        # Register under the resolved route string - the same key ``pool``
        # looks up - rather than under the raw argument, which may be a
        # model instance.
        Mikrotik._instances.setdefault(self._route, self)

    def get(self, **kwargs):
        """Run ``print`` on the route and return the router's answer.

        Plain keyword arguments are passed as exact-match queries. Keys of
        the form ``field__startswith``, ``field__contains`` and
        ``field__endswith`` are turned into ``field~"pattern"`` expressions
        and passed as additional queries. Keys with any other ``__`` suffix
        are passed through unchanged.
        """
        mikrotik_kwargs = {}
        for k in list(kwargs):
            if '__' not in k:
                continue
            v = kwargs.pop(k)
            field, lookup = k.split('__', 1)
            if lookup == 'startswith':
                # Mikrotik uses ~"^pattern" for startswith
                mikrotik_kwargs[field] = f'^{v}'
            elif lookup == 'contains':
                # Mikrotik uses ~"pattern" for contains
                mikrotik_kwargs[field] = f'{v}'
            elif lookup == 'endswith':
                # Mikrotik uses ~"pattern$" for endswith
                mikrotik_kwargs[field] = f'{v}$'
            else:
                # Unknown lookup type: keep the original key and value
                kwargs[k] = v
        _logger.debug(f'Getting {self._route} with transformed kwargs: {mikrotik_kwargs}')
        additional_queries = [f'{field}~"{pattern}"' for field, pattern in mikrotik_kwargs.items()]
        _logger.info(f'Getting {self._route}/print with kwargs: {kwargs} and additional queries: {additional_queries}')
        return self._resource(self._route).call('print', queries=kwargs, additional_queries=additional_queries)

    def set(self, **kwargs):
        """Update the router entry identified by ``kwargs['id']``.

        Returns ``True`` without contacting the router in read-only mode.
        """
        assert 'id' in kwargs, "id must be set"
        if MikrotikConfig.READONLY:
            _logger.warning(f'Trying to set {self._route} to {kwargs} on read-only router')
            return True  # Simulate success in readonly mode
        return self._resource(self._route).set(**kwargs)

    def add(self, **kwargs):
        """Create a router entry from ``kwargs``; any ``id`` key is dropped.

        Returns the fake id ``'*READONLY'`` without contacting the router in
        read-only mode.
        """
        kwargs.pop('id', None)
        if MikrotikConfig.READONLY:
            _logger.warning(f'Trying to add {self._route} to {kwargs} on read-only router')
            return '*READONLY'  # Simulate success with fake ID in readonly mode
        return self._resource(self._route).add(**kwargs)

    def remove(self, **kwargs):
        """Remove the router entry identified by ``kwargs['id']``.

        Returns ``True`` without contacting the router in read-only mode.
        """
        assert 'id' in kwargs, "id must be set"
        if MikrotikConfig.READONLY:
            _logger.warning(f'Trying to remove {self._route} with {kwargs} on read-only router')
            return True  # Simulate success in readonly mode
        return self._resource(self._route).remove(id=kwargs['id'])
class MikrotikModelMixin(RoutedModelAbstract):
    """Abstract model base for objects that are mirrored on the Mikrotik router.

    Subclasses must provide ``router_base`` (the route handed to
    ``Mikrotik.pool``). The methods here also read ``self.id``,
    ``self.unique_on_router`` and ``self.to_json``, which are not defined in
    this class.
    NOTE(review): presumably supplied by RoutedModelAbstract or the concrete
    model - confirm.
    """

    class Meta:
        abstract = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Snapshot of the serialized state taken right after construction.
        self._old_values = self.to_json

    internal_id = models.BigAutoField(primary_key=True)
    disabled = models.CharField(max_length=10, null=True, blank=True, default='false', choices=BOOLEAN_CHOICES_CHAR)
    comment = models.TextField(null=True, blank=True, default='')

    # ------------------------------------------------------------------
    # Those we need for configuration
    # ------------------------------------------------------------------

    @property
    def router_base(self):
        """Route of this model on the router; must be overridden.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        # NotImplementedError is the exception type; ``NotImplemented`` is a
        # non-callable constant and would produce a TypeError here.
        raise NotImplementedError('Not implemented')

    @property
    def router_object_without_id(self):
        """Deprecated alias for ``router_object()``; logs a warning."""
        _logger.warning(f'Deprecated - use router_object instead of router_object_without_id for {self}')
        return self.router_object()

    # ------------------------------------------------------------------
    # Common stuff - may be overwritten
    # ------------------------------------------------------------------

    @property
    def no_mikrotik_props(self):
        """Props that are not sent to the router."""
        return ['internal_id', 'dynamic']

    @property
    def router(self):
        """The pooled ``Mikrotik`` object for this model's route."""
        return Mikrotik.pool(self)

    @classmethod
    def class_props(cls):
        """Field names of a fresh instance, minus ``internal_id`` and ``dynamic``."""
        return [p for p in model_to_dict(cls()).keys() if p not in ('internal_id', 'dynamic')]

    @classmethod
    def get_all_as_object(cls):
        """Fetch all router entries and turn each into a model object via ``from_dict``."""
        entries = cls().router_get_all
        _logger.debug(f'Got {entries} ({type(entries)}) from router')
        return [cls.from_dict(**r) for r in entries]

    @classmethod
    def translate_keys(cls, **kwargs):
        """Return ``kwargs`` with every ``-`` in a key replaced by ``_``."""
        return {k.replace('-', '_'): v for k, v in kwargs.items()}

    def router_object(self, translate_keys=True):
        """Look this object up on the router and return the first match.

        The lookup uses ``id`` when it is set. Otherwise it uses the entries
        of ``unique_on_router``: a plain name contributes its value when
        truthy, a tuple/list contributes the first of its names that has a
        truthy value.

        Returns:
            The first router record (keys translated unless
            ``translate_keys`` is false), or ``None`` if nothing matched.

        Raises:
            ValueError: if no lookup argument could be determined.
        """
        if self.id:
            args = {'id': self.id}
        else:
            args = {}
            for k in self.unique_on_router:
                # here, we take the first one set
                if isinstance(k, (tuple, list)):
                    for k2 in k:
                        if v2 := getattr(self, k2, None):
                            args[k2] = v2
                            break
                else:
                    if v := getattr(self, k, None):
                        args[k] = v
        if not args:
            raise ValueError(f"Empty args to get info from router for {self}")
        data = self.router.get(**args)
        if data:
            return self.translate_keys(**data[0]) if translate_keys else data[0]
        return None

    def sync_from_router(self, data=None):
        """Assign router data to this object and return ``self``.

        ``data`` is used when given and truthy; otherwise the data is fetched
        with ``router_object()``. Nothing is assigned when no data is found.
        """
        if data := data or self.router_object():
            _logger.debug(f'Syncing {self} from router with {data}')
            self.assign(**data)
        else:
            _logger.debug(f'Could not sync {self} from router')
        return self

    def assign(self, **kwargs):
        """Set every given value whose name is an existing attribute; return ``self``.

        Names that are not attributes of this object are ignored. The object
        is not saved.
        """
        for k, v in kwargs.items():
            if hasattr(self, k) and v != getattr(self, k):
                setattr(self, k, v)
        return self

    def sync_all_from_router(self):
        """Run ``from_dict`` for every entry the router returns for this route."""
        for obj in self.router_get_all:
            self.from_dict(**obj)

    def delete_from_router(self):
        """Remove this object from the router if it has an ``id``.

        Returns ``True`` when there is no ``id``, otherwise the router's result.
        """
        if self.id:
            return self.router.remove(id=self.id)
        return True

    @classmethod
    def from_dict(cls, **kwargs):
        """Return the database object for a router record, creating it if missing.

        Keys are translated with ``translate_keys``; keys that are not in
        ``class_props()`` are dropped with a warning. An existing object
        (matched by ``id``) is returned as found, without being updated.

        Raises:
            Exception: any error other than ``DoesNotExist`` is logged and
                re-raised (this includes a missing ``id`` key).
        """
        self_props = cls.class_props()
        args = {}
        for k, v in cls.translate_keys(**kwargs).items():
            if k not in self_props:
                # ``cls.__name__`` is the model name; ``cls.__class__`` would
                # be the metaclass.
                _logger.warning(f'Unknown property {k} for {cls.__name__}')
            else:
                args[k] = v
        try:
            obj = cls.objects.get(id=args['id'])
            _logger.debug(f'Found {obj} from {kwargs}')
        except cls.DoesNotExist:
            obj = cls.objects.create(**args)
            _logger.debug(f'Created {obj} from {kwargs}')
        except Exception as e:
            _logger.error(f'Could not create {cls.__name__} from {kwargs} - {e}')
            raise
        return obj

    @property
    def mikrotik_send_params(self):
        """``to_json`` without the keys listed in ``no_mikrotik_props``."""
        return {k: v for k, v in self.to_json.items() if k not in self.no_mikrotik_props}

    def sync_to_router(self, created=False):
        """Push this object to the router.

        Uses ``router_set`` when the object has an ``id`` and ``router_add``
        otherwise. ``created`` is accepted but not used here.
        """
        data = self.mikrotik_send_params
        _logger.debug(f'Syncing {self.__dict__}')
        if self.id:
            _logger.debug(f'Syncing {self} to router with {data}')
            return self.router_set(**data)
        _logger.debug(f'Adding {self} to router with {data}')
        return self.router_add(**data)

    @cached_property
    def router_get_all(self):
        """All router entries for this route; fetched once per instance."""
        return self.router.get()

    def router_get(self, **kwargs):
        """Query the router with ``kwargs`` and return the response."""
        response = self.router.get(**kwargs)
        _logger.debug(f'Got {self} from router with {response}')
        return response

    def router_set(self, **kwargs):
        """Update this object's router entry; ``id`` is always taken from ``self``."""
        kwargs['id'] = self.id
        response = self.router.set(**kwargs)
        _logger.debug(f'Set {self} to router with {response}')
        return response

    def router_add(self, **kwargs):
        """Create this object on the router and store the id it was given.

        Returns:
            ``True`` if the object already has an ``id`` (nothing is sent);
            ``self`` if adding failed but a matching router entry exists and
            was synced; otherwise the router's response to the add.

        Raises:
            ValidationError: if adding failed and no matching router entry
                was found.
        """
        if self.id:
            _logger.warning(f'Trying to add {self} to router - already has id {self.id}')
            return True
        kwargs.pop('id', None)
        try:
            response = self.router.add(**kwargs)
        except RouterOsApiCommunicationError as e:
            _logger.error(f'Could not add {self} to router - {e}')
            routerdata = self.router_object()
            if routerdata:
                return self.sync_from_router(data=routerdata)
            raise ValidationError(f'Could not add {self} to router - {e}') from e
        try:
            new_on_router = self.router_object()
            _logger.debug(f'Got {new_on_router} from router')
            if not new_on_router:
                # router_object() returns None when nothing matched (e.g. the
                # add was only simulated); subscripting it would raise an
                # uncaught TypeError.
                _logger.info(f'Could not set id for {self} - router returned no matching object')
                return response
            self.id = new_on_router['id']
            _logger.debug(f'Added {self} to router with {response}')
            self.save()
        except (IndexError, KeyError, NotImplementedError) as e:
            _logger.info(f'Could not set id for {self} - git no id {e}')
        return response
def sync_from_mikrotik(classname):
    """Copy every router entry of the given model class into the database.

    For each entry (with ``-`` in keys replaced by ``_``) the object with the
    same ``id`` is loaded, every value whose key is an attribute of that
    object is set, and the object is saved. If no such object exists it is
    created; an ``IntegrityError`` during creation is logged and skipped.
    """
    for raw in classname().router_get_all:
        i = {}
        for key, value in raw.items():
            i[key.replace('-', '_')] = value
        try:
            existing = classname.objects.get(id=i['id'])
            for k, v in i.items():
                if not hasattr(existing, k):
                    continue
                _logger.debug(f'Updating {k} to {v} - {existing}')
                setattr(existing, k, v)
            existing.save()
        except classname.DoesNotExist:
            _logger.info(f'Creating {i["id"]}')
            try:
                classname.objects.create(**i)
            except IntegrityError as e:
                _logger.error(f'Could not create {i["id"]}, already exists')
                _logger.exception(e)