316 lines
10 KiB
Python
Executable File
316 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Script zum Setzen von Anwesenheit in Odoo 19
|
|
"""
|
|
|
|
from datetime import (
|
|
datetime,
|
|
time,
|
|
timedelta,
|
|
date as d_date
|
|
)
|
|
import requests
|
|
import argparse
|
|
import sys
|
|
import os
|
|
import getpass
|
|
import json
|
|
import math
|
|
import logging
|
|
from random import randint
|
|
from typing import (
|
|
List,
|
|
Optional,
|
|
Union
|
|
)
|
|
|
|
# Emit everything down to DEBUG so each request/decision of the script is visible.
logging.basicConfig(level=logging.DEBUG)

# Name of the invoking user. It is informational only, so a missing passwd
# entry / environment must not abort the script.
try:
    user = getpass.getuser()
except (KeyError, OSError, ImportError):
    user = None

# Resolve the real home directory instead of assuming "/home/<user>", which is
# wrong for root, macOS and any account with a non-standard home.
home = os.path.expanduser("~")

# Default location of the JSON config file (can be overridden with -c/--config).
configfile = os.path.join(home, '.config/odoo-attendance.json')

# Keys that must be non-empty after merging config file and CLI arguments.
mandatory = ['username', 'database', 'url', 'password', 'apikey']

# Global configuration object; assigned a Config instance by the script entry point.
config = None
|
|
|
|
|
|
class Config(dict):
    """Script configuration merged from the JSON config file and the CLI.

    Precedence (highest first): value given on the command line, value from
    the config file, built-in fallback from ``_DEFAULTS``. Every key is also
    readable as an attribute (``config.url`` is ``config['url']``).
    """

    # Fallbacks used only when neither the CLI nor the config file provides a
    # value. They are deliberately NOT argparse defaults: an argparse default
    # is indistinguishable from a user-supplied value and would therefore
    # always override the config file.
    _DEFAULTS = {
        'begin': '8:00',
        'duration': 8.0,
        'min_pause': 30,
        'max_pause': 75,
    }

    def __init__(self):
        """Parse the command line, merge it with the config file, default the date to today."""
        super().__init__()
        self.args = self.cliargs()
        self.configfile = self.args.config
        self.update(self.from_configfile())
        if not self.get('date'):
            self['date'] = str(d_date.today())

    def __getattr__(self, item):
        """Expose dictionary keys as attributes.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: if ``item`` is not a key of the configuration.
        """
        try:
            return self[item]
        except KeyError:
            raise AttributeError(f"'Config' object has no attribute '{item}'") from None

    def from_configfile(self):
        """Return the merged configuration as a plain dict.

        Reads ``self.configfile`` if it exists, overlays every CLI argument
        that was actually given, fills the remaining gaps from ``_DEFAULTS``
        and verifies that all keys listed in ``mandatory`` are non-empty.

        Raises:
            ValueError: if one or more mandatory keys are missing or empty.
        """
        cfg = {}
        if os.path.isfile(self.configfile):
            with open(self.configfile, encoding='utf-8') as _cfgf:
                cfg = json.load(_cfgf)

        # CLI overrides the config file. "is not None" (instead of truthiness)
        # keeps explicit falsy values such as "--min-pause 0".
        for argsk, argsv in vars(self.args).items():
            if argsv is not None:
                cfg[argsk] = argsv
            else:
                cfg.setdefault(argsk, None)

        for key, fallback in self._DEFAULTS.items():
            if cfg.get(key) is None:
                cfg[key] = fallback

        missing = [key for key in mandatory if not cfg.get(key)]
        if missing:
            raise ValueError(
                f"Nicht alle Pflichtfelder gesetzt, es fehlen: {', '.join(missing)}"
            )
        return cfg

    def cliargs(self):
        """Define and parse the command-line arguments.

        Options without an explicit value are returned as ``None`` so that
        ``from_configfile`` can tell "not given" apart from a real value.
        """
        parser = argparse.ArgumentParser()

        # Config
        parser.add_argument('-c', '--config', help='Configfile', default=configfile)

        # Odoo connection parameters
        parser.add_argument('-l', '--url', dest='url', help='Odoo Server URL (z.B. http://localhost:8069)')
        parser.add_argument('-d', '--database', dest='database', help='Datenbankname')
        parser.add_argument('-u', '--username', dest='username', help='Benutzername')
        parser.add_argument('-p', '--password', dest='password', help='Passwort oder API-Key')
        parser.add_argument('-a', '--apikey', dest='apikey', help='Passwort oder API-Key')

        # Attendance parameters (fallback values live in _DEFAULTS)
        parser.add_argument('-D', '--date', dest='date', help='Datum im Format YYYY-MM-DD')
        parser.add_argument('-B', '--begin', dest='begin', help='Arbeitsbeginn')
        parser.add_argument('-X', '--duration', dest='duration', help='Gesamtdauer', type=float)
        parser.add_argument('-o', '--min-pause', dest='min_pause', help='Minimale Pause in Minuten', type=int)
        parser.add_argument('-O', '--max-pause', dest='max_pause', help='Maximale Pause in Minuten', type=int)

        return parser.parse_args()
|
|
|
|
|
|
def calc_timevalue(s: Union[float, int, str]):
    """Convert a time-of-day / duration value into ``(hours, minutes, seconds)``.

    Accepted inputs:
      * ``"H:MM"`` or ``"H:MM:SS"`` strings (extra ``:`` parts are ignored),
      * decimal hours as ``int``/``float`` (``8.5`` is 8 h 30 min),
      * decimal hours as a string, with ``.`` or ``,`` as the decimal mark.

    Decimal hours are rounded to two places; the minute part is truncated
    and the seconds are always 0 for decimal input.

    Returns:
        A tuple of three ``int`` values.

    Raises:
        ValueError: if a string part cannot be parsed as a number.
    """
    if isinstance(s, str):
        if ':' in s:
            parts = s.split(':')
            hour = int(parts[0])
            # A ':' is present, so there is always a second part.
            minutes = int(parts[1])
            seconds = int(parts[2]) if len(parts) > 2 else 0
            return hour, minutes, seconds
        s = float(s.replace(',', '.'))

    # Work in integer hundredths of an hour: avoids float artefacts such as
    # (8.1 - 8) * 60 == 5.999..., which a plain floor() turns into 5 minutes.
    centihours = round(round(float(s), 2) * 100)
    hour, fraction = divmod(centihours, 100)
    minutes = fraction * 60 // 100
    return hour, minutes, 0
|
|
|
|
|
|
class OdooAttendance:
    """Create randomized attendance records for the configured user.

    Talks to the server through ``<url>/json/2/<model>/<method>`` endpoints
    and reads its settings from the module-level ``config`` object, which
    must be initialised before this class is instantiated.
    """

    # Seconds to wait for the server; without a timeout requests blocks forever.
    REQUEST_TIMEOUT = 30

    def request(self, url, **kwargs):
        """POST ``kwargs`` as JSON body to the given API path and return the decoded reply.

        A ``context`` with ``lang='de_DE'`` is added unless the caller
        already provides one.

        Raises:
            ValueError: if the server answers with a non-success status code.
        """
        kwargs.setdefault('context', {}).setdefault('lang', 'de_DE')
        response = requests.post(
            f"{config.url.rstrip('/')}/json/2/{url.lstrip('/')}",
            headers={
                "X-Odoo-Database": config['database'],
                "Authorization": f"bearer {config['apikey']}",
            },
            json=kwargs,
            timeout=self.REQUEST_TIMEOUT,
        )
        if not response.ok:
            logging.error(response.content)
            raise ValueError(f"Response Code: {response.status_code}")
        return response.json()

    def __init__(self):
        """Resolve the API user and the single employee record linked to it.

        Raises:
            Exception: if no user id is returned, or if zero or more than
                one employee is linked to the user.
        """
        self.url = config.url.rstrip('/')
        self.db = config.database
        self.username = config.username
        self.password = config.password
        self.date = config.date
        self.apikey = config.apikey

        logging.debug(f"Verbinde zu: {self.url}")
        logging.debug(f"Datenbank: {self.db}")

        # .get() so that a reply without 'uid' reaches the explicit error below
        # instead of surfacing as a bare KeyError.
        self.uid = self.request('/res.users/context_get').get('uid')
        if not self.uid:
            raise Exception("Authentifizierung fehlgeschlagen")
        logging.info(f"Erfolgreich verbunden als User ID: {self.uid}")

        employees = self.request(
            '/hr.employee/search_read',
            domain=[['user_id', '=', self.uid]],
            fields=['id', 'name'],
        )

        if not employees:
            raise Exception(f"Kein Mitarbeiter gefunden für User: {self.uid}")

        if len(employees) > 1:
            logging.error("Mehrere Mitarbeiter gefunden:")
            for emp in employees:
                logging.error(f" - ID {emp['id']}: {emp['name']}")
            raise Exception("Bitte spezifischere Suchkriterien oder Employee ID verwenden")

        self.employee = employees[0]
        self.employee_id = self.employee['id']

    def check_existing(self, date=None):
        """Refuse to continue if attendance already exists on ``date``.

        Args:
            date: day as ``YYYY-MM-DD`` string; defaults to ``self.date``.

        Raises:
            ValueError: if at least one attendance record has its check-in
                on that day.
        """
        date = date or self.date
        logging.debug(f"Check exiting for {date}")

        # NOTE(review): the bounds are sent as naive strings; if the server
        # stores check_in in UTC the window is shifted by the local offset.
        existing = self.request(
            '/hr.attendance/search_read',
            domain=[
                ['employee_id', '=', self.employee_id],
                ['check_in', '>=', f'{date} 00:00:00'],
                ['check_in', '<=', f'{date} 23:59:59'],
            ],
            fields=['id', 'check_in', 'check_out'],
        )

        if existing:
            logging.error(f"Anwesenheit existiert bereits für {date}:")
            for att in existing:
                # "or" instead of a .get() default: the key may be present
                # with a falsy value for a still-open attendance.
                logging.error(
                    f" ID {att['id']}: Check-in: {att['check_in']}, "
                    f"Check-out: {att.get('check_out') or 'Offen'}")
            raise ValueError(f"Existing entries {existing}")

    def time_from_seconds(self, total_seconds):
        """Return ``total_seconds`` as a ``timedelta``."""
        return timedelta(seconds=total_seconds)

    def rand_seconds(self, minutes=5, seconds=60, plus=True, minus=True):
        """Return a random offset in seconds.

        Args:
            minutes: upper bound for the random minute part (max 10);
                a value <= 0 falls back to an upper bound of 5.
            seconds: upper bound for the random second part (max 60);
                a value <= 0 falls back to an upper bound of 60.
            plus: allow a positive offset.
            minus: allow a negative offset.

        Raises:
            ValueError: if both signs are disabled or a bound is too large.
        """
        pm = []
        if minus:
            pm.append(-1)
        if plus:
            pm.append(1)
        logging.debug(f"Factor: {pm}")
        if not pm:
            raise ValueError(f"Must have pluminus factors - is '{pm}'")
        if minutes > 10:
            raise ValueError(f"Minutes to substract/add must be <= 10 - is {minutes}'")
        if seconds > 60:
            raise ValueError(f"Seconds to substract/add must be <= 60 - is {seconds}'")

        factor = pm[randint(0, len(pm) - 1)]

        minutes = randint(0, minutes) if minutes > 0 else randint(0, 5)
        seconds = randint(0, seconds) if seconds > 0 else randint(0, 60)

        return factor * (minutes * 60 + seconds)

    def make_pause(self, minutes=None):
        """Return a pause length in seconds.

        Args:
            minutes: fixed pause length in minutes. When omitted, a random
                length between ``config.min_pause`` and ``config.max_pause``
                minutes is drawn.

        Up to 60 random seconds are added in both cases.
        """
        if minutes is None:
            minutes = randint(config.min_pause, config.max_pause)
        return 60 * minutes + randint(0, 60)

    def rand_duration(self):
        """Split the configured working time into work and pause segments.

        Returns:
            An odd-length list of ``timedelta`` objects alternating
            work, pause, work, ... :
              * duration <= 6 h: one work segment,
              * duration <= 9 h: work, pause, work,
              * otherwise: work, pause, work, 15-minute pause, work.
        """
        hours_secs = int(config.duration * 3600)
        rand_sec = self.rand_seconds()
        seconds = hours_secs + rand_sec
        logging.info(f"Duration: {hours_secs} - Rand sec: {rand_sec} = {seconds}")

        tfs = self.time_from_seconds

        if config.duration <= 6:
            return [tfs(seconds)]

        if config.duration <= 9:
            # First block is between 5:40 h and 6:00 h long.
            first_chunk = 5 * 3600 + 40 * 60 + randint(0, 20 * 60)
            if seconds <= first_chunk:
                # Just above 6 h the negative jitter can leave nothing for a
                # second block; a zero/negative segment would produce a
                # check_out before its check_in.
                return [tfs(seconds)]
            second_chunk = seconds - first_chunk
            pause = self.make_pause()
            return [tfs(first_chunk), tfs(pause), tfs(second_chunk)]

        first_chunk = 3 * 3600 + 40 * 60 + randint(0, 20 * 60)
        third_chunk = 3600 + 10 * 60 + randint(0, 20 * 60)
        second_chunk = seconds - first_chunk - third_chunk
        pause1 = self.make_pause()
        pause2 = self.make_pause(15)
        return [tfs(first_chunk), tfs(pause1), tfs(second_chunk), tfs(pause2), tfs(third_chunk)]

    def set_auto_attendance(self):
        """Create the attendance records for ``self.date``.

        The day starts at ``config.begin`` shifted by a random offset of up
        to +/- 15 minutes plus 0-59 seconds; work segments from
        ``rand_duration`` become attendance records, pause segments only
        advance the clock.

        Returns:
            The list of replies of the individual create calls.

        Raises:
            ValueError: if the configured begin hour is outside 6..12.
        """
        day = self.rand_duration()
        # Must start and end with a work segment.
        assert len(day) % 2, f"{day} ist nicht ungerade"

        hour, minute, second = calc_timevalue(config.begin)
        if int(hour) not in range(6, 13):
            raise ValueError("Beginn muss zwischen 6 und 12 Uhr sein")

        # timedelta arithmetic keeps the configured minutes/seconds and cannot
        # produce an out-of-range field such as second=60 for time().
        jitter = timedelta(
            minutes=(-1, 1)[randint(0, 1)] * randint(0, 15),
            seconds=randint(0, 59),
        )
        start_day = datetime.strptime(self.date, '%Y-%m-%d').date()
        begin = datetime.combine(start_day, time(int(hour), minute, second)) + jitter

        # NOTE(review): timestamps are sent as naive local wall-clock strings;
        # confirm whether the server expects UTC here.
        stamp = '%Y-%m-%d %H:%M:%S'
        chunks = []
        for index, part in enumerate(day):
            end = begin + part
            if index % 2:
                logging.debug(f"add pause {part}")
            else:
                chunks.append({
                    'check_in': begin.strftime(stamp),
                    'check_out': end.strftime(stamp),
                })
            begin = end

        # Create one attendance record per work segment.
        a_ids = []
        for chunk in chunks:
            logging.debug(f"Set attendance for {chunk}")
            attendance_id = self.request(
                '/hr.attendance/create',
                vals_list={
                    'employee_id': self.employee_id,
                    **chunk
                }
            )
            a_ids.append(attendance_id)

        return a_ids
|
|
|
|
|
|
def main():
    """Connect to Odoo, make sure the day is still empty and create the attendance.

    Returns:
        0 on success (used as the process exit code).

    Raises:
        NotImplementedError: if no duration is configured; booking separate
            check-in/check-out values is not implemented.
        Exception: any error from the Odoo calls is logged and re-raised.
    """
    try:
        odoo = OdooAttendance()

        logging.info(f"Mitarbeiter gefunden: {odoo.employee['name']} (ID: {odoo.employee['id']})")

        odoo.check_existing()

        if not config.duration:
            # NotImplemented is a comparison sentinel, not an exception class;
            # calling it raises an unrelated TypeError.
            raise NotImplementedError('Separates check_in und check_out kommt noch')

        attendance_ids = odoo.set_auto_attendance()

        if attendance_ids:
            logging.info(f"Anwesenheit erfolgreich erstellt (ID: {attendance_ids})")
            logging.info(f" Mitarbeiter: {odoo.employee['name']}")
            logging.info(f" Datum: {odoo.date}")
            logging.info(f" Dauer: {config.duration}")

    except Exception as e:
        logging.error(f"{e}")
        raise

    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # The global config must exist before main() runs: OdooAttendance and
    # main() read it as a module-level name.
    config = Config()
    # sys.exit instead of exit(): the latter is injected by the site module
    # and is not guaranteed to be available (e.g. with "python -S").
    sys.exit(main())
|