#!/usr/bin/env python3 """ Script zum Setzen von Anwesenheit in Odoo 19 """ from datetime import ( datetime, time, timedelta, date as d_date ) import ( requests, argparse, sys, os, getpass, json, math, logging ) from random import randint from typing import ( List, Optional, Union ) logging.basicConfig(level=logging.DEBUG) user = getpass.getuser() home = os.path.join("/home", user) configfile = os.path.join(home, '.config/odoo-attendance.json') mandatory =['username', 'database', 'url', 'password', 'apikey'] config = None class Config(dict): def __init__(self): self.args = self.cliargs() self.configfile = self.args.config self.update(self.from_configfile()) if not self.get('date'): self['date'] = str(d_date.today()) def __getattr__(self, item): try: return self[item] except KeyError: raise AttributeError(f"'Config' object has no attribute '{item}'") def from_configfile(self): cfg = {} if os.path.isfile(self.configfile): with open(self.configfile) as _cfgf: cfg = json.load(_cfgf) argsdict = self.args.__dict__ # override default config for argsk, argsv in argsdict.items(): if argsv: cfg[argsk] = argsv else: cfg.setdefault(argsk, argsv) assert all(map(lambda x: cfg.get(x), mandatory)), f"Nicht alle Felder in gesetzt" return cfg def cliargs(self): parser = argparse.ArgumentParser() # Config parser.add_argument('-c', '--config', help='Configfile', default=configfile) # Odoo Verbindungsparameter parser.add_argument('-l', '--url', dest='url' help='Odoo Server URL (z.B. http://localhost:8069)') parser.add_argument('-d', '--database', dest='database', help='Datenbankname') parser.add_argument('-u', '--username', dest='username', help='Benutzername') parser.add_argument('-p', '--password', dest='password', help='Passwort oder API-Key') parser.add_argument('-a', '--apikey', dest='apikey', help='Passwort oder API-Key') # Anwesenheits-Parameter parser.add_argument('-D', '--date', dest='date', help='Datum im Format YYYY-MM-DD') parser.add_argument('-B', '--begin', dest='begin', help='Arbeitsbeginn', default='8:00') parser.add_argument('-X', '--duration', dest='duration', help='Gesamtdauer', default=8.0, type=float) parser.add_argument('-o', '--min-pause', dest='min_pause', help='Minimale Pause in Minuten', default=30, type=int) parser.add_argument('-O', '--max-pause', dest='max_pause', help='Maximale Pause in Minuten', default=75, type=int) return parser.parse_args() def calc_timevalue(s: Union[float, int, str]): if isinstance(s, (int, float)) and s.is_integer(): return s, 0, 0 if isinstance(s, str): if ':' in s: l = s.split(':') hour = int(l.pop(0)) try: minutes = int(l.pop(0)) except IndexError: minutes = 0 try: seconds = int(l.pop(0)) except IndexError: seconds = 0 return hour, minutes, seconds else: s = float(s.replace(',', '.')) s = float(round(s,2)) hour = math.floor(s) minutes = math.floor((s - hour) * 60) return hour, minutes, 0 class OdooAttendance: def request(self, url, **kwargs): kwargs.setdefault('context', {}).setdefault('lang', 'de_DE') response = requests.post( f"{config.url.rstrip('/')}/json/2/{url.lstrip('/')}", headers={ "X-Odoo-Database": config['database'], "Authorization": f"bearer {config['apikey']}", }, json=kwargs, ) if not response.ok: logging.error(response.content) raise ValueError(f"Response Code: {response.status_code}") return response.json() def __init__(self): self.url = config.url.rstrip('/') self.db = config.database self.username = config.username self.password = config.password self.date = config.date self.apikey = config.apikey logging.debug(f"Verbinde zu: {self.url}") logging.debug(f"Datenbank: {self.db}") self.uid = self.request('/res.users/context_get')['uid'] if not self.uid: raise Exception("Authentifizierung fehlgeschlagen") logging.info(f"Erfolgreich verbunden als User ID: {self.uid}") employees = self.request('/hr.employee/search_read', domain=[['user_id', '=', self.uid]], fields=['id', 'name']) if not employees: raise Exception(f"Kein Mitarbeiter gefunden für User: {self.uid}") if len(employees) > 1: logging.error("Mehrere Mitarbeiter gefunden:") for emp in employees: logging.error(f" - ID {emp['id']}: {emp['name']}") raise Exception("Bitte spezifischere Suchkriterien oder Employee ID verwenden") self.employee = employees[0] self.employee_id = self.employee['id'] def check_existing(self, date=None): date = date or self.date logging.debug(f"Check exiting for {date}") # Prüfen ob bereits Anwesenheit existiert existing = self.request( '/hr.attendance/search_read', domain=[['employee_id', '=', self.employee_id], ['check_in', '>=', f'{date} 00:00:00'], ['check_in', '<=', f'{date} 23:59:59']], fields=['id', 'check_in', 'check_out'] ) if existing: logging.error(f"Anwesenheit existiert bereits für {date}:") for att in existing: logging.error( f" ID {att['id']}: Check-in: {att['check_in']}, Check-out: {att.get('check_out', 'Offen')}") raise ValueError("Existing entries {existing}") def time_from_seconds(self, total_seconds): hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 seconds = total_seconds % 60 return timedelta(hours=hours, minutes=minutes, seconds=seconds) def rand_seconds(self, minutes=5, seconds=60, plus=True, minus=True): pm = [] if minus: pm.append(-1) if minus: pm.append(1) logging.debug(f"Factor: {pm}") assert any(pm), f"Must have pluminus factors - is '{pm}'" assert minutes <= 10, f"Minutes to substract/add must be <= 10 - is {minutes}'" assert seconds <= 60, f"Seconds to substract/add must be <= 60 - is {secondss}'" factor = pm[randint(0, len(pm) - 1)] minutes = randint(0, minutes) if minutes > 0 else rand(0, 5) seconds = randint(0, seconds) if seconds > 0 else rand(0, 60) return factor * (minutes * 60 + seconds) def make_pause(self): return 60 * randint(config.min_pause, config.max_pause) + randint(0, 60) def rand_duration(self): hours_secs = int(config.duration * 3600) rand_sec = self.rand_seconds() seconds = hours_secs + rand_sec logging.info(f"Duration: {hours_secs} - Rand sec: {rand_sec} = {seconds}") def tfs(t): return self.time_from_seconds(t) if config.duration <= 6: return [tfs(seconds)] if config.duration <= 9: first_chunk = 5 * 3600 + 40 * 60 + randint(0, 20 * 60) second_chunk = seconds - first_chunk pause = self.make_pause() return [tfs(first_chunk), tfs(pause), tfs(second_chunk)] else: first_chunk = 3 * 3600 + 40 * 60 + randint(0, 20 * 60) third_chunk = 3600 + 10 * 60 + randint(0, 20 * 60) second_chunk = seconds - first_chunk - third_chunk pause1 = self.make_pause() pause2 = self.make_pause(15) return [tfs(first_chunk), tfs(pause1), tfs(second_chunk), tfs(pause2), tfs(third_chunk)] def set_auto_attendance(self): date = self.date day = self.rand_duration(config.duration) begin, _bm, _bs = calc_timevalue(config.begin) assert len(day) % 2, f"{day} ist nicht ungerade" assert int(begin) in range(6, 13), "Beginn muss zwischen 5 und 13 Uhr sein" plusmin = [-1, 1][randint(0, 1)] * randint(0, 15) plussec = randint(0, 60) if plusmin < 0: begin -= 1 plusmin = 60 + plusmin begin = datetime.combine(datetime.strptime(date, '%Y-%m-%d'), time(begin, plusmin, plussec)) chunks = [] pause = False for part in day: if pause: pause = False logging.debug(f"add pause {part}") begin += part continue b = begin.strftime('%Y-%m-%d %H:%M:%S') begin += part e = begin.strftime('%Y-%m-%d %H:%M:%S') chunks.append({ 'check_in': b, 'check_out': e, }) pause = True # Neue Anwesenheit erstellen a_ids = [] for chunk in chunks: logging.debug(f"Set attendance for {chunk}") attendance_id = self.request( '/hr.attendance/create', vals_list={ 'employee_id': self.employee_id, **chunk } ) a_ids.append(attendance_id) return a_ids def main(): try: config = Config() odoo = OdooAttendance() logging.info(f"Mitarbeiter gefunden: {odoo.employee['name']} (ID: {odoo.employee['id']})") odoo.check_existing() if config.duration: attendance_ids = odoo.set_auto_attendance() else: raise NotImplemented('...') if attendance_ids: logging.info(f"Anwesenheit erfolgreich erstellt (ID: {attendance_ids})") logging.info(f" Mitarbeiter: {odoo.employee['name']}") logging.info(f" Datum: {odoo.date}") if not config.duration: raise NotImplemented('Separates check_in und check_out kommt noch') else: logging.info(f" Dauer: {config.duration}") except Exception as e: logging.error(f"{e}") raise return 0 if __name__ == '__main__': exit(main())