initial
This commit is contained in:
94
lib/ldap.py
Normal file
94
lib/ldap.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import logging
|
||||
import re
|
||||
|
||||
import ldap
|
||||
from django.contrib.auth.models import Group, User
|
||||
|
||||
from django_proxmox_mikrotik.configs import AuthLDAPConfig
|
||||
|
||||
groupmember_re = re.compile('^uid=([^,]+),')
|
||||
|
||||
|
||||
class Ldap:
|
||||
possible_groups = ['root', 'intern', 'extern']
|
||||
|
||||
def __init__(self):
|
||||
self.initialize()
|
||||
|
||||
def initialize(self):
|
||||
self.conn = ldap.initialize(AuthLDAPConfig.HOST)
|
||||
self.conn.simple_bind_s(AuthLDAPConfig.BIND_DN, AuthLDAPConfig.BIND_PASSWORD)
|
||||
|
||||
def __enter__(self):
|
||||
self.initialize()
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.conn.unbind_s()
|
||||
|
||||
def search(self, base, filterstr='(objectClass=*)', attrlist=None):
|
||||
logging.debug(f'LDAP to {base} with filter {filterstr} - {attrlist if attrlist else "all attributes"}')
|
||||
return self.conn.search_s(base, ldap.SCOPE_SUBTREE, filterstr, attrlist)
|
||||
|
||||
def __getattr__(self, item):
|
||||
if hasattr(ldap, item):
|
||||
return getattr(ldap, item)
|
||||
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")
|
||||
|
||||
def get_groups(self, filterstr='(objectClass=*)', attrlist=None):
|
||||
return self.search(AuthLDAPConfig.GROUP_SEARCH_BASE, filterstr, attrlist)
|
||||
|
||||
def get_users(self, filterstr='(objectClass=inetOrgPerson', attrlist=None):
|
||||
return self.search(AuthLDAPConfig.USER_BASE, filterstr, attrlist)
|
||||
|
||||
def get_user(self, username):
|
||||
if userdata := self.get_users(f'(uid={username})'):
|
||||
return {k: (v[0].decode('utf-8') if v else None) for k, v in userdata[0][1].items()}
|
||||
return {}
|
||||
|
||||
def get_user_groups(self, username, attrlist=None):
|
||||
filterstr = (f'(&'
|
||||
f'(objectClass=groupOfNames)'
|
||||
f'(member=uid={username},{AuthLDAPConfig.USER_BASE})'
|
||||
f')')
|
||||
grps = self.get_groups(filterstr, attrlist)
|
||||
return [data.get('ou')[0].decode('utf-8') for dn, data in grps]
|
||||
|
||||
def get_group_members(self, groupname, attrlist=None):
|
||||
if found := self.search(f'ou={groupname},{AuthLDAPConfig.GROUP_SEARCH_BASE}',
|
||||
'(objectClass=groupOfNames)', attrlist=['member']):
|
||||
return [groupmember_re.sub(r'\1', m.decode('utf-8')) for m in found[0][1].get('member')]
|
||||
return []
|
||||
|
||||
def create_initial_groups(self):
|
||||
return [
|
||||
Group.objects.get_or_create(name=name)[0] for name in self.possible_groups
|
||||
]
|
||||
|
||||
def set_user_groups(self, userinstance: User, save_instance=True):
|
||||
"""This does NOT save the user instance!"""
|
||||
root_group, intern_group, extern_group = self.create_initial_groups()
|
||||
try:
|
||||
ldap_user = self.get_user(userinstance.username)
|
||||
if ldap_user:
|
||||
logging.debug(f'LDAP-User found: {ldap_user}')
|
||||
groups = self.get_user_groups(userinstance.username)
|
||||
if 'root' in groups and (userinstance.is_superuser is False or userinstance.is_staff is False):
|
||||
logging.debug(f'LDAP-User is root: {ldap_user}')
|
||||
userinstance.groups.add(root_group)
|
||||
userinstance.is_superuser = True
|
||||
userinstance.is_staff = True
|
||||
elif 'intern' in groups and userinstance.is_staff is False:
|
||||
logging.debug(f'LDAP-User is intern: {ldap_user}')
|
||||
userinstance.groups.add(intern_group)
|
||||
userinstance.is_staff = True
|
||||
elif 'extern' in groups:
|
||||
logging.debug(f'LDAP-User is extern: {ldap_user}')
|
||||
userinstance.groups.add(extern_group)
|
||||
else:
|
||||
raise Exception(f'LDAP-User not found: {userinstance.username}')
|
||||
except Exception as e:
|
||||
logging.error(f"LDAP-Fehler: {e}")
|
||||
raise e
|
||||
if save_instance:
|
||||
userinstance.save()
|
||||
Reference in New Issue
Block a user