import json import logging import time import proxmoxer from django_proxmox_mikrotik.settings import ProxmoxConfig def get_comma_separated_values(value): _vlist = [v.strip().split('=', 1) for v in value.split(',') if '=' in v] if value else [] return {k: v for k, v in _vlist} class PMDict(dict): def __getattr__(self, name): if name in self: return self[name] raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") class PMCollection(list): def append(self, item): assert isinstance(item, dict) if not isinstance(item, PMDict): item = PMDict(**item) super().append(item) def __getattr__(self, item): if self and hasattr(self[0], item): for i in self: yield getattr(i, item) raise AttributeError(f"'{self.__class__.__name__}' object (or its content[0]) has no attribute '{item}'") class Proxmox: def __init__(self, node=ProxmoxConfig.NODE): self.initialize(node) def initialize(self, node=ProxmoxConfig.NODE, ): self.api = proxmoxer.ProxmoxAPI( ProxmoxConfig.HOST, user=ProxmoxConfig.USER, password=ProxmoxConfig.PASS, verify_ssl=False ) self.nodepath = f'nodes/{node}' return self def __enter__(self): return self.initialize() def __exit__(self, *args): """Actually, this is a no-op, just for the with statement :)""" return def nodes(self, route=''): return self.api(f'{self.nodepath}/{route.lstrip("/")}') def lxc(self, route=''): return self.nodes(f'lxc/{route.lstrip("/")}') def storage(self, route=''): return self.nodes(f'storage/{route.lstrip("/")}') def qemu(self, route=''): return self.nodes(f'qemu/{route.lstrip("/")}') def cluster(self, route=''): return self.api(f'cluster/{route.lstrip("/")}') @property def next_vmid(self): return int(self.cluster_get('nextid')) def __getattr__(self, name): """This makes a 'magic' trick We can call the proxmox api like this: * proxmox.lxc_115_config_get() * proxmox.lxc_115_get('config') * proxmox.lxc_get('115/config') * proxmox.lxc('115/config').get() * ... seems handy at the moment ... The first in *args will always be taken as route! """ if "_" in name: nameparts = name.split("_") action = nameparts.pop() if action not in ["get", "post", "put", "delete"]: raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}' - 1") if hasattr(self, nameparts[0]): base_method = getattr(self, nameparts.pop(0)) else: base_method = self.nodes route_part = "/".join(nameparts) def wrapper(*args, **kwargs): if args: route = str(args[0]).rstrip('/') args = args[1:] else: route = '' if route_part: route = f'{route_part}/{route}' if ProxmoxConfig.READONLY and action != 'get': logging.warning(f'PROXMOX_READONLY is set - not calling {route} ' f'with method {base_method.__name__}.{action}' f'({args}, {kwargs})') # Return appropriate mock response based on action if action == 'post': return 'UPID:READONLY:00000000:00000000:00000000:vzcreate:readonly:root@pam:' elif action == 'put': return None elif action == 'delete': return None return {} logging.debug(f'Calling {base_method.__name__}.{action}({route}, {args}, {kwargs})') return getattr(base_method(route), action)(*args, **kwargs) return wrapper raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") @classmethod def get_task_status(cls, taskhash, sleeptime=10, pm=None): cls = pm or cls() logging.debug(f"Getting task status for {taskhash}") maxtime = ProxmoxConfig.CREATE_LXC_TIMEOUT response = cls.nodes_get(f'tasks/{taskhash}/status') logging.debug(f"Status response for {taskhash}: {response}") while True: response = cls.nodes_get(f'tasks/{taskhash}/status') logging.debug(f"Status response for {taskhash}: {response}") yield response logging.debug(f"Status response for {taskhash}: {response}") status = response['status'] if status == 'stopped': if not response.get('exitstatus') == 'OK': raise ValueError(f"Exitstatus is {response.get('exitstatus')}") break time.sleep(sleeptime) maxtime -= sleeptime if maxtime <= 0: raise TimeoutError("Took to long") def get_all_lxc(self, *filterargs, as_dict=True, **filterkwargs): logging.debug(f"Getting all LXC with filter {filterargs} and {filterkwargs}") from proxmox.models import Lxc lxc_filter = {} _raw = self.lxc_get() logging.debug(f"All LXC: {_raw}") if not _raw: return [] all = _raw comps = {} for key in list(_raw[0].keys()): if key in filterkwargs: comps[key] = filterkwargs.pop(key) for key, comp in comps.items(): all = filter(lambda x: ( logging.debug(f'{key} of lxc is {x[key]}, must be {comp}' if key in x else f"{key} not in {x}") or key not in x or x[key] == comp ), all) if not all: logging.debug(f"No LXC found with filter {filterargs} and {filterkwargs}") logging.debug(f"All LXC: {json.dumps(all, indent=2, default=str)}") return [] if filterargs: for prop, c, v in filterargs: invert = False if c.startswith('!'): invert = True if c.endswith('='): comparer = lambda x: x != v if invert else x == v elif c.endswith('in'): comparer = lambda x: x not in v if invert else x in v elif c.endswith('startswith'): comparer = lambda x: v.startswith(v) if invert else v.startswith(v) elif c.endswith('endswith'): comparer = lambda x: v.endswith(v) if invert else v.endswith(v) lxc_filter[prop] = comparer if filterkwargs: for k, v in filterkwargs.items(): lxc_filter[k] = lambda x: x == v def filter_out(lxc_): if not lxc_filter: return True for prop, comparer in lxc_filter.items(): if not prop in lxc_: continue if not comparer(lxc_.get(prop)): logging.debug(f"Filter out {lxc_} because {prop} is {lxc_.get(prop)}") return False return True ret = [] for lxc in filter(filter_out, all): lxc_config = self.lxc_get(f'{lxc["vmid"]}/config') _lx_data = lxc | lxc_config if as_dict: ret.append(PMDict(**_lx_data)) # yield PMDict(**_lx_data) else: ret.append(Lxc().from_proxmox(**_lx_data)) # yield Lxc().from_proxmox(**_lx_data) if not ret: logging.warning(f"Found no LXC with filter {filterargs} and {filterkwargs}") return ret