95 lines
3.8 KiB
Python
95 lines
3.8 KiB
Python
|
|
import logging
|
||
|
|
import re
|
||
|
|
|
||
|
|
import ldap
|
||
|
|
from django.contrib.auth.models import Group, User
|
||
|
|
|
||
|
|
from django_proxmox_mikrotik.configs import AuthLDAPConfig
|
||
|
|
|
||
|
|
groupmember_re = re.compile('^uid=([^,]+),')
|
||
|
|
|
||
|
|
|
||
|
|
class Ldap:
|
||
|
|
possible_groups = ['root', 'intern', 'extern']
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
self.initialize()
|
||
|
|
|
||
|
|
def initialize(self):
|
||
|
|
self.conn = ldap.initialize(AuthLDAPConfig.HOST)
|
||
|
|
self.conn.simple_bind_s(AuthLDAPConfig.BIND_DN, AuthLDAPConfig.BIND_PASSWORD)
|
||
|
|
|
||
|
|
def __enter__(self):
|
||
|
|
self.initialize()
|
||
|
|
return self
|
||
|
|
|
||
|
|
def __exit__(self, *args):
|
||
|
|
self.conn.unbind_s()
|
||
|
|
|
||
|
|
def search(self, base, filterstr='(objectClass=*)', attrlist=None):
|
||
|
|
logging.debug(f'LDAP to {base} with filter {filterstr} - {attrlist if attrlist else "all attributes"}')
|
||
|
|
return self.conn.search_s(base, ldap.SCOPE_SUBTREE, filterstr, attrlist)
|
||
|
|
|
||
|
|
def __getattr__(self, item):
|
||
|
|
if hasattr(ldap, item):
|
||
|
|
return getattr(ldap, item)
|
||
|
|
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")
|
||
|
|
|
||
|
|
def get_groups(self, filterstr='(objectClass=*)', attrlist=None):
|
||
|
|
return self.search(AuthLDAPConfig.GROUP_SEARCH_BASE, filterstr, attrlist)
|
||
|
|
|
||
|
|
def get_users(self, filterstr='(objectClass=inetOrgPerson', attrlist=None):
|
||
|
|
return self.search(AuthLDAPConfig.USER_BASE, filterstr, attrlist)
|
||
|
|
|
||
|
|
def get_user(self, username):
|
||
|
|
if userdata := self.get_users(f'(uid={username})'):
|
||
|
|
return {k: (v[0].decode('utf-8') if v else None) for k, v in userdata[0][1].items()}
|
||
|
|
return {}
|
||
|
|
|
||
|
|
def get_user_groups(self, username, attrlist=None):
|
||
|
|
filterstr = (f'(&'
|
||
|
|
f'(objectClass=groupOfNames)'
|
||
|
|
f'(member=uid={username},{AuthLDAPConfig.USER_BASE})'
|
||
|
|
f')')
|
||
|
|
grps = self.get_groups(filterstr, attrlist)
|
||
|
|
return [data.get('ou')[0].decode('utf-8') for dn, data in grps]
|
||
|
|
|
||
|
|
def get_group_members(self, groupname, attrlist=None):
|
||
|
|
if found := self.search(f'ou={groupname},{AuthLDAPConfig.GROUP_SEARCH_BASE}',
|
||
|
|
'(objectClass=groupOfNames)', attrlist=['member']):
|
||
|
|
return [groupmember_re.sub(r'\1', m.decode('utf-8')) for m in found[0][1].get('member')]
|
||
|
|
return []
|
||
|
|
|
||
|
|
def create_initial_groups(self):
|
||
|
|
return [
|
||
|
|
Group.objects.get_or_create(name=name)[0] for name in self.possible_groups
|
||
|
|
]
|
||
|
|
|
||
|
|
def set_user_groups(self, userinstance: User, save_instance=True):
|
||
|
|
"""This does NOT save the user instance!"""
|
||
|
|
root_group, intern_group, extern_group = self.create_initial_groups()
|
||
|
|
try:
|
||
|
|
ldap_user = self.get_user(userinstance.username)
|
||
|
|
if ldap_user:
|
||
|
|
logging.debug(f'LDAP-User found: {ldap_user}')
|
||
|
|
groups = self.get_user_groups(userinstance.username)
|
||
|
|
if 'root' in groups and (userinstance.is_superuser is False or userinstance.is_staff is False):
|
||
|
|
logging.debug(f'LDAP-User is root: {ldap_user}')
|
||
|
|
userinstance.groups.add(root_group)
|
||
|
|
userinstance.is_superuser = True
|
||
|
|
userinstance.is_staff = True
|
||
|
|
elif 'intern' in groups and userinstance.is_staff is False:
|
||
|
|
logging.debug(f'LDAP-User is intern: {ldap_user}')
|
||
|
|
userinstance.groups.add(intern_group)
|
||
|
|
userinstance.is_staff = True
|
||
|
|
elif 'extern' in groups:
|
||
|
|
logging.debug(f'LDAP-User is extern: {ldap_user}')
|
||
|
|
userinstance.groups.add(extern_group)
|
||
|
|
else:
|
||
|
|
raise Exception(f'LDAP-User not found: {userinstance.username}')
|
||
|
|
except Exception as e:
|
||
|
|
logging.error(f"LDAP-Fehler: {e}")
|
||
|
|
raise e
|
||
|
|
if save_instance:
|
||
|
|
userinstance.save()
|