diff --git a/data/templates/GF_FO_Avis_de_retenue.pdf b/data/templates/GF_FO_Avis_de_retenue.pdf new file mode 100644 index 0000000..ec3a122 Binary files /dev/null and b/data/templates/GF_FO_Avis_de_retenue.pdf differ diff --git a/eptm_dashboard/eptm_dashboard.py b/eptm_dashboard/eptm_dashboard.py index 3499a0b..bfc5109 100644 --- a/eptm_dashboard/eptm_dashboard.py +++ b/eptm_dashboard/eptm_dashboard.py @@ -13,6 +13,7 @@ from .pages.purge import purge_page, PurgeState from .pages.doc import doc_page, DocState from .pages.profile import profile_page, ProfileState from .pages.password_set import password_set_page, PasswordSetState +from .pages.retenue import retenue_page, RetenueState TITLE = "EPTM Dashboard" @@ -58,5 +59,6 @@ app.add_page(params_page, route="/params", on_load=[AuthState.check_auth, app.add_page(purge_page, route="/purge", on_load=[AuthState.check_auth, PurgeState.load_data], title=TITLE) app.add_page(doc_page, route="/doc", on_load=[AuthState.check_auth, DocState.load_data], title=TITLE) app.add_page(profile_page, route="/profile", on_load=[AuthState.check_auth, ProfileState.load_data], title=TITLE) +app.add_page(retenue_page, route="/retenue", on_load=[AuthState.check_auth, RetenueState.load_data], title=TITLE) # Page publique (pas de check_auth — accessible via lien email) app.add_page(password_set_page, route="/password-set", on_load=PasswordSetState.load_data, title=TITLE) diff --git a/eptm_dashboard/pages/cron.py b/eptm_dashboard/pages/cron.py index 3332b25..3e0e3a5 100644 --- a/eptm_dashboard/pages/cron.py +++ b/eptm_dashboard/pages/cron.py @@ -83,7 +83,8 @@ class CronState(AuthState): "schedule_desc": desc, "task_kind": job.task_kind, "task_label": {"push": "Push", "sync": "Sync", - "push_then_sync": "Push + Sync"}.get(job.task_kind, job.task_kind), + "push_then_sync": "Push + Sync", + "push_notices": "Push notices"}.get(job.task_kind, job.task_kind), "last_run_at": job.last_run_at.strftime("%d.%m.%Y %H:%M") if job.last_run_at else "", "last_status": job.last_status, "last_message": job.last_message[:120] if job.last_message else "", @@ -648,13 +649,13 @@ def _form_task_picker() -> rx.Component: return rx.vstack( rx.text("Tâche", size="2", font_weight="600"), rx.radio( - ["push", "sync", "push_then_sync"], + ["push", "sync", "push_then_sync", "push_notices"], value=CronState.f_task_kind, on_change=CronState.set_f_task_kind, direction="column", ), rx.cond( - CronState.f_task_kind != "push", + (CronState.f_task_kind != "push") & (CronState.f_task_kind != "push_notices"), rx.vstack( rx.text("Données à synchroniser", size="2", font_weight="600", margin_top="0.5rem"), diff --git a/eptm_dashboard/pages/escada.py b/eptm_dashboard/pages/escada.py index 98e184b..f496c5b 100644 --- a/eptm_dashboard/pages/escada.py +++ b/eptm_dashboard/pages/escada.py @@ -21,7 +21,7 @@ def _background(fn): from ..state import AuthState from ..sidebar import layout -from src.db import get_session, Apprenti, EscadaPending +from src.db import get_session, Apprenti, EscadaPending, Notice from src.logger import app_log _RE_SYNC_PROD = re.compile(r"^\[\d{2}:\d{2}:\d{2}\] ([^ ].*)$") @@ -45,6 +45,7 @@ DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data"))) CLASSES_CACHE = DATA_DIR / "esacada_classes.json" _SYNC_SCRIPT = _ROOT / "scripts" / "sync_esacada.py" _PUSH_SCRIPT = _ROOT / "scripts" / "push_to_escada.py" +_PUSH_NOTICES_SCRIPT = _ROOT / "scripts" / "push_notices.py" _SYNC_RESULT_FILE = DATA_DIR / "sync_last_result.json" _SYNC_ALL_DONE_FILE = DATA_DIR / "sync_all_done.json" @@ -77,11 +78,19 @@ class EscadaState(AuthState): pending_count: int = 0 pending_data: list[dict] = [] + notices_count: int = 0 + notices_data: list[dict] = [] push_done: bool = False push_ok: int = 0 push_errors: list[str] = [] + # Push notices + is_pushing_notices: bool = False + notices_push_ok: int = 0 + notices_push_done: bool = False + notices_push_errors: list[str] = [] + @rx.var def selected_count(self) -> int: return sum(1 for v in self.class_checked.values() if v) @@ -221,9 +230,32 @@ class EscadaState(AuthState): } for ep in pending ] + self._reload_notices(sess) finally: sess.close() + def _reload_notices(self, sess): + notices = sess.execute( + select(Notice) + .options(joinedload(Notice.apprenti)) + .join(Apprenti, Notice.apprenti_id == Apprenti.id) + .where(Notice.status == "pending") + .order_by(Apprenti.classe, Notice.date_event, Apprenti.nom) + ).scalars().all() + self.notices_count = len(notices) + self.notices_data = [ + { + "id": n.id, + "classe": n.apprenti.classe, + "nom": n.apprenti.nom, + "prenom": n.apprenti.prenom, + "date": n.date_event.strftime("%d.%m.%Y"), + "titre": (n.titre or "")[:80] + ("…" if len(n.titre or "") > 80 else ""), + "source": n.source, + } + for n in notices + ] + # ── Background: refresh classes ──────────────────────────────────────────── @_background @@ -767,6 +799,121 @@ class EscadaState(AuthState): except Exception: pass + # ── Background: push notices vers Escada ────────────────────────────────── + + @_background + async def push_notices(self): + async with self: + user = self.username or "?" + self.is_pushing_notices = True + self.notices_push_done = False + self.notices_push_ok = 0 + self.notices_push_errors = [] + + app_log(f"Push notices Escada démarré par {user}") + cmd = [sys.executable, str(_PUSH_NOTICES_SCRIPT)] + lines: list[str] = [] + _rc_holder = [0] + + def _run() -> None: + _fd, _tmp = tempfile.mkstemp(suffix="_push_notices.log") + os.close(_fd) + try: + with open(_tmp, "wb") as _fout: + _proc = subprocess.Popen( + cmd, stdout=_fout, stderr=subprocess.STDOUT, + env={**os.environ, "PYTHONUNBUFFERED": "1"}, + start_new_session=True, + ) + _offset, _buf = 0, b"" + while True: + _time.sleep(0.5) + try: + with open(_tmp, "rb") as _fin: + _fin.seek(_offset); _chunk = _fin.read(65536) + except Exception: + _chunk = b"" + if _chunk: + _buf += _chunk; _offset += len(_chunk) + while b"\n" in _buf: + _raw, _buf = _buf.split(b"\n", 1) + _ln = _raw.decode("utf-8", errors="replace").rstrip() + if _ln: + lines.append(_ln); _log_sync_line(_ln, prefix="push_notices") + if _proc.poll() is not None: + _rc_holder[0] = _proc.wait() or 0 + break + except Exception as _exc: + app_log(f"Erreur push notices subprocess : {_exc}") + finally: + try: os.unlink(_tmp) + except Exception: pass + + _pool = _cf.ThreadPoolExecutor(max_workers=1) + _fut = _pool.submit(_run) + try: + while not _fut.done(): + try: + await asyncio.sleep(1.0) + except asyncio.CancelledError: + _t = asyncio.current_task() + if _t is not None: + for _ in range(_t.cancelling()): + _t.uncancel() + try: + _fut.result() + except Exception as _te: + app_log(f"[push_notices] thread exception : {_te}") + finally: + _pool.shutdown(wait=False) + + _rc = _rc_holder[0] + nb_ok = 0 + errors: list[str] = [] + done = False + for line in lines: + if "PUSH_NOTICES_DONE " in line: + done = True + try: + p = json.loads(line[line.index("PUSH_NOTICES_DONE ") + len("PUSH_NOTICES_DONE "):]) + nb_ok = p.get("ok", 0) + errors = p.get("err", []) + except Exception as _e: + app_log(f" Erreur parse PUSH_NOTICES_DONE : {_e}", debug=True) + + if done: + app_log(f"Push notices terminé — ok:{nb_ok} erreurs:{len(errors)}") + else: + app_log(f"Push notices : PUSH_NOTICES_DONE non trouvé (code={_rc})") + + try: + _t = asyncio.current_task() + if _t is not None: + for _ in range(_t.cancelling()): + _t.uncancel() + async with self: + self.notices_push_done = done + self.notices_push_ok = nb_ok + self.notices_push_errors = errors + self.is_pushing_notices = False + self._reload_pending() + if done: + if errors: + yield rx.toast.warning( + f"Push notices : {nb_ok} OK, {len(errors)} erreur(s)" + ) + else: + yield rx.toast.success(f"Push notices terminé — {nb_ok} envoyée(s)") + else: + yield rx.toast.error("Push notices échoué — vérifiez les logs") + except Exception as _e: + app_log(f"Erreur mise à jour état push notices : {_e}") + try: + async with self: + self.is_pushing_notices = False + except Exception: + pass + # ── UI helpers ──────────────────────────────────────────────────────────────── @@ -932,6 +1079,18 @@ def _pending_row(item) -> rx.Component: ) +def _notice_row(item) -> rx.Component: + return rx.table.row( + rx.table.cell(item["classe"]), + rx.table.cell(rx.text(item["nom"], " ", item["prenom"])), + rx.table.cell(item["date"]), + rx.table.cell(rx.text(item["titre"], size="1")), + rx.table.cell( + rx.badge(item["source"], color_scheme="blue", variant="soft", size="1"), + ), + ) + + def _sync_progress() -> rx.Component: """Indicateurs de progression — remplace l'ancien op_log dans la section sync.""" return rx.vstack( @@ -1361,6 +1520,103 @@ def escada_page() -> rx.Component: width="100%", ), + # ── Section notices ─────────────────────────────────────────────── + rx.box( + rx.text( + "Notices en attente", + size="3", font_weight="700", color="#37474f", + margin_bottom="0.75rem", + ), + rx.cond( + EscadaState.notices_count == 0, + rx.text("Aucune notice en attente.", size="2", color="#666"), + rx.vstack( + rx.text( + EscadaState.notices_count, + " notice(s) en attente d'envoi vers Escada.", + size="2", color="#e65100", font_weight="600", + ), + rx.box( + rx.table.root( + rx.table.header( + rx.table.row( + rx.table.column_header_cell("Classe"), + rx.table.column_header_cell("Apprenti"), + rx.table.column_header_cell("Date"), + rx.table.column_header_cell("Titre"), + rx.table.column_header_cell("Source"), + ) + ), + rx.table.body( + rx.foreach(EscadaState.notices_data, _notice_row), + ), + width="100%", + size="1", + ), + overflow_x="auto", + width="100%", + ), + spacing="2", + width="100%", + margin_bottom="0.75rem", + ), + ), + rx.flex( + rx.button( + rx.cond( + EscadaState.is_pushing_notices, + rx.spinner(size="2"), + rx.icon("send", size=14), + ), + rx.cond( + EscadaState.is_pushing_notices, + rx.text("Envoi en cours..."), + rx.text("Pousser les notices"), + ), + on_click=EscadaState.push_notices, + disabled=( + EscadaState.is_pushing_notices + | (EscadaState.notices_count == 0) + ), + color_scheme="blue", + size="2", + ), + gap="1rem", align="center", flex_wrap="wrap", + margin_top="0.75rem", + ), + rx.cond( + EscadaState.notices_push_done, + rx.vstack( + rx.cond( + EscadaState.notices_push_ok > 0, + rx.text( + EscadaState.notices_push_ok, + " notice(s) envoyée(s).", + size="2", color="#2e7d32", font_weight="600", + ), + ), + rx.cond( + EscadaState.notices_push_errors.length() > 0, + rx.vstack( + rx.foreach( + EscadaState.notices_push_errors, + lambda e: rx.text("• ", e, size="2", color="#c62828"), + ), + spacing="1", + ), + ), + spacing="2", + margin_top="0.75rem", + width="100%", + ), + ), + padding="1.25rem", + background_color="white", + border_radius="8px", + border="1px solid #e0e0e0", + width="100%", + ), + spacing="4", width="100%", ) diff --git a/eptm_dashboard/pages/params.py b/eptm_dashboard/pages/params.py index 2ea7c78..5d6c9f8 100644 --- a/eptm_dashboard/pages/params.py +++ b/eptm_dashboard/pages/params.py @@ -1,9 +1,17 @@ import json import os +import sys from pathlib import Path import reflex as rx +_ROOT = Path(__file__).resolve().parent.parent.parent +if str(_ROOT) not in sys.path: + sys.path.insert(0, str(_ROOT)) + +from src.profession import load_mapping, save_mapping, find_unmapped_classes, refresh_all_professions # noqa: E402 +from src.db import get_session # noqa: E402 + from ..sidebar import layout from ..state import AuthState @@ -67,6 +75,14 @@ class ParamsState(AuthState): app_base_url: str = "" save_ok_app: bool = False + # ── Profession mapping ──────────────────────────────────────────────────── + prof_mapping: list[dict] = [] + prof_unmapped: list[str] = [] + new_prefix: str = "" + new_profession: str = "" + save_ok_prof: bool = False + refresh_msg: str = "" + # ── Setters ─────────────────────────────────────────────────────────────── def set_texte_sanction(self, v: str): self.texte_sanction = v def set_chef_section(self, v: str): self.chef_section = v @@ -81,6 +97,8 @@ class ParamsState(AuthState): def set_email_subject(self, v: str): self.email_subject = v def set_email_body(self, v: str): self.email_body = v def set_app_base_url(self, v: str): self.app_base_url = v + def set_new_prefix(self, v: str): self.new_prefix = v + def set_new_profession(self, v: str): self.new_profession = v def load_data(self): if not self.authenticated: @@ -104,6 +122,17 @@ class ParamsState(AuthState): self.save_ok_escada = False self.save_ok_template = False self.save_ok_app = False + self._reload_prof_mapping() + + def _reload_prof_mapping(self): + self.prof_mapping = load_mapping() + sess = get_session() + try: + self.prof_unmapped = find_unmapped_classes(sess) + finally: + sess.close() + self.save_ok_prof = False + self.refresh_msg = "" def save_sanctions(self): s = _read_settings() @@ -164,6 +193,46 @@ class ParamsState(AuthState): self.save_ok_escada = False self.save_ok_template = False + # ── Profession mapping ─────────────────────────────────────────────────── + def add_mapping(self): + prefix = self.new_prefix.strip() + prof = self.new_profession.strip() + if not prefix or not prof: + return + cur = list(self.prof_mapping) + # Si le préfixe existe déjà, on met juste à jour la profession + for m in cur: + if m.get("prefix") == prefix: + m["profession"] = prof + break + else: + cur.append({"prefix": prefix, "profession": prof}) + save_mapping(cur) + self.new_prefix = "" + self.new_profession = "" + self._reload_prof_mapping() + self.save_ok_prof = True + + def remove_mapping(self, prefix: str): + cur = [m for m in self.prof_mapping if m.get("prefix") != prefix] + save_mapping(cur) + self._reload_prof_mapping() + self.save_ok_prof = True + + def quick_add_prefix(self, prefix: str): + """Pré-remplit le formulaire avec une classe orpheline.""" + self.new_prefix = prefix + self.new_profession = "" + + def apply_mapping_to_db(self): + """Recalcule la profession pour tous les apprentis avec le mapping actuel.""" + sess = get_session() + try: + n = refresh_all_professions(sess) + finally: + sess.close() + self.refresh_msg = f"{n} fiche(s) mise(s) à jour." + # ── UI helpers ──────────────────────────────────────────────────────────────── @@ -445,6 +514,127 @@ def _section_template() -> rx.Component: ) +def _mapping_row(m: rx.Var) -> rx.Component: + return rx.flex( + rx.box( + rx.text("Préfixe", size="1", color="var(--gray-10)"), + rx.text(m["prefix"], size="2", weight="medium"), + flex="1", min_width="120px", + ), + rx.box( + rx.text("Profession", size="1", color="var(--gray-10)"), + rx.text(m["profession"], size="2"), + flex="2", min_width="200px", + ), + rx.button( + rx.icon("trash-2", size=14), + on_click=ParamsState.remove_mapping(m["prefix"]), + color_scheme="red", variant="ghost", size="1", + ), + gap="0.75rem", align="center", flex_wrap="wrap", + padding="0.4rem 0.6rem", + border="1px solid var(--gray-5)", + border_radius="6px", + background_color="white", + width="100%", + ) + + +def _unmapped_chip(classe: rx.Var) -> rx.Component: + return rx.button( + rx.icon("plus", size=12), + classe, + on_click=ParamsState.quick_add_prefix(classe), + color_scheme="amber", variant="soft", size="1", + ) + + +def _section_profession() -> rx.Component: + return _section( + "Correspondances classe → profession", + rx.text( + "Lors de l'import des données apprentis, la profession est dérivée " + "du préfixe de la classe (ex. classe « AUTOMAT 1 » → profession " + "« Automaticien CFC »). Utilisée notamment dans les avis de retenue.", + size="1", color="var(--gray-11)", + ), + # Tableau des correspondances + rx.cond( + ParamsState.prof_mapping.length() > 0, + rx.vstack( + rx.foreach(ParamsState.prof_mapping, _mapping_row), + spacing="2", width="100%", + ), + rx.text("Aucune correspondance configurée.", size="2", color="var(--gray-10)"), + ), + # Classes orphelines + rx.cond( + ParamsState.prof_unmapped.length() > 0, + rx.box( + rx.text( + "Classes sans correspondance (clique pour ajouter) :", + size="2", weight="medium", color="#92400e", margin_bottom="0.4rem", + ), + rx.flex( + rx.foreach(ParamsState.prof_unmapped, _unmapped_chip), + gap="0.35rem", flex_wrap="wrap", + ), + padding="0.75rem", + background_color="#fef3c7", + border="1px solid #fcd34d", + border_radius="6px", + width="100%", + ), + rx.fragment(), + ), + # Ajout d'une nouvelle correspondance + rx.divider(), + rx.text("Ajouter / modifier une correspondance", size="2", weight="medium"), + rx.flex( + _field( + "Préfixe de classe", + rx.input( + value=ParamsState.new_prefix, + on_change=ParamsState.set_new_prefix, + placeholder="ex. AUTOMAT", + width="100%", + ), + ), + _field( + "Profession", + rx.input( + value=ParamsState.new_profession, + on_change=ParamsState.set_new_profession, + placeholder="ex. Automaticien CFC", + width="100%", + ), + ), + gap="0.75rem", flex_wrap="wrap", width="100%", + ), + rx.flex( + rx.button( + rx.icon("plus", size=16), + "Ajouter / mettre à jour", + on_click=ParamsState.add_mapping, + color_scheme="blue", size="2", + ), + rx.button( + rx.icon("refresh-cw", size=14), + "Appliquer aux fiches existantes", + on_click=ParamsState.apply_mapping_to_db, + color_scheme="gray", variant="soft", size="2", + ), + _save_ok_callout(ParamsState.save_ok_prof), + rx.cond( + ParamsState.refresh_msg != "", + rx.text(ParamsState.refresh_msg, size="1", color="#15803d"), + rx.fragment(), + ), + gap="0.5rem", align="center", flex_wrap="wrap", + ), + ) + + def _section_app() -> rx.Component: return _section( "Application", @@ -482,6 +672,7 @@ def params_page() -> rx.Component: rx.vstack( rx.heading("Paramètres", size="7"), _section_app(), + _section_profession(), _section_sanction(), _section_smtp(), _section_escada(), diff --git a/eptm_dashboard/pages/retenue.py b/eptm_dashboard/pages/retenue.py new file mode 100644 index 0000000..7adfc65 --- /dev/null +++ b/eptm_dashboard/pages/retenue.py @@ -0,0 +1,778 @@ +"""Page /retenue — génération et envoi d'avis de retenue.""" + +from __future__ import annotations + +import json +import os +import sys +from datetime import date as _date +from pathlib import Path +from typing import Optional + +import reflex as rx +from sqlalchemy import select + +_ROOT = Path(__file__).resolve().parent.parent.parent +if str(_ROOT) not in sys.path: + sys.path.insert(0, str(_ROOT)) + +from src.db import get_session, Apprenti, ApprentiFiche, NotesExamen, Notice # noqa: E402 +from src.user_access import get_allowed_classes, is_class_allowed # noqa: E402 +from src.profession import resolve_profession # noqa: E402 +from src.retenue_pdf import generate_retenue_pdf # noqa: E402 +from src.email_sender import send_email # noqa: E402 +from src.logger import app_log # noqa: E402 + +from ..state import AuthState +from ..sidebar import layout +from ..components import empty_state + +DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data"))) +_SETTINGS_FILE = DATA_DIR / "settings.json" + + +def _load_settings() -> dict: + if _SETTINGS_FILE.exists(): + try: + return json.loads(_SETTINGS_FILE.read_text(encoding="utf-8")) + except Exception: + return {} + return {} + + +# ── State ───────────────────────────────────────────────────────────────────── + + +class RetenueState(AuthState): + # Sélecteur apprenti + apprenti_labels: list[str] = [] + apprenti_ids: list[int] = [] + selected_label: str = "" + selected_id: int = 0 + has_apprentis: bool = False + apprenti_search: str = "" + apprenti_select_open: bool = False + + # Données de l'apprenti sélectionné + sel_classe: str = "" + sel_profession: str = "" + sel_fiche_email_appr: str = "" + sel_fiche_email_form: str = "" + sel_fiche_email_entr: str = "" + sel_fiche_nom_entr: str = "" + + # Cache des branches (récupérées des notes d'examen) + branches_cache: list[str] = [] + branche_search: str = "" + branche_open: bool = False + + # Formulaire + retenue_date: str = "" # ISO date "YYYY-MM-DD" + probleme_date: str = "" + case: str = "devoir" # "devoir" | "comportement" | "retard" + branche: str = "" + remarque: str = "" + + # Email + email_dest: str = "apprenti" + email_custom: str = "" + + # Option : créer une notice Escada à la génération + add_notice: bool = False + + # États + form_error: str = "" + + @rx.var + def filtered_apprenti_labels(self) -> list[str]: + q = self.apprenti_search.lower().strip() + if not q: + return self.apprenti_labels + return [l for l in self.apprenti_labels if q in l.lower()] + + @rx.var + def filtered_branches(self) -> list[str]: + q = self.branche_search.lower().strip() + if not q: + return self.branches_cache + return [b for b in self.branches_cache if q in b.lower()] + + # ── Setters ────────────────────────────────────────────────────────────── + def set_apprenti_search(self, v: str): self.apprenti_search = v + def set_apprenti_select_open(self, v: bool): + self.apprenti_select_open = v + if not v: + self.apprenti_search = "" + def set_branche_search(self, v: str): self.branche_search = v + def set_branche_open(self, v: bool): + self.branche_open = v + if not v: + self.branche_search = "" + def set_retenue_date(self, v: str): self.retenue_date = v + def set_probleme_date(self, v: str): self.probleme_date = v + def set_case(self, v: str): self.case = v + def set_branche(self, v: str): self.branche = v + def set_remarque(self, v: str): self.remarque = v + def set_profession(self, v: str): self.sel_profession = v + def set_email_dest(self, v: str): self.email_dest = v + def set_email_custom(self, v: str): self.email_custom = v + def set_add_notice(self, v: bool): self.add_notice = v + + def load_data(self): + if not self.authenticated: + return rx.redirect("/login") + sess = get_session() + try: + allowed = get_allowed_classes(self.username) + q = select(Apprenti).order_by(Apprenti.nom, Apprenti.prenom) + if allowed is not None: + q = q.where(Apprenti.classe.in_(allowed)) + apprentis = sess.execute(q).scalars().all() + if not apprentis: + self.has_apprentis = False + self.apprenti_labels = [] + self.apprenti_ids = [] + return + self.has_apprentis = True + self.apprenti_labels = [ + f"{a.nom} {a.prenom} ({a.classe})" for a in apprentis + ] + self.apprenti_ids = [a.id for a in apprentis] + # Toujours partir d'une sélection vide à l'arrivée sur la page + self.selected_id = 0 + self.selected_label = "" + self.sel_classe = "" + self.sel_profession = "" + self.sel_fiche_email_appr = "" + self.sel_fiche_email_form = "" + self.sel_fiche_email_entr = "" + self.sel_fiche_nom_entr = "" + self._load_branches(sess) + finally: + sess.close() + # Dates par défaut = aujourd'hui + today = _date.today().isoformat() + if not self.retenue_date: + self.retenue_date = today + if not self.probleme_date: + self.probleme_date = today + + def _load_apprenti(self): + if not self.selected_id: + return + sess = get_session() + try: + ap = sess.get(Apprenti, self.selected_id) + if not ap: + return + self.sel_classe = ap.classe + fiche = ap.fiche + if fiche: + self.sel_profession = fiche.profession or resolve_profession(ap.classe) + self.sel_fiche_email_appr = fiche.email or "" + self.sel_fiche_email_form = fiche.formateur_email or "" + self.sel_fiche_email_entr = fiche.entreprise_email or "" + self.sel_fiche_nom_entr = fiche.entreprise_nom or "" + else: + self.sel_profession = resolve_profession(ap.classe) + self.sel_fiche_email_appr = "" + self.sel_fiche_email_form = "" + self.sel_fiche_email_entr = "" + self.sel_fiche_nom_entr = "" + finally: + sess.close() + + def _load_branches(self, sess): + """Construit le cache des branches uniques depuis NotesExamen.""" + rows = sess.execute(select(NotesExamen.donnees_json)).scalars().all() + seen: set[str] = set() + for raw in rows: + try: + d = json.loads(raw) + except Exception: + continue + if isinstance(d, list): + for br in d: + name = (br.get("branche") or "").strip() + if name: + seen.add(name) + self.branches_cache = sorted(seen) + + def handle_select_apprenti(self, label: str): + self.selected_label = label + try: + idx = self.apprenti_labels.index(label) + self.selected_id = self.apprenti_ids[idx] + except ValueError: + pass + self.apprenti_select_open = False + self.apprenti_search = "" + self._load_apprenti() + + def apprenti_search_keydown(self, key: str): + if key == "Enter": + results = self.filtered_apprenti_labels + if results: + return RetenueState.handle_select_apprenti(results[0]) + elif key == "Escape": + self.apprenti_select_open = False + self.apprenti_search = "" + + def select_branche(self, b: str): + self.branche = b + self.branche_open = False + self.branche_search = "" + + def branche_keydown(self, key: str): + if key == "Enter": + # Si une seule branche filtrée : la sélectionne. Sinon prend la saisie libre. + results = self.filtered_branches + if len(results) == 1: + return RetenueState.select_branche(results[0]) + elif self.branche_search: + self.branche = self.branche_search.strip() + self.branche_open = False + self.branche_search = "" + elif key == "Escape": + self.branche_open = False + self.branche_search = "" + + # ── Actions ────────────────────────────────────────────────────────────── + + _CASE_LABELS = { + "devoir": "N'a pas remis ses tâches scolaires dans les délais", + "comportement": "A manifesté un comportement répréhensible", + "retard": "Est arrivé en retard aux cours", + } + + def _build_notice_titre(self) -> str: + label = self._CASE_LABELS.get(self.case, "") + if self.case == "devoir" and self.branche.strip(): + return f"{label} en {self.branche.strip()}" + return label + + def _create_notice_if_requested(self): + """Crée une Notice en DB si la checkbox add_notice est cochée.""" + if not self.add_notice or not self.selected_id: + return + sess = get_session() + try: + sess.add(Notice( + apprenti_id=self.selected_id, + date_event=_date.today(), + titre=self._build_notice_titre(), + remarque=(self.remarque or "").strip() or None, + type_notice=None, + matiere=None, + source="retenue", + status="pending", + created_by=self.username or None, + )) + sess.commit() + app_log( + f"[notice] {self.username or '?'} : création (retenue) pour " + f"{self.selected_label} — case={self.case}" + ) + except Exception as e: + sess.rollback() + app_log(f"[notice] échec création : {e}") + finally: + sess.close() + + def _build_pdf(self) -> Optional[bytes]: + if not self.selected_id: + self.form_error = "Aucun apprenti sélectionné." + return None + if not is_class_allowed(self.username, self.sel_classe): + self.form_error = "Accès refusé pour cette classe." + return None + if self.case == "devoir" and not self.branche.strip(): + self.form_error = "Veuillez préciser la branche." + return None + try: + r_date = _date.fromisoformat(self.retenue_date) + p_date = _date.fromisoformat(self.probleme_date) + except Exception: + self.form_error = "Date invalide." + return None + self.form_error = "" + sess = get_session() + try: + return generate_retenue_pdf( + sess, self.selected_id, + profession=self.sel_profession, + retenue_date=r_date, + probleme_date=p_date, + case=self.case, + branche=self.branche.strip(), + remarque=self.remarque, + prof_name=self.name or self.username, + ) + finally: + sess.close() + + def _filename(self) -> str: + sess = get_session() + try: + ap = sess.get(Apprenti, self.selected_id) + if not ap: + return "Avis_retenue.pdf" + safe_nom = "".join(c if c.isalnum() else "_" for c in ap.nom) + safe_prenom = "".join(c if c.isalnum() else "_" for c in ap.prenom) + return f"Avis_retenue_{safe_nom}_{safe_prenom}.pdf" + finally: + sess.close() + + def download_pdf(self): + data = self._build_pdf() + if data is None: + if self.form_error: + return rx.toast.error(self.form_error) + return rx.toast.error("Impossible de générer le PDF.") + app_log( + f"[retenue] {self.username or '?'} : avis téléchargé pour " + f"{self.selected_label} (case={self.case})" + ) + self._create_notice_if_requested() + return rx.download(data=data, filename=self._filename()) + + def send_email_action(self): + data = self._build_pdf() + if data is None: + if self.form_error: + return rx.toast.error(self.form_error) + return rx.toast.error("Impossible de générer le PDF.") + + # Destinataire + if self.email_dest == "apprenti": + to = self.sel_fiche_email_appr + elif self.email_dest == "formateur": + to = self.sel_fiche_email_form + else: + to = self.email_custom.strip() + if not to or "@" not in to: + return rx.toast.error("Adresse email invalide ou manquante.") + + s = _load_settings() + smtp_host = s.get("smtp_host") + smtp_port = int(s.get("smtp_port") or 587) + smtp_login = s.get("smtp_login") + smtp_password = s.get("smtp_password") + smtp_sender = s.get("smtp_sender") + if not (smtp_host and smtp_login and smtp_password and smtp_sender): + return rx.toast.error("Configuration SMTP incomplète (Paramètres).") + + subject = f"Avis de retenue — {self.selected_label}" + body = ( + f"Bonjour,\n\nVeuillez trouver en pièce jointe l'avis de retenue concernant " + f"{self.selected_label}.\n\nCordialement,\n{self.name or self.username}\n" + ) + try: + send_email( + smtp_host=smtp_host, smtp_port=smtp_port, + smtp_login=smtp_login, smtp_password=smtp_password, + smtp_sender=smtp_sender, + to_email=to, subject=subject, body=body, + attachments=[(data, self._filename())], + ) + except Exception as e: + return rx.toast.error(f"Échec d'envoi : {e}") + app_log( + f"[retenue] {self.username or '?'} : avis envoyé à {to} pour " + f"{self.selected_label}" + ) + self._create_notice_if_requested() + return rx.toast.success(f"Avis envoyé à {to}") + + +# ── UI ──────────────────────────────────────────────────────────────────────── + +def _apprenti_option(label: rx.Var) -> rx.Component: + return rx.box( + rx.text(label, size="2"), + padding="0.45rem 0.75rem", + cursor="pointer", + on_click=RetenueState.handle_select_apprenti(label), + _hover={"background_color": "var(--gray-3)"}, + width="100%", + ) + + +def _apprenti_selector() -> rx.Component: + return rx.popover.root( + rx.popover.trigger( + rx.box( + rx.flex( + rx.cond( + RetenueState.selected_label != "", + rx.text(RetenueState.selected_label, size="2"), + rx.text("Sélectionner un apprenti…", size="2", color="var(--gray-9)"), + ), + rx.spacer(), + rx.icon("chevron-down", size=18, color="var(--gray-9)"), + align="center", + width="100%", + ), + padding="0.5rem 0.75rem", + border="1px solid var(--gray-7)", + border_radius="6px", + background_color="white", + cursor="pointer", + width="100%", + custom_attrs={"data-shortcut": "apprenti-search"}, + ), + ), + rx.popover.content( + rx.vstack( + rx.input( + placeholder="Rechercher un apprenti…", + value=RetenueState.apprenti_search, + on_change=RetenueState.set_apprenti_search, + on_key_down=RetenueState.apprenti_search_keydown, + size="2", + width="100%", + auto_focus=True, + ), + rx.cond( + RetenueState.filtered_apprenti_labels.length() > 0, + rx.box( + rx.foreach(RetenueState.filtered_apprenti_labels, _apprenti_option), + max_height="280px", + overflow_y="auto", + width="100%", + ), + rx.box( + rx.text("Aucun résultat", size="2", color="var(--gray-9)"), + padding="0.5rem 0.75rem", + ), + ), + spacing="2", + width="100%", + ), + min_width="320px", + max_width="500px", + padding="0.5rem", + ), + open=RetenueState.apprenti_select_open, + on_open_change=RetenueState.set_apprenti_select_open, + ) + + +def _branche_option(b: rx.Var) -> rx.Component: + return rx.box( + rx.text(b, size="2"), + padding="0.45rem 0.75rem", + cursor="pointer", + on_click=RetenueState.select_branche(b), + _hover={"background_color": "var(--gray-3)"}, + width="100%", + ) + + +def _branche_selector() -> rx.Component: + return rx.popover.root( + rx.popover.trigger( + rx.box( + rx.flex( + rx.cond( + RetenueState.branche != "", + rx.text(RetenueState.branche, size="2"), + rx.text("Choisir / taper une branche…", size="2", color="var(--gray-9)"), + ), + rx.spacer(), + rx.icon("chevron-down", size=18, color="var(--gray-9)"), + align="center", + width="100%", + ), + padding="0.5rem 0.75rem", + border="1px solid var(--gray-7)", + border_radius="6px", + background_color="white", + cursor="pointer", + width="100%", + ), + ), + rx.popover.content( + rx.vstack( + rx.input( + placeholder="Rechercher ou saisir une branche libre…", + value=RetenueState.branche_search, + on_change=RetenueState.set_branche_search, + on_key_down=RetenueState.branche_keydown, + size="2", + width="100%", + auto_focus=True, + ), + rx.cond( + RetenueState.filtered_branches.length() > 0, + rx.box( + rx.foreach(RetenueState.filtered_branches, _branche_option), + max_height="280px", + overflow_y="auto", + width="100%", + ), + rx.text( + "Appuyez sur Entrée pour valider votre saisie libre.", + size="1", color="var(--gray-9)", + padding="0.5rem 0.75rem", + ), + ), + spacing="2", + width="100%", + ), + min_width="320px", + max_width="500px", + padding="0.5rem", + ), + open=RetenueState.branche_open, + on_open_change=RetenueState.set_branche_open, + ) + + +def _profession_warning() -> rx.Component: + # Affiché uniquement si un apprenti est sélectionné ET que sa profession est vide + return rx.cond( + (RetenueState.selected_id != 0) & (RetenueState.sel_profession == ""), + rx.callout.root( + rx.callout.icon(rx.icon("triangle-alert", size=16)), + rx.callout.text( + "Profession non définie pour ", + RetenueState.sel_classe, + ". Renseigne-la ci-dessous, ou ajoute la correspondance dans ", + rx.link("Paramètres", href="/params", color="#1565c0"), + " pour qu'elle soit pré-remplie automatiquement.", + ), + color_scheme="amber", variant="soft", size="1", + ), + rx.fragment(), + ) + + +def _form() -> rx.Component: + return rx.vstack( + # Apprenti + rx.vstack( + rx.text("Apprenti", size="2", weight="medium", color="var(--gray-11)"), + _apprenti_selector(), + spacing="1", width="100%", + ), + _profession_warning(), + # Profession (éditable) + rx.vstack( + rx.text("Profession", size="2", weight="medium", color="var(--gray-11)"), + rx.input( + value=RetenueState.sel_profession, + on_change=RetenueState.set_profession, + placeholder="ex. Automaticien CFC", + width="100%", + ), + spacing="1", width="100%", + ), + # Dates + rx.flex( + rx.vstack( + rx.text("Date de retenue", size="2", weight="medium", color="var(--gray-11)"), + rx.input( + type="date", + value=RetenueState.retenue_date, + on_change=RetenueState.set_retenue_date, + width="100%", + ), + spacing="1", flex="1", min_width="200px", + ), + rx.vstack( + rx.text("Date du problème", size="2", weight="medium", color="var(--gray-11)"), + rx.input( + type="date", + value=RetenueState.probleme_date, + on_change=RetenueState.set_probleme_date, + width="100%", + ), + spacing="1", flex="1", min_width="200px", + ), + gap="0.75rem", flex_wrap="wrap", width="100%", + ), + # Motif (radio) + rx.vstack( + rx.text("Motif de la retenue", size="2", weight="medium", color="var(--gray-11)"), + rx.radio_group.root( + rx.vstack( + rx.radio_group.item( + rx.text("N'a pas remis ses tâches scolaires dans les délais", size="2"), + value="devoir", + ), + rx.radio_group.item( + rx.text("A manifesté un comportement répréhensible", size="2"), + value="comportement", + ), + rx.radio_group.item( + rx.text("Est arrivé en retard aux cours", size="2"), + value="retard", + ), + spacing="2", + ), + value=RetenueState.case, + on_change=RetenueState.set_case, + ), + spacing="2", width="100%", + ), + # Branche (visible seulement si case devoir) + rx.cond( + RetenueState.case == "devoir", + rx.vstack( + rx.text("Branche", size="2", weight="medium", color="var(--gray-11)"), + _branche_selector(), + spacing="1", width="100%", + ), + rx.fragment(), + ), + # Remarque + rx.vstack( + rx.text("Remarque éventuelle de l'école", size="2", weight="medium", color="var(--gray-11)"), + rx.text_area( + value=RetenueState.remarque, + on_change=RetenueState.set_remarque, + rows="4", + width="100%", + resize="vertical", + ), + spacing="1", width="100%", + ), + # Erreur + rx.cond( + RetenueState.form_error != "", + rx.callout.root( + rx.callout.icon(rx.icon("triangle-alert", size=16)), + rx.callout.text(RetenueState.form_error), + color_scheme="red", variant="soft", size="1", + ), + rx.fragment(), + ), + # Option : créer une notice Escada + rx.flex( + rx.checkbox( + checked=RetenueState.add_notice, + on_change=RetenueState.set_add_notice, + size="2", + ), + rx.text( + "Ajouter automatiquement une notice sur Escada", + size="2", color="var(--gray-12)", + ), + gap="0.5rem", align="center", + padding="0.5rem 0.65rem", + background_color="#f8f9fa", + border="1px solid #e5e7eb", + border_radius="6px", + ), + # Actions : télécharger + rx.button( + rx.icon("file-down", size=16), + "Télécharger l'avis", + on_click=RetenueState.download_pdf, + color_scheme="red", size="2", + disabled=RetenueState.selected_id == 0, + ), + spacing="4", + width="100%", + ) + + +def _email_section() -> rx.Component: + return rx.box( + rx.vstack( + rx.flex( + rx.icon("mail", size=16, color="#37474f"), + rx.text("Envoyer par email", size="3", weight="bold", color="#37474f"), + gap="0.5rem", align="center", + ), + rx.divider(), + rx.text("Destinataire", size="2", weight="medium", color="var(--gray-11)"), + rx.radio_group.root( + rx.vstack( + rx.radio_group.item( + rx.cond( + RetenueState.sel_fiche_email_appr != "", + rx.text("Apprenti — ", RetenueState.sel_fiche_email_appr, size="2"), + rx.text("Apprenti (email inconnu)", size="2", color="var(--gray-9)"), + ), + value="apprenti", + disabled=RetenueState.sel_fiche_email_appr == "", + ), + rx.radio_group.item( + rx.cond( + RetenueState.sel_fiche_email_form != "", + rx.text("Formateur — ", RetenueState.sel_fiche_email_form, size="2"), + rx.text("Formateur (email inconnu)", size="2", color="var(--gray-9)"), + ), + value="formateur", + disabled=RetenueState.sel_fiche_email_form == "", + ), + rx.radio_group.item( + rx.text("Autre adresse", size="2"), + value="autre", + ), + spacing="2", + ), + value=RetenueState.email_dest, + on_change=RetenueState.set_email_dest, + ), + rx.cond( + RetenueState.email_dest == "autre", + rx.input( + placeholder="email@domaine.ch", + value=RetenueState.email_custom, + on_change=RetenueState.set_email_custom, + type="email", + width="100%", + ), + rx.fragment(), + ), + rx.button( + rx.icon("send", size=16), + "Envoyer l'avis par email", + on_click=RetenueState.send_email_action, + color_scheme="blue", size="2", + disabled=RetenueState.selected_id == 0, + ), + spacing="3", width="100%", + ), + padding="1.25rem", + background_color="white", + border_radius="8px", + border="1px solid #e0e0e0", + width="100%", + ) + + +def retenue_page() -> rx.Component: + return layout( + rx.vstack( + rx.heading("Avis de retenue", size="6"), + rx.cond( + RetenueState.has_apprentis, + rx.vstack( + rx.box( + _form(), + padding="1.25rem", + background_color="white", + border_radius="8px", + border="1px solid #e0e0e0", + width="100%", + ), + _email_section(), + spacing="4", width="100%", + ), + empty_state( + icon="users", + title="Aucun apprenti", + description="Importe les classes depuis Escadaweb pour générer des avis.", + action_label="Lancer un import", + action_href="/escada", + ), + ), + spacing="4", + width="100%", + max_width="780px", + ) + ) diff --git a/eptm_dashboard/sidebar.py b/eptm_dashboard/sidebar.py index dcb421b..ee15e55 100644 --- a/eptm_dashboard/sidebar.py +++ b/eptm_dashboard/sidebar.py @@ -24,6 +24,7 @@ _PAGES = [ ("Tableau de bord", "/accueil", "layout-dashboard"), ("Apprentis", "/fiche", "user"), ("Classes", "/classe", "users"), + ("Avis de retenue", "/retenue", "file-warning"), ] _ADMIN_PAGES = [ diff --git a/requirements.txt b/requirements.txt index 16a3b1c..008a68a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ reflex==0.9.2 -markdown==3.10.2 \ No newline at end of file +markdown==3.10.2 +pikepdf==10.5.1 \ No newline at end of file diff --git a/scripts/cron_tick.py b/scripts/cron_tick.py index 31a804a..edaa1fd 100755 --- a/scripts/cron_tick.py +++ b/scripts/cron_tick.py @@ -52,6 +52,7 @@ except Exception: SCRIPT_SYNC = _ROOT / "scripts" / "sync_esacada.py" SCRIPT_PUSH = _ROOT / "scripts" / "push_to_escada.py" +SCRIPT_PUSH_NOTICES = _ROOT / "scripts" / "push_notices.py" DATA_DIR = _ROOT / "data" # Marqueur écrit par run_imports.py à la fin des imports en DB @@ -327,6 +328,8 @@ def run_job(job: CronJob, sess) -> None: ("Push Escada", _build_push_cmd(job)), ("Sync Escada", _build_sync_cmd(job)), ] + elif job.task_kind == "push_notices": + steps = [("Push notices", [sys.executable, str(SCRIPT_PUSH_NOTICES)])] else: fp.write(f"[error] task_kind inconnu : {job.task_kind}\n") overall_rc = 99 diff --git a/scripts/push_notices.py b/scripts/push_notices.py new file mode 100755 index 0000000..4973bc0 --- /dev/null +++ b/scripts/push_notices.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +"""Push des notices en attente vers Escadaweb. + +Workflow par notice : + Classes → Élèves (de la classe) → Notices (de l'apprenti) → Ajouter + → Date / Titre / Remarques → Mettre à jour → retour Élèves + +Réutilise les helpers de `sync_esacada.py` : + - `_launch_context()` : navigateur headless avec profil persistant + - `_ensure_logged_in(page)` : login SSO + 2FA + langue FR + - `_go_to_students_page(page, class_name)` : ouvre ViewLernende d'une classe + +Sortie standard (parsée par `cron_tick.py` et la page /escada) : + PUSH_NOTICES_DONE {"ok": N, "err": [...], "remaining": N} + +Behaviour DB : + - status='pending' → tentative + - succès → suppression de la Notice de la DB + - échec → status='failed' + error_msg +""" + +from __future__ import annotations + +import json +import sys +import traceback +from pathlib import Path + +_ROOT = Path(__file__).resolve().parent.parent +if str(_ROOT) not in sys.path: + sys.path.insert(0, str(_ROOT)) + +from sqlalchemy import select # noqa: E402 +from playwright.sync_api import Page # noqa: E402 + +from src.db import get_session, Notice # noqa: E402 +from src.logger import app_log # noqa: E402 + +from scripts.sync_esacada import ( # noqa: E402 + _launch_context, _ensure_logged_in, _go_to_students_page, _log, + CLASSES_URL, +) + + +def _fill_date(page: Page, date_str: str) -> None: + """Remplit le champ Date du formulaire notice (DevExpress). + + On vise l'input texte directement (`id$="_DXEditor1_I"`), plus stable que + le calendrier popup. + """ + date_input = page.locator("input[id$='_DXEditor1_I']").first + date_input.wait_for(state="visible", timeout=10_000) + date_input.click() + # Sélectionne tout l'ancien contenu (date pré-remplie d'aujourd'hui) puis tape + date_input.press("Control+A") + date_input.type(date_str) + date_input.press("Tab") # commit la valeur + + +def _push_one_notice(page: Page, notice: Notice, students_url: str) -> tuple[bool, str]: + """Pousse une notice. Renvoie (ok, error_message). + + Pré : `page` est sur la liste Élèves de la classe de l'apprenti. + Post (succès ou échec) : `page` est de retour sur la liste Élèves. + """ + ap = notice.apprenti + nom = ap.nom + prenom = ap.prenom + + # 1. Trouver la ligne de l'apprenti et cliquer "Notices" + try: + # On filtre par nom ET prénom pour éviter les homonymes + student_row = page.locator("tr").filter(has_text=nom).filter(has_text=prenom).first + if not student_row.count(): + return False, f"Apprenti '{nom} {prenom}' introuvable dans la grille" + student_row.get_by_role("link", name="Notices").first.click() + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception as e: + return False, f"Navigation Notices : {e}" + + # 2. Cliquer "Ajouter" + try: + page.locator("a").filter(has_text="Ajouter").first.click() + page.wait_for_timeout(800) + except Exception as e: + return False, f"Bouton Ajouter introuvable : {e}" + + # 3. Remplir Date / Titre / Remarques + try: + _fill_date(page, notice.date_event.strftime("%d.%m.%Y")) + except Exception as e: + return False, f"Remplissage date : {e}" + + try: + page.get_by_role("textbox", name="Titre:").fill(notice.titre) + except Exception as e: + return False, f"Remplissage titre : {e}" + + if notice.remarque: + try: + page.get_by_role("textbox", name="Remarques:").fill(notice.remarque) + except Exception: + pass # Non bloquant + + # 4. Sauver + try: + page.get_by_role("link", name="Mettre à jour").click() + page.wait_for_load_state("networkidle", timeout=15_000) + page.wait_for_timeout(500) # laisse le temps à la grille de se rafraîchir + except Exception as e: + return False, f"Échec Mettre à jour : {e}" + + # 5. Vérifier que la notice est bien dans la grille de l'apprenti + try: + # On cherche le titre dans la grille des notices (max 30 chars pour éviter + # les soucis de longueur / wrapping). + needle = (notice.titre or "").strip()[:30] + if needle: + cell = page.locator("td").filter(has_text=needle).first + cell.wait_for(state="visible", timeout=8_000) + except Exception: + # Vérification échouée — on retourne quand même à la liste Élèves + # avant de signaler l'échec. + try: + page.goto(students_url) + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception: + pass + return False, "Notice non retrouvée dans la grille après save (échec probable)" + + # 6. Retour à la liste Élèves de la même classe (option a : navigation directe) + try: + page.goto(students_url) + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception as e: + return False, f"Retour grille élèves : {e}" + + return True, "" + + +def main(): + sess = get_session() + ok_count = 0 + errors: list[str] = [] + try: + notices = sess.execute( + select(Notice).where(Notice.status == "pending") + ).scalars().all() + + app_log(f"[push_notices] {len(notices)} notice(s) en attente") + + if not notices: + print( + 'PUSH_NOTICES_DONE ' + + json.dumps({"ok": 0, "err": [], "remaining": 0}), + flush=True, + ) + return + + # Groupe par classe pour minimiser les navigations + by_class: dict[str, list[Notice]] = {} + for n in notices: + by_class.setdefault(n.apprenti.classe, []).append(n) + + pw, ctx, page = _launch_context() + try: + # Navigation initiale vers ViewKlassen : redirige vers le login + # si la session est expirée, et permet à _ensure_logged_in + # de détecter le succès (ViewKlassen dans l'URL). + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + for classe, class_notices in by_class.items(): + _log(f"[push_notices] classe={classe} ({len(class_notices)} notices)") + try: + students_page = _go_to_students_page(page, classe) + except Exception as e: + students_page = None + _log(f"[push_notices] erreur navigation {classe}: {e}") + if not students_page: + msg = f"classe '{classe}' introuvable sur Escada" + for n in class_notices: + n.status = "failed" + n.error_msg = msg + errors.append( + f"id={n.id} ({n.apprenti.nom} {n.apprenti.prenom}): {msg}" + ) + sess.commit() + continue + + students_url = students_page.url + + for notice in class_notices: + label = f"{notice.apprenti.nom} {notice.apprenti.prenom}" + try: + ok, err = _push_one_notice(students_page, notice, students_url) + if ok: + sess.delete(notice) + sess.commit() + ok_count += 1 + _log(f"[push_notices] OK id={notice.id} ({label})") + else: + notice.status = "failed" + notice.error_msg = err[:500] + sess.commit() + errors.append(f"id={notice.id} ({label}): {err}") + _log(f"[push_notices] FAIL id={notice.id}: {err}") + # Si on est paumé, tenter un retour propre + try: + students_page.goto(students_url) + students_page.wait_for_load_state( + "networkidle", timeout=10_000 + ) + except Exception: + break # impossible de recover, on passe à la classe suivante + except Exception as e: + notice.status = "failed" + notice.error_msg = str(e)[:500] + sess.commit() + errors.append(f"id={notice.id} ({label}): {e}") + _log(f"[push_notices] EX id={notice.id}: {e}\n{traceback.format_exc()}") + finally: + try: ctx.close() + except Exception: pass + try: pw.stop() + except Exception: pass + + finally: + # Compte les notices encore pending (n'incluant pas les "failed") + try: + remaining = sess.execute( + select(Notice).where(Notice.status == "pending") + ).all() + remaining_count = len(remaining) + except Exception: + remaining_count = 0 + sess.close() + + print( + 'PUSH_NOTICES_DONE ' + + json.dumps({ + "ok": ok_count, + "err": errors, + "remaining": remaining_count, + }, ensure_ascii=False), + flush=True, + ) + + +if __name__ == "__main__": + main() diff --git a/src/db.py b/src/db.py index 6918c5e..1bf7a4c 100644 --- a/src/db.py +++ b/src/db.py @@ -189,6 +189,9 @@ class ApprentiFiche(Base): formateur_nom: Mapped[Optional[str]] = mapped_column(String, nullable=True) formateur_email: Mapped[Optional[str]] = mapped_column(String, nullable=True) + # Profession dérivée du préfixe de classe (mapping dans data/settings.json) + profession: Mapped[Optional[str]] = mapped_column(String, nullable=True) + updated_at: Mapped[datetime] = mapped_column(default=datetime.now, onupdate=datetime.now) apprenti: Mapped["Apprenti"] = relationship(back_populates="fiche") @@ -206,6 +209,30 @@ class NotesExamen(Base): apprenti: Mapped["Apprenti"] = relationship(back_populates="notes_examen") +class Notice(Base): + """Note à pousser sur Escada (liée à un apprenti). + + Créée notamment lors de la génération d'un avis de retenue (si la case + correspondante est cochée). Supprimée après push réussi. + """ + __tablename__ = "notices" + + id: Mapped[int] = mapped_column(primary_key=True) + apprenti_id: Mapped[int] = mapped_column(ForeignKey("apprentis.id")) + date_event: Mapped[date] + titre: Mapped[str] = mapped_column(Text) + remarque: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + type_notice: Mapped[Optional[str]] = mapped_column(String, nullable=True) + matiere: Mapped[Optional[str]] = mapped_column(String, nullable=True) + source: Mapped[str] = mapped_column(default="manual") # "retenue" pour le moment + status: Mapped[str] = mapped_column(default="pending") # "pending" | "failed" + created_at: Mapped[datetime] = mapped_column(default=datetime.now) + created_by: Mapped[Optional[str]] = mapped_column(String, nullable=True) + error_msg: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + + apprenti: Mapped["Apprenti"] = relationship() + + class SanctionExport(Base): __tablename__ = "sanctions_export" @@ -286,6 +313,21 @@ def init_db(engine=None): for stmt in ( "ALTER TABLE sanctions_export ADD COLUMN nb_absences INTEGER", "ALTER TABLE cron_jobs ADD COLUMN notify_level TEXT DEFAULT 'normal'", + "ALTER TABLE apprenti_fiches ADD COLUMN profession TEXT", + """CREATE TABLE IF NOT EXISTS notices ( + id INTEGER PRIMARY KEY, + apprenti_id INTEGER NOT NULL REFERENCES apprentis(id), + date_event DATE NOT NULL, + titre TEXT NOT NULL, + remarque TEXT, + type_notice TEXT, + matiere TEXT, + source TEXT NOT NULL DEFAULT 'manual', + status TEXT NOT NULL DEFAULT 'pending', + created_at DATETIME NOT NULL, + created_by TEXT, + error_msg TEXT + )""", """CREATE TABLE IF NOT EXISTS escada_pending ( id INTEGER PRIMARY KEY, apprenti_id INTEGER NOT NULL REFERENCES apprentis(id), @@ -315,6 +357,7 @@ def upsert_apprenti_fiche(session: Session, apprenti_id: int, data: dict) -> Non "entreprise_nom", "entreprise_adresse", "entreprise_code_postal", "entreprise_localite", "entreprise_telephone", "entreprise_email", "formateur_nom", "formateur_email", + "profession", ] if existing: for f in fields: diff --git a/src/profession.py b/src/profession.py new file mode 100644 index 0000000..19b5c26 --- /dev/null +++ b/src/profession.py @@ -0,0 +1,113 @@ +"""Helper pour la résolution `classe → profession` via mapping configurable. + +Mapping stocké dans `data/settings.json` sous la clé `class_profession_mapping`, +forme : `[{"prefix": "AUTOMAT", "profession": "Automaticien CFC"}, ...]`. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Optional + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from src.db import Apprenti, ApprentiFiche, upsert_apprenti_fiche + +_ROOT = Path(__file__).resolve().parent.parent +_DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data"))) +_SETTINGS_PATH = _DATA_DIR / "settings.json" + +_DEFAULT_MAPPING = [ + {"prefix": "AUTOMAT", "profession": "Automaticien CFC"}, + {"prefix": "MONTAUT", "profession": "Monteur Automaticien CFC"}, + {"prefix": "EM-AU", "profession": "Automaticien CFC"}, +] + + +def _load_settings() -> dict: + if _SETTINGS_PATH.exists(): + try: + return json.loads(_SETTINGS_PATH.read_text(encoding="utf-8")) + except Exception: + return {} + return {} + + +def _save_settings(s: dict) -> None: + _SETTINGS_PATH.write_text(json.dumps(s, ensure_ascii=False, indent=2), encoding="utf-8") + + +def load_mapping() -> list[dict]: + """Renvoie la liste des correspondances [{prefix, profession}, ...].""" + s = _load_settings() + return list(s.get("class_profession_mapping", _DEFAULT_MAPPING)) + + +def save_mapping(mapping: list[dict]) -> None: + """Sauve le mapping (filtre les entrées vides).""" + cleaned = [ + {"prefix": (m.get("prefix") or "").strip(), "profession": (m.get("profession") or "").strip()} + for m in mapping + ] + cleaned = [m for m in cleaned if m["prefix"] and m["profession"]] + s = _load_settings() + s["class_profession_mapping"] = cleaned + _save_settings(s) + + +def resolve_profession(classe: str, mapping: Optional[list[dict]] = None) -> str: + """Renvoie la profession matchant le préfixe de la classe, ou '' si aucun.""" + if not classe: + return "" + if mapping is None: + mapping = load_mapping() + # On préfère le préfixe le plus long en cas de chevauchement + for entry in sorted(mapping, key=lambda m: -len(m.get("prefix", ""))): + prefix = entry.get("prefix", "") + if prefix and classe.startswith(prefix): + return entry.get("profession", "") + return "" + + +def find_unmapped_classes(session: Session) -> list[str]: + """Liste les classes en DB sans correspondance dans le mapping. + + Exclut MP/MI (déjà filtrées partout dans l'app). + """ + mapping = load_mapping() + classes = session.execute( + select(Apprenti.classe).distinct().order_by(Apprenti.classe) + ).scalars().all() + out = [] + for c in classes: + if not c or c.startswith(("MP", "MI")): + continue + if not resolve_profession(c, mapping): + out.append(c) + return out + + +def refresh_all_professions(session: Session) -> int: + """Recalcule `profession` pour tous les apprentis en base. + + Renvoie le nombre de fiches mises à jour. Utile : + - une fois à l'init après ajout du champ + - après modification du mapping dans Paramètres + - après une sync Escada + """ + mapping = load_mapping() + apprentis = session.execute(select(Apprenti)).scalars().all() + n = 0 + for ap in apprentis: + prof = resolve_profession(ap.classe, mapping) + if not prof: + # Pas de mapping → on laisse la valeur existante si présente + continue + # upsert : crée la fiche si elle n'existe pas, sinon met à jour profession + upsert_apprenti_fiche(session, ap.id, {"profession": prof}) + n += 1 + session.commit() + return n diff --git a/src/retenue_pdf.py b/src/retenue_pdf.py new file mode 100644 index 0000000..1c7bb73 --- /dev/null +++ b/src/retenue_pdf.py @@ -0,0 +1,231 @@ +"""Génération d'avis de retenue à partir du template AcroForm. + +Template : `data/templates/GF_FO_Avis_de_retenue.pdf`. Le champ `Date` du +template a 3 widgets-enfants partagés (un par ligne du formulaire). On les +sépare en 3 champs distincts (`Date_devoir`, `Date_comportement`, `Date_retard`) +puis on remplit uniquement celui correspondant à la case cochée. + +Le PDF généré reste éditable (formulaire préservé). +""" + +from __future__ import annotations + +import io +import os +from datetime import date as _date +from pathlib import Path +from typing import Optional + +import pypdf + +from sqlalchemy.orm import Session + +from src.db import Apprenti, ApprentiFiche + +_ROOT = Path(__file__).resolve().parent.parent +_DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data"))) +_TEMPLATE_PATH = _DATA_DIR / "templates" / "GF_FO_Avis_de_retenue.pdf" + +_MOIS_FR = [ + "janvier", "février", "mars", "avril", "mai", "juin", + "juillet", "août", "septembre", "octobre", "novembre", "décembre", +] + +# Mapping case → suffixe + index (ordre des widgets enfants triés par Y desc) +_CASE_TO_SUFFIX = { + "devoir": ("Date_devoir", 0), + "comportement": ("Date_comportement", 1), + "retard": ("Date_retard", 2), +} + + +def format_date_long(d: _date) -> str: + """Formate une date en 'jour mois année' (ex: '12 mars 2026').""" + return f"{d.day} {_MOIS_FR[d.month - 1]} {d.year}" + + +def generate_retenue_pdf( + sess: Session, + apprenti_id: int, + *, + profession: str, + retenue_date: _date, + probleme_date: _date, + case: str, # "devoir" | "comportement" | "retard" + branche: str = "", + remarque: str = "", + prof_name: str = "", +) -> Optional[bytes]: + """Pré-remplit le template puis aplatit le PDF. Renvoie les bytes du PDF aplati.""" + if not _TEMPLATE_PATH.exists(): + return None + apprenti = sess.get(Apprenti, apprenti_id) + if apprenti is None: + return None + + fiche: Optional[ApprentiFiche] = apprenti.fiche + + classe_full = ( + f"{profession.strip()} {apprenti.classe}".strip() + if profession else apprenti.classe + ) + npa_ville = "" + if fiche: + cp = (fiche.entreprise_code_postal or "").strip() + loc = (fiche.entreprise_localite or "").strip() + npa_ville = f"{cp} {loc}".strip() + + # 1. Lecture template + clone + reader = pypdf.PdfReader(str(_TEMPLATE_PATH)) + writer = pypdf.PdfWriter(clone_from=reader) + + # 2. Séparer les 3 widgets du champ Date en 3 champs distincts. + # Après cette opération, on peut remplir chaque Date_xxx individuellement. + _split_date_field(writer) + + # 3. Remplit les champs texte (Date_xxx inclus pour la case sélectionnée) + target_date_field = _CASE_TO_SUFFIX.get(case, (None, None))[0] + field_values: dict[str, str] = { + "NomApprenti": f"{apprenti.prenom} {apprenti.nom}".strip(), + "Classe": classe_full, + "NomEntreprise": (fiche.entreprise_nom if fiche else "") or "", + "Adresse": (fiche.entreprise_adresse if fiche else "") or "", + "NPA-Ville": npa_ville, + "RetenueDateHeure": retenue_date.strftime("%d.%m.%Y"), + "Branche": branche if case == "devoir" else "", + "Remarque": remarque, + "DateAvis": format_date_long(_date.today()), + "Profs": prof_name or "", + } + if target_date_field: + field_values[target_date_field] = probleme_date.strftime("%d.%m.%Y") + + for page in writer.pages: + try: + writer.update_page_form_field_values( + page, field_values, auto_regenerate=False, + ) + except Exception: + pass + + # 4. Checkboxes + case_to_field = { + "devoir": "CaseDevoir", + "comportement": "CaseComportement", + "retard": "CaseRetard", + } + target_check = case_to_field.get(case) + for fname in case_to_field.values(): + try: + _set_checkbox(writer, fname, fname == target_check) + except Exception: + pass + + # 5. Force NeedAppearances pour que les viewers redessinent les valeurs + try: + root = writer._root_object + if "/AcroForm" in root: + root["/AcroForm"].update({ + pypdf.generic.NameObject("/NeedAppearances"): + pypdf.generic.BooleanObject(True) + }) + except Exception: + pass + + # 6. Écriture (formulaire préservé éditable) + buf = io.BytesIO() + writer.write(buf) + return buf.getvalue() + + +def _split_date_field(writer: pypdf.PdfWriter) -> None: + """Sépare le champ `Date` (avec 3 widgets enfants) en 3 champs indépendants. + + Renomme les widgets selon leur position Y (ordre du haut vers le bas) : + kid #0 (haut) → Date_devoir + kid #1 (milieu) → Date_comportement + kid #2 (bas) → Date_retard + """ + NameObject = pypdf.generic.NameObject + acroform_ref = writer._root_object.get("/AcroForm") + if not acroform_ref: + return + acroform = acroform_ref.get_object() if hasattr(acroform_ref, "get_object") else acroform_ref + fields = acroform.get("/Fields") or [] + date_field = None + date_ref = None + for f in fields: + if f.get_object().get("/T") == "Date": + date_field = f.get_object() + date_ref = f + break + if date_field is None: + return + kids = date_field.get("/Kids") or [] + if not kids: + return + + # Trier les enfants par Y descendant + indexed = [] + for kid in kids: + ko = kid.get_object() + rect = ko.get("/Rect") + y = float(rect[1]) if rect else 0.0 + indexed.append((y, kid, ko)) + indexed.sort(key=lambda t: -t[0]) + + # Promouvoir chaque enfant en champ indépendant + new_fields = [] + suffixes_by_order = ["Date_devoir", "Date_comportement", "Date_retard"] + for i, (_y, kid_ref, kid_obj) in enumerate(indexed): + # Renomme : donne un /T propre à l'ancien widget enfant + kid_obj[NameObject("/T")] = pypdf.generic.create_string_object( + suffixes_by_order[i] + ) + # Hériter du /FT, /DA, /Q du parent si manquant sur l'enfant + for prop in ("/FT", "/DA", "/Q"): + if prop not in kid_obj and prop in date_field: + kid_obj[NameObject(prop)] = date_field[prop] + # Détacher du parent + if "/Parent" in kid_obj: + del kid_obj[NameObject("/Parent")] + new_fields.append(kid_ref) + + # Retirer l'ancien champ Date de /Fields, ajouter les 3 nouveaux + new_field_list = [f for f in fields if f is not date_ref] + new_fields + acroform[NameObject("/Fields")] = pypdf.generic.ArrayObject(new_field_list) + + +def _find_field(writer: pypdf.PdfWriter, name: str): + acroform = writer._root_object.get("/AcroForm") + if not acroform: + return None + for f in acroform.get("/Fields") or []: + obj = f.get_object() + if obj.get("/T") == name: + return obj + return None + + +def _set_checkbox(writer: pypdf.PdfWriter, field_name: str, checked: bool) -> None: + """Coche/décoche une checkbox AcroForm, gère les widgets enfants sans /T.""" + NameObject = pypdf.generic.NameObject + field = _find_field(writer, field_name) + if field is None: + return + kids = field.get("/Kids") + widgets = [k.get_object() for k in kids] if kids else [field] + on_value = "/Yes" + for widget in widgets: + ap = widget.get("/AP") or field.get("/AP") + if ap is not None: + n_ap = ap.get("/N") if hasattr(ap, "get") else None + if n_ap is not None: + for k in n_ap.keys(): + ks = str(k) + if ks not in ("/Off", "Off"): + on_value = ks if ks.startswith("/") else f"/{ks}" + break + new_val = NameObject(on_value if checked else "/Off") + widget[NameObject("/AS")] = new_val + field[NameObject("/V")] = NameObject(on_value if checked else "/Off")