# -*- coding: utf-8 -*- """Adds the ldap_partner_sync functionality to res.partner We want Flectra too """ import logging from ldap3 import MODIFY_REPLACE, SUBTREE from ldap3.core.exceptions import LDAPException try: from odoo import models, fields from odoo.exceptions import UserError except ModuleNotFoundError: from flectra import models, fields from flectra.exceptions import UserError _logger = logging.getLogger(__name__) from ..lib.ldap import get_ldap_connection from ..lib import get_config class ResPartner(models.Model): _inherit = 'res.partner' def write(self, vals): """Überschreibe write um LDAP-Sync mit Firmenwechsel-Handling hinzuzufügen""" if not self._global_sync_on(): return super().write(vals) old_parents = {record.id: record.parent_id.id for record in self if record.type == 'contact'} result = super().write(vals) with get_ldap_connection(self.env) as conn: # Stelle sicher, dass 'Keine Firma' OU existiert self._ensure_no_company_ou_exists(conn) if 'parent_id' in vals: for record in self: if record.type != 'contact': continue if old_parents[record.id] != record.parent_id.id: self._handle_company_change(record, old_parents[record.id]) else: for record in self: if record.type != 'contact': continue record._sync_to_ldap() return result def unlink(self): """Überschreibe unlink um Kontakte bei Firmenlöschung zu verschieben """ if not self._global_sync_on(): return super().unlink() with get_ldap_connection(self.env) as conn: for record in self: if record.type != 'contact': continue try: if record.is_company: # Wenn eine Firma gelöscht wird, verschiebe erst alle Kontakte record._move_contacts_to_no_company(conn) # Lösche den eigentlichen Eintrag dn = record._get_ldap_dn() if conn.delete(dn): _logger.debug(f"Deleted LDAP entry: {dn}") else: _logger.warning(f"Failed to delete LDAP entry: {conn.result}") except LDAPException as e: _logger.error(f"Error during deletion of {dn}: {str(e)}") raise UserError(f"LDAP Delete Error: {str(e)}") return super().unlink() def _global_sync_on(self): return get_config(self.env, 'partner_sync_on', False) def _get_user_posix_data(self): self.ensure_one() def get_user_password(user_id): self.env.cr.execute("SELECT password FROM res_users WHERE id=%s", [user_id]) res = self.env.cr.dictfetchone() if res and res.get('password'): return res.get('password') return '' if self.user_ids: _logger.error([user.password + str(user) for user in self.user_ids]) return { 'objectClass': ['posixAccount'], # , 'ldapPublicKey'], 'userPassword': [get_user_password(user.id) for user in self.user_ids], 'uid': [user.login for user in self.user_ids], 'loginShell': [user.posix_login_shell or '/usr/bin/nologin' for user in self.user_ids], 'homeDirectory': [user.posix_home_directory or f'/tmp/{user.posix_uid}' for user in self.user_ids], 'uidNumber': self.id + 1000, 'gidNumber': self.id + 1000, # 'sshPublicKey': self.ssh_public_key, } return {} def _get_no_company_dn(self, with_base=False): """Gibt den DN für den 'Keine Firma' ldap_part zurück Wenn mit with_base den ganzen DN""" prefix = get_config(self.env, 'company_prefix', 'company') ldap_base = get_config(self.env, 'ldap_base', 'o=customer,dc=my-company') if with_base: return f"ou={prefix}-0,{ldap_base}" return f"ou={prefix}-0" def _ensure_no_company_ou_exists(self, conn): """Stellt sicher, dass die 'Keine Firma' existiert """ no_company_dn = self._get_no_company_dn(with_base=True) ldap_base = get_config(self.env, 'ldap_base', 'o=customer,dc=my-company') try: # Prüfe ob DN bereits existiert conn.search(ldap_base, f'({no_company_dn.split(",")[0]})', SUBTREE, attributes=['*']) if not conn.entries: # Erstelle 'Keine Firma' falls sie nicht existiert attrs = { 'objectClass': ['top', 'organizationalUnit'], 'ou': 'Keine Firma', 'description': 'Automatisch erstellt für Kontakte ohne Firmenzugehörigkeit' } if conn.add(no_company_dn, attrs['objectClass'], attrs): _logger.debug(f"Created 'Keine Firma' with dn:{no_company_dn}") else: _logger.error(f"Failed to create 'Keine Firma' - dn:{no_company_dn} - attrs:{attrs} - response:{conn.result}") raise UserError(f"Did not create 'Keine Firma' with dn:{no_company_dn}") except LDAPException as e: _logger.error(f"Error ensuring 'Keine Firma': {str(e)}") raise UserError(f"LDAP Error: {str(e)}") return no_company_dn def _get_own_dn_part(self): self.ensure_one() if self.is_company: prefix = get_config(self.env, 'company_prefix', 'company') attr_ = 'ou' else: prefix = get_config(self.env, 'person_prefix', 'customer') attr_ = 'dc' contact_prop = get_config(self.env, 'contact_prop', 'id') prop_ = getattr(self, contact_prop) return f"{attr_}={prefix}-{prop_}" def _get_ldap_dn(self): """Generiert den LDAP DN für den Kontakt """ base_dn = get_config(self.env, 'ldap_base', 'o=customer,dc=my-company') dn = [self._get_own_dn_part()] if self.is_company: pass elif self.parent_id and self.parent_id.is_company: dn.append(self.parent_id._get_own_dn_part()) else: dn.append(self._get_no_company_dn(with_base=False)) dn.append(base_dn) return ','.join(dn) def _move_contacts_to_no_company(self, conn): """Verschiebt alle Kontakte einer zu löschenden Firma unter 'Keine Firma'""" self.ensure_one() # Stelle sicher, dass wir nur eine Firma behandeln if not self.is_company: return # Stelle sicher, dass 'Keine Firma' existiert no_company_dn = self._ensure_no_company_ou_exists(conn) # Suche alle Kontakte unter der Firma company_dn = self._get_ldap_dn() conn.search(company_dn, '(objectClass=inetOrgPerson)', SUBTREE, attributes=['*']) for entry in conn.entries: old_dn = entry.entry_dn new_dn = f"dc={entry.dc.value},{no_company_dn}" try: # Verschiebe den Kontakt unter 'Keine Firma' if conn.modify_dn(old_dn, f"dc={entry.dc.value}", new_superior=no_company_dn): _logger.info(f"Moved contact from {old_dn} to {new_dn}") else: _logger.error(f"Failed to move contact: {conn.result}") except LDAPException as e: _logger.error(f"Error moving contact {old_dn}: {str(e)}") def _prepare_ldap_data(self, new_company=False): """Bereitet die LDAP-Attribute vor """ data = {'cn': self.name or 'Unknown N.N.'} city = self.city zip = self.zip country = self.country_id.code if self.country_id else '' street = [self.street, self.street2] phone = self.phone.replace('+', '00') if self.phone else '' if self.parent_id and new_company: city = self.parent_id.city or self.city phone = (self.parent_id.phone.replace('+', '00') if self.parent_id.phone else '') or self.phone country = self.parent_id.country_id.code if self.parent_id.country_id else country street = [self.parent_id.street or street[0], self.parent_id.street2 or street[1]] if self.is_company: data.update({ 'objectClass': ['top', 'organizationalUnit', 'extensibleObject'], }) else: posix_data = self._get_user_posix_data() if posix_data: posix_class = posix_data.pop('objectClass') else: posix_class = [] data.update({ 'objectClass': ['top', 'inetOrgPerson', 'person', 'extensibleObject'] + posix_class, 'sn': ' '.join((self.name or '').split(' ')[1:]).strip() or 'N.N.', 'givenName': self.name.split()[0] if self.name else 'Unknown', }) if posix_data: data.update(posix_data) if self.parent_id: city = city or self.parent_id.city zip = zip or self.parent_id.zip country = country or (self.parent_id.country_id.code if self.parent_id.country_id else '') street = [street[0] or self.parent_id.street, self.street2 or self.parent_id.street2] phone = phone or (self.parent_id.phone.replace('+', '00') if self.parent_id.phone else '') if self.email: data['mail'] = self._assign_child_to_ldap_values('email') if self.mobile: data['mobile'] = self._assign_child_to_ldap_values('mobile', lambda x: x.replace('+', '00')) if phone: data['telephoneNumber'] = self._assign_child_to_ldap_values('phone', lambda x: x.replace('+', '00'), value=phone) if country: # data['c'] = self._assign_child_to_ldap_values('country_id.code', value=country) data['c'] = country if zip: # data['postalCode'] = self._assign_child_to_ldap_values('zip', value=zip) data['postalCode'] = zip if city: # data['localityName'] = self._assign_child_to_ldap_values('city', value=city) data['localityName'] = city street = filter(None, street) if street: data['postalAddress'] = ', '.join(street) return data def _assign_child_to_ldap_values(self, dbprop: str, callback=None, value=None): vals = [value or getattr(self, dbprop)] # Do not assign all the employees to a company if self.is_company: return vals if self.child_ids: for child in self.child_ids: if getattr(child, dbprop): vals.append(getattr(child, dbprop)) vals = list(filter(None, vals)) if vals and callback: vals = list(map(callback, vals)) return vals def _change_values(self, attrs=None): """Makes modify data out of attrs """ attrs = attrs or self._prepare_ldap_data() return { attr: [(MODIFY_REPLACE, [value] if not isinstance(value, list) else value)] for attr, value in attrs.items() if attr != 'objectClass' } def _exists_in_ldap(self, conn, dn=None, filter=None, _conn=None, record=None): s = record or self dn = dn or s._get_ldap_dn() filter = filter or f"({dn.split(',')[0]})" conn.search(get_config(s.env, 'ldap_base'), filter, SUBTREE) return bool(conn.entries) def _ldap_add(self, conn, dn=None, attrs=None, _conn=None, record=None): s = record or self dn = dn or s._get_ldap_dn() typ = 'Company' if s.is_company else 'Contact' attrs = attrs or s._prepare_ldap_data() if conn.add(dn, attrs.pop('objectClass'), attrs): _logger.debug(f"{typ} created: {dn}") return attrs else: _logger.error(f"Failed to create {typ} dn:{dn} - {conn.result}") return False def _ldap_modify(self, conn, dn=None, attrs=None, changes=None, record=None): s = record or self dn = dn or s._get_ldap_dn() typ = 'Company' if s.is_company else 'Contact' attrs = attrs or s._prepare_ldap_data() changes = changes or s._change_values(attrs) if changes and conn.modify(dn, changes): _logger.debug(f"{typ} DN updated: {dn}") return changes else: _logger.error(f"Failed to update {typ} dn:{dn} - {conn.result}") return False def _sync_company(self, conn, dn=None): """Synchronisiert eine Firma (OU) mit LDAP """ self.ensure_one() try: # Prüfe ob OU existiert if not self._exists_in_ldap(conn, dn=dn): self._ldap_add(conn, dn=dn) else: # Erstelle neue ON self._ldap_add(conn, dn=dn) except LDAPException as e: _logger.error(f"Error syncing company {self.name}: {str(e)}") raise UserError(f"LDAP Sync Error: {str(e)}") def _sync_contact(self, conn, dn=None): """Synchronisiert einen Kontakt mit LDAP """ try: if self._exists_in_ldap(conn, dn=dn): self._ldap_modify(conn, dn=dn) else: self._ldap_add(conn, dn=dn) except LDAPException as e: _logger.error(f"Error syncing contact {self.name}: {str(e)}") raise UserError(f"LDAP Sync Error: {str(e)}") def _sync_to_ldap(self): """Synchronisiert den Kontakt mit LDAP""" self.ensure_one() with get_ldap_connection(self.env) as conn: if self.is_company: self._sync_company(conn) else: # Für normale Kontakte if not self.parent_id: self._ensure_no_company_ou_exists(conn) elif self.parent_id.is_company: self.parent_id._sync_company(conn) self._sync_contact(conn) def _handle_company_change(self, record, old_parent_id): """Behandelt den Wechsel eines Kontakts zu einer anderen Firma""" with get_ldap_connection(self.env) as conn: # Stelle sicher dass "Keine Firma" OU existiert no_company_dn = self._ensure_no_company_ou_exists(conn) dn_part = record._get_own_dn_part() # Bestimme den alten DN basierend auf der vorherigen Situation if old_parent_id: # Wechsel von einer konkreten Firma old_company = self.env['res.partner'].browse(old_parent_id) old_dn = f"{dn_part},{old_company._get_ldap_dn()}" else: # Wechsel von "Keine Firma" old_dn = f"{dn_part},{no_company_dn}" # Bestimme den neuen DN basierend auf der neuen Situation if record.parent_id: # Wechsel zu einer konkreten Firma new_superior = record.parent_id._get_ldap_dn() else: # Wechsel zu "Keine Firma" new_superior = no_company_dn try: # Prüfe ob der alte Eintrag existiert conn.search(','.join(old_dn.split(' ')[1:]), f'({dn_part})', attributes=['*']) if not conn.entries: _logger.warning(f"Old entry not found: {old_dn}") # Wenn der alte Eintrag nicht existiert, erstelle einfach einen neuen record._sync_contact(conn) return # Führe die Verschiebung durch if conn.modify_dn(old_dn, f"{dn_part}", new_superior=new_superior): _logger.info(f"Moved LDAP entry from {old_dn} to {record._get_ldap_dn()}") record._sync_to_ldap() else: _logger.error(f"Failed to move LDAP entry: {conn.result}") if conn.result['description'].lower() != 'entryalreadyexists': raise UserError(f"LDAP Move Error:{old_dn} {record._get_own_dn_part()} {new_superior} {conn.result['description']}") else: conn.delete(old_dn) except LDAPException as e: _logger.error(f"Error during company change for {record.name}: {str(e)}") raise UserError(f"LDAP Company Change Error: {str(e)}")