Files
Flectra-Odoo-LDAP-Partner-Sync/models/res_partner.py

412 lines
16 KiB
Python
Raw Normal View History

2025-06-26 08:41:56 +02:00
# -*- coding: utf-8 -*-
"""Adds the ldap_partner_sync functionality to res.partner
We want Flectra too
"""
import logging
from ldap3 import MODIFY_REPLACE, SUBTREE
from ldap3.core.exceptions import LDAPException
try:
from odoo import models, fields
from odoo.exceptions import UserError
except ModuleNotFoundError:
from flectra import models, fields
from flectra.exceptions import UserError
_logger = logging.getLogger(__name__)
from ..lib.ldap import get_ldap_connection
from ..lib import get_config
class ResPartner(models.Model):
_inherit = 'res.partner'
def write(self, vals):
"""Überschreibe write um LDAP-Sync mit Firmenwechsel-Handling hinzuzufügen"""
if not self._global_sync_on():
return super().write(vals)
old_parents = {record.id: record.parent_id.id for record in self if record.type == 'contact'}
result = super().write(vals)
with get_ldap_connection(self.env) as conn:
# Stelle sicher, dass 'Keine Firma' OU existiert
self._ensure_no_company_ou_exists(conn)
if 'parent_id' in vals:
for record in self:
if record.type != 'contact':
continue
if old_parents[record.id] != record.parent_id.id:
self._handle_company_change(record, old_parents[record.id])
else:
for record in self:
if record.type != 'contact':
continue
record._sync_to_ldap()
return result
def unlink(self):
"""Überschreibe unlink um Kontakte bei Firmenlöschung zu verschieben
"""
if not self._global_sync_on():
return super().unlink()
with get_ldap_connection(self.env) as conn:
for record in self:
if record.type != 'contact':
continue
try:
if record.is_company:
# Wenn eine Firma gelöscht wird, verschiebe erst alle Kontakte
record._move_contacts_to_no_company(conn)
# Lösche den eigentlichen Eintrag
dn = record._get_ldap_dn()
if conn.delete(dn):
_logger.debug(f"Deleted LDAP entry: {dn}")
else:
_logger.warning(f"Failed to delete LDAP entry: {conn.result}")
except LDAPException as e:
_logger.error(f"Error during deletion of {dn}: {str(e)}")
raise UserError(f"LDAP Delete Error: {str(e)}")
return super().unlink()
def _global_sync_on(self):
return get_config(self.env, 'partner_sync_on', False)
def _get_user_posix_data(self):
self.ensure_one()
def get_user_password(user_id):
self.env.cr.execute("SELECT password FROM res_users WHERE id=%s", [user_id])
res = self.env.cr.dictfetchone()
if res and res.get('password'):
return res.get('password')
return ''
if self.user_ids:
_logger.error([user.password + str(user) for user in self.user_ids])
return {
'objectClass': ['posixAccount'], # , 'ldapPublicKey'],
'userPassword': [get_user_password(user.id) for user in self.user_ids],
'uid': [user.login for user in self.user_ids],
'loginShell': [user.posix_login_shell or '/usr/bin/nologin' for user in self.user_ids],
'homeDirectory': [user.posix_home_directory or f'/tmp/{user.posix_uid}' for user in self.user_ids],
'uidNumber': self.id + 1000,
'gidNumber': self.id + 1000,
# 'sshPublicKey': self.ssh_public_key,
}
return {}
def _get_no_company_dn(self, with_base=False):
"""Gibt den DN für den 'Keine Firma' ldap_part zurück
Wenn mit with_base den ganzen DN"""
prefix = get_config(self.env, 'company_prefix', 'company')
ldap_base = get_config(self.env, 'ldap_base', 'o=customer,dc=my-company')
if with_base:
return f"ou={prefix}-0,{ldap_base}"
return f"ou={prefix}-0"
def _ensure_no_company_ou_exists(self, conn):
"""Stellt sicher, dass die 'Keine Firma' existiert
"""
no_company_dn = self._get_no_company_dn(with_base=True)
ldap_base = get_config(self.env, 'ldap_base', 'o=customer,dc=my-company')
try:
# Prüfe ob DN bereits existiert
conn.search(ldap_base, f'({no_company_dn.split(",")[0]})', SUBTREE, attributes=['*'])
if not conn.entries:
# Erstelle 'Keine Firma' falls sie nicht existiert
attrs = {
'objectClass': ['top', 'organizationalUnit'],
'ou': 'Keine Firma',
'description': 'Automatisch erstellt für Kontakte ohne Firmenzugehörigkeit'
}
if conn.add(no_company_dn, attrs['objectClass'], attrs):
_logger.debug(f"Created 'Keine Firma' with dn:{no_company_dn}")
else:
_logger.error(f"Failed to create 'Keine Firma' - dn:{no_company_dn} - attrs:{attrs} - response:{conn.result}")
raise UserError(f"Did not create 'Keine Firma' with dn:{no_company_dn}")
except LDAPException as e:
_logger.error(f"Error ensuring 'Keine Firma': {str(e)}")
raise UserError(f"LDAP Error: {str(e)}")
return no_company_dn
def _get_own_dn_part(self):
self.ensure_one()
if self.is_company:
prefix = get_config(self.env, 'company_prefix', 'company')
attr_ = 'ou'
else:
prefix = get_config(self.env, 'person_prefix', 'customer')
attr_ = 'dc'
contact_prop = get_config(self.env, 'contact_prop', 'id')
prop_ = getattr(self, contact_prop)
return f"{attr_}={prefix}-{prop_}"
def _get_ldap_dn(self):
"""Generiert den LDAP DN für den Kontakt
"""
base_dn = get_config(self.env, 'ldap_base', 'o=customer,dc=my-company')
dn = [self._get_own_dn_part()]
if self.is_company:
pass
elif self.parent_id and self.parent_id.is_company:
dn.append(self.parent_id._get_own_dn_part())
else:
dn.append(self._get_no_company_dn(with_base=False))
dn.append(base_dn)
return ','.join(dn)
def _move_contacts_to_no_company(self, conn):
"""Verschiebt alle Kontakte einer zu löschenden Firma
unter 'Keine Firma'"""
self.ensure_one() # Stelle sicher, dass wir nur eine Firma behandeln
if not self.is_company:
return
# Stelle sicher, dass 'Keine Firma' existiert
no_company_dn = self._ensure_no_company_ou_exists(conn)
# Suche alle Kontakte unter der Firma
company_dn = self._get_ldap_dn()
conn.search(company_dn, '(objectClass=inetOrgPerson)', SUBTREE, attributes=['*'])
for entry in conn.entries:
old_dn = entry.entry_dn
new_dn = f"dc={entry.dc.value},{no_company_dn}"
try:
# Verschiebe den Kontakt unter 'Keine Firma'
if conn.modify_dn(old_dn, f"dc={entry.dc.value}", new_superior=no_company_dn):
_logger.info(f"Moved contact from {old_dn} to {new_dn}")
else:
_logger.error(f"Failed to move contact: {conn.result}")
except LDAPException as e:
_logger.error(f"Error moving contact {old_dn}: {str(e)}")
def _prepare_ldap_data(self, new_company=False):
"""Bereitet die LDAP-Attribute vor
"""
data = {'cn': self.name or 'Unknown N.N.'}
city = self.city
zip = self.zip
country = self.country_id.code if self.country_id else ''
street = [self.street, self.street2]
phone = self.phone.replace('+', '00') if self.phone else ''
if self.parent_id and new_company:
city = self.parent_id.city or self.city
phone = (self.parent_id.phone.replace('+', '00') if self.parent_id.phone else '') or self.phone
country = self.parent_id.country_id.code if self.parent_id.country_id else country
street = [self.parent_id.street or street[0], self.parent_id.street2 or street[1]]
if self.is_company:
data.update({
'objectClass': ['top', 'organizationalUnit', 'extensibleObject'],
})
else:
posix_data = self._get_user_posix_data()
if posix_data:
posix_class = posix_data.pop('objectClass')
else:
posix_class = []
data.update({
'objectClass': ['top', 'inetOrgPerson', 'person', 'extensibleObject'] + posix_class,
'sn': ' '.join((self.name or '').split(' ')[1:]).strip() or 'N.N.',
'givenName': self.name.split()[0] if self.name else 'Unknown',
})
if posix_data:
data.update(posix_data)
if self.parent_id:
city = city or self.parent_id.city
zip = zip or self.parent_id.zip
country = country or (self.parent_id.country_id.code if self.parent_id.country_id else '')
street = [street[0] or self.parent_id.street, self.street2 or self.parent_id.street2]
phone = phone or (self.parent_id.phone.replace('+', '00') if self.parent_id.phone else '')
if self.email:
data['mail'] = self._assign_child_to_ldap_values('email')
if self.mobile:
data['mobile'] = self._assign_child_to_ldap_values('mobile', lambda x: x.replace('+', '00'))
if phone:
data['telephoneNumber'] = self._assign_child_to_ldap_values('phone', lambda x: x.replace('+', '00'), value=phone)
if country:
# data['c'] = self._assign_child_to_ldap_values('country_id.code', value=country)
data['c'] = country
if zip:
# data['postalCode'] = self._assign_child_to_ldap_values('zip', value=zip)
data['postalCode'] = zip
if city:
# data['localityName'] = self._assign_child_to_ldap_values('city', value=city)
data['localityName'] = city
street = filter(None, street)
if street:
data['postalAddress'] = ', '.join(street)
return data
def _assign_child_to_ldap_values(self, dbprop: str, callback=None, value=None):
vals = [value or getattr(self, dbprop)]
# Do not assign all the employees to a company
if self.is_company:
return vals
if self.child_ids:
for child in self.child_ids:
if getattr(child, dbprop):
vals.append(getattr(child, dbprop))
vals = list(filter(None, vals))
if vals and callback:
vals = list(map(callback, vals))
return vals
def _change_values(self, attrs=None):
"""Makes modify data out of attrs
"""
attrs = attrs or self._prepare_ldap_data()
return {
attr: [(MODIFY_REPLACE, [value] if not isinstance(value, list) else value)]
for attr, value in attrs.items()
if attr != 'objectClass'
}
def _exists_in_ldap(self, conn, dn=None, filter=None, _conn=None, record=None):
s = record or self
dn = dn or s._get_ldap_dn()
filter = filter or f"({dn.split(',')[0]})"
conn.search(get_config(s.env, 'ldap_base'), filter, SUBTREE)
return bool(conn.entries)
def _ldap_add(self, conn, dn=None, attrs=None, _conn=None, record=None):
s = record or self
dn = dn or s._get_ldap_dn()
typ = 'Company' if s.is_company else 'Contact'
attrs = attrs or s._prepare_ldap_data()
if conn.add(dn, attrs.pop('objectClass'), attrs):
_logger.debug(f"{typ} created: {dn}")
return attrs
else:
_logger.error(f"Failed to create {typ} dn:{dn} - {conn.result}")
return False
def _ldap_modify(self, conn, dn=None, attrs=None, changes=None, record=None):
s = record or self
dn = dn or s._get_ldap_dn()
typ = 'Company' if s.is_company else 'Contact'
attrs = attrs or s._prepare_ldap_data()
changes = changes or s._change_values(attrs)
if changes and conn.modify(dn, changes):
_logger.debug(f"{typ} DN updated: {dn}")
return changes
else:
_logger.error(f"Failed to update {typ} dn:{dn} - {conn.result}")
return False
def _sync_company(self, conn, dn=None):
"""Synchronisiert eine Firma (OU) mit LDAP
"""
self.ensure_one()
try:
# Prüfe ob OU existiert
if not self._exists_in_ldap(conn, dn=dn):
self._ldap_add(conn, dn=dn)
else:
# Erstelle neue ON
self._ldap_add(conn, dn=dn)
except LDAPException as e:
_logger.error(f"Error syncing company {self.name}: {str(e)}")
raise UserError(f"LDAP Sync Error: {str(e)}")
def _sync_contact(self, conn, dn=None):
"""Synchronisiert einen Kontakt mit LDAP
"""
try:
if self._exists_in_ldap(conn, dn=dn):
self._ldap_modify(conn, dn=dn)
else:
self._ldap_add(conn, dn=dn)
except LDAPException as e:
_logger.error(f"Error syncing contact {self.name}: {str(e)}")
raise UserError(f"LDAP Sync Error: {str(e)}")
def _sync_to_ldap(self):
"""Synchronisiert den Kontakt mit LDAP"""
self.ensure_one()
with get_ldap_connection(self.env) as conn:
if self.is_company:
self._sync_company(conn)
else:
# Für normale Kontakte
if not self.parent_id:
self._ensure_no_company_ou_exists(conn)
elif self.parent_id.is_company:
self.parent_id._sync_company(conn)
self._sync_contact(conn)
def _handle_company_change(self, record, old_parent_id):
"""Behandelt den Wechsel eines Kontakts zu einer anderen Firma"""
with get_ldap_connection(self.env) as conn:
# Stelle sicher dass "Keine Firma" OU existiert
no_company_dn = self._ensure_no_company_ou_exists(conn)
dn_part = record._get_own_dn_part()
# Bestimme den alten DN basierend auf der vorherigen Situation
if old_parent_id:
# Wechsel von einer konkreten Firma
old_company = self.env['res.partner'].browse(old_parent_id)
old_dn = f"{dn_part},{old_company._get_ldap_dn()}"
else:
# Wechsel von "Keine Firma"
old_dn = f"{dn_part},{no_company_dn}"
# Bestimme den neuen DN basierend auf der neuen Situation
if record.parent_id:
# Wechsel zu einer konkreten Firma
new_superior = record.parent_id._get_ldap_dn()
else:
# Wechsel zu "Keine Firma"
new_superior = no_company_dn
try:
# Prüfe ob der alte Eintrag existiert
conn.search(','.join(old_dn.split(' ')[1:]), f'({dn_part})', attributes=['*'])
if not conn.entries:
_logger.warning(f"Old entry not found: {old_dn}")
# Wenn der alte Eintrag nicht existiert, erstelle einfach einen neuen
record._sync_contact(conn)
return
# Führe die Verschiebung durch
if conn.modify_dn(old_dn, f"{dn_part}", new_superior=new_superior):
_logger.info(f"Moved LDAP entry from {old_dn} to {record._get_ldap_dn()}")
record._sync_to_ldap()
else:
_logger.error(f"Failed to move LDAP entry: {conn.result}")
if conn.result['description'].lower() != 'entryalreadyexists':
raise UserError(f"LDAP Move Error:{old_dn} {record._get_own_dn_part()} {new_superior} {conn.result['description']}")
else:
conn.delete(old_dn)
except LDAPException as e:
_logger.error(f"Error during company change for {record.name}: {str(e)}")
raise UserError(f"LDAP Company Change Error: {str(e)}")