From 6ccf11c59b7e22c8b0766183d15c1db823be4ecf Mon Sep 17 00:00:00 2001 From: Holger Sielaff Date: Tue, 17 Mar 2026 08:31:53 +0100 Subject: [PATCH] Added set odoo attendance --- set_attendance.py | 317 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 317 insertions(+) create mode 100755 set_attendance.py diff --git a/set_attendance.py b/set_attendance.py new file mode 100755 index 0000000..5184392 --- /dev/null +++ b/set_attendance.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +""" +Script zum Setzen von Anwesenheit in Odoo 19 +""" + +from datetime import ( + datetime, + time, + timedelta, + date as d_date +) +import ( + requests, + argparse, + sys, + os, + getpass, + json, + math, + logging +) +from random import randint +from typing import ( + List, + Optional, + Union +) + +logging.basicConfig(level=logging.DEBUG) + + +user = getpass.getuser() +home = os.path.join("/home", user) +configfile = os.path.join(home, '.config/odoo-attendance.json') +mandatory =['username', 'database', 'url', 'password', 'apikey'] + +config = None + + +class Config(dict): + def __init__(self): + self.args = self.cliargs() + self.configfile = self.args.config + self.update(self.from_configfile()) + if not self.get('date'): + self['date'] = str(d_date.today()) + + def __getattr__(self, item): + try: + return self[item] + except KeyError: + raise AttributeError(f"'Config' object has no attribute '{item}'") + + def from_configfile(self): + cfg = {} + if os.path.isfile(self.configfile): + with open(self.configfile) as _cfgf: + cfg = json.load(_cfgf) + + argsdict = self.args.__dict__ + # override default config + for argsk, argsv in argsdict.items(): + if argsv: + cfg[argsk] = argsv + else: + cfg.setdefault(argsk, argsv) + + assert all(map(lambda x: cfg.get(x), mandatory)), f"Nicht alle Felder in gesetzt" + return cfg + + + def cliargs(self): + parser = argparse.ArgumentParser() + + # Config + parser.add_argument('-c', '--config', help='Configfile', default=configfile) + + # Odoo Verbindungsparameter + parser.add_argument('-l', '--url', dest='url' help='Odoo Server URL (z.B. http://localhost:8069)') + parser.add_argument('-d', '--database', dest='database', help='Datenbankname') + parser.add_argument('-u', '--username', dest='username', help='Benutzername') + parser.add_argument('-p', '--password', dest='password', help='Passwort oder API-Key') + parser.add_argument('-a', '--apikey', dest='apikey', help='Passwort oder API-Key') + + + # Anwesenheits-Parameter + parser.add_argument('-D', '--date', dest='date', help='Datum im Format YYYY-MM-DD') + parser.add_argument('-B', '--begin', dest='begin', help='Arbeitsbeginn', default='8:00') + parser.add_argument('-X', '--duration', dest='duration', help='Gesamtdauer', default=8.0, type=float) + parser.add_argument('-o', '--min-pause', dest='min_pause', help='Minimale Pause in Minuten', default=30, type=int) + parser.add_argument('-O', '--max-pause', dest='max_pause', help='Maximale Pause in Minuten', default=75, type=int) + + return parser.parse_args() + + +def calc_timevalue(s: Union[float, int, str]): + if isinstance(s, (int, float)) and s.is_integer(): + return s, 0, 0 + if isinstance(s, str): + if ':' in s: + l = s.split(':') + hour = int(l.pop(0)) + try: + minutes = int(l.pop(0)) + except IndexError: + minutes = 0 + try: + seconds = int(l.pop(0)) + except IndexError: + seconds = 0 + return hour, minutes, seconds + else: + s = float(s.replace(',', '.')) + + s = float(round(s,2)) + hour = math.floor(s) + minutes = math.floor((s - hour) * 60) + return hour, minutes, 0 + + +class OdooAttendance: + + def request(self, url, **kwargs): + kwargs.setdefault('context', {}).setdefault('lang', 'de_DE') + response = requests.post( + f"{config.url.rstrip('/')}/json/2/{url.lstrip('/')}", + headers={ + "X-Odoo-Database": config['database'], + "Authorization": f"bearer {config['apikey']}", + }, + json=kwargs, + ) + if not response.ok: + logging.error(response.content) + raise ValueError(f"Response Code: {response.status_code}") + return response.json() + + def __init__(self): + self.url = config.url.rstrip('/') + self.db = config.database + self.username = config.username + self.password = config.password + self.date = config.date + self.apikey = config.apikey + + logging.debug(f"Verbinde zu: {self.url}") + logging.debug(f"Datenbank: {self.db}") + + self.uid = self.request('/res.users/context_get')['uid'] + if not self.uid: + raise Exception("Authentifizierung fehlgeschlagen") + logging.info(f"Erfolgreich verbunden als User ID: {self.uid}") + + employees = self.request('/hr.employee/search_read', domain=[['user_id', '=', self.uid]], fields=['id', 'name']) + + if not employees: + raise Exception(f"Kein Mitarbeiter gefunden für User: {self.uid}") + + if len(employees) > 1: + logging.error("Mehrere Mitarbeiter gefunden:") + for emp in employees: + logging.error(f" - ID {emp['id']}: {emp['name']}") + raise Exception("Bitte spezifischere Suchkriterien oder Employee ID verwenden") + + self.employee = employees[0] + self.employee_id = self.employee['id'] + + def check_existing(self, date=None): + + date = date or self.date + logging.debug(f"Check exiting for {date}") + + # Prüfen ob bereits Anwesenheit existiert + existing = self.request( + '/hr.attendance/search_read', + domain=[['employee_id', '=', self.employee_id], + ['check_in', '>=', f'{date} 00:00:00'], + ['check_in', '<=', f'{date} 23:59:59']], + fields=['id', 'check_in', 'check_out'] + ) + + if existing: + logging.error(f"Anwesenheit existiert bereits für {date}:") + for att in existing: + logging.error( + f" ID {att['id']}: Check-in: {att['check_in']}, Check-out: {att.get('check_out', 'Offen')}") + raise ValueError("Existing entries {existing}") + + def time_from_seconds(self, total_seconds): + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + seconds = total_seconds % 60 + + return timedelta(hours=hours, minutes=minutes, seconds=seconds) + + def rand_seconds(self, minutes=5, seconds=60, plus=True, minus=True): + pm = [] + if minus: + pm.append(-1) + if minus: + pm.append(1) + logging.debug(f"Factor: {pm}") + assert any(pm), f"Must have pluminus factors - is '{pm}'" + assert minutes <= 10, f"Minutes to substract/add must be <= 10 - is {minutes}'" + assert seconds <= 60, f"Seconds to substract/add must be <= 60 - is {secondss}'" + + factor = pm[randint(0, len(pm) - 1)] + + minutes = randint(0, minutes) if minutes > 0 else rand(0, 5) + seconds = randint(0, seconds) if seconds > 0 else rand(0, 60) + + return factor * (minutes * 60 + seconds) + + def make_pause(self): + return 60 * randint(config.min_pause, config.max_pause) + randint(0, 60) + + def rand_duration(self): + hours_secs = int(config.duration * 3600) + rand_sec = self.rand_seconds() + seconds = hours_secs + rand_sec + logging.info(f"Duration: {hours_secs} - Rand sec: {rand_sec} = {seconds}") + + def tfs(t): + return self.time_from_seconds(t) + + if config.duration <= 6: + return [tfs(seconds)] + if config.duration <= 9: + first_chunk = 5 * 3600 + 40 * 60 + randint(0, 20 * 60) + second_chunk = seconds - first_chunk + pause = self.make_pause() + return [tfs(first_chunk), tfs(pause), tfs(second_chunk)] + else: + first_chunk = 3 * 3600 + 40 * 60 + randint(0, 20 * 60) + third_chunk = 3600 + 10 * 60 + randint(0, 20 * 60) + second_chunk = seconds - first_chunk - third_chunk + pause1 = self.make_pause() + pause2 = self.make_pause(15) + return [tfs(first_chunk), tfs(pause1), tfs(second_chunk), tfs(pause2), tfs(third_chunk)] + + def set_auto_attendance(self): + date = self.date + day = self.rand_duration(config.duration) + begin, _bm, _bs = calc_timevalue(config.begin) + assert len(day) % 2, f"{day} ist nicht ungerade" + assert int(begin) in range(6, 13), "Beginn muss zwischen 5 und 13 Uhr sein" + plusmin = [-1, 1][randint(0, 1)] * randint(0, 15) + plussec = randint(0, 60) + if plusmin < 0: + begin -= 1 + plusmin = 60 + plusmin + begin = datetime.combine(datetime.strptime(date, '%Y-%m-%d'), time(begin, plusmin, plussec)) + chunks = [] + pause = False + for part in day: + if pause: + pause = False + logging.debug(f"add pause {part}") + begin += part + continue + b = begin.strftime('%Y-%m-%d %H:%M:%S') + begin += part + e = begin.strftime('%Y-%m-%d %H:%M:%S') + chunks.append({ + 'check_in': b, + 'check_out': e, + }) + pause = True + # Neue Anwesenheit erstellen + a_ids = [] + for chunk in chunks: + logging.debug(f"Set attendance for {chunk}") + attendance_id = self.request( + '/hr.attendance/create', + vals_list={ + 'employee_id': self.employee_id, + **chunk + } + ) + a_ids.append(attendance_id) + + return a_ids + + +def main(): + + try: + config = Config() + odoo = OdooAttendance() + + logging.info(f"Mitarbeiter gefunden: {odoo.employee['name']} (ID: {odoo.employee['id']})") + + odoo.check_existing() + + if config.duration: + attendance_ids = odoo.set_auto_attendance() + else: + raise NotImplemented('...') + + if attendance_ids: + logging.info(f"Anwesenheit erfolgreich erstellt (ID: {attendance_ids})") + logging.info(f" Mitarbeiter: {odoo.employee['name']}") + logging.info(f" Datum: {odoo.date}") + if not config.duration: + raise NotImplemented('Separates check_in und check_out kommt noch') + else: + logging.info(f" Dauer: {config.duration}") + + except Exception as e: + logging.error(f"{e}") + raise + + return 0 + + +if __name__ == '__main__': + exit(main())