From 096dfd727b8182768e573855f9a951cc3891b141 Mon Sep 17 00:00:00 2001 From: Julien Balet Date: Sat, 9 May 2026 23:27:46 +0200 Subject: [PATCH] import escada ok --- scripts/push_to_escada.py | 467 ++++++++++ scripts/run_imports.py | 152 ++++ scripts/sync_esacada.py | 1782 +++++++++++++++++++++++++++++++++++++ src/importer_notes.py | 125 +++ src/logger.py | 27 + 5 files changed, 2553 insertions(+) create mode 100644 scripts/push_to_escada.py create mode 100755 scripts/run_imports.py create mode 100644 scripts/sync_esacada.py create mode 100644 src/importer_notes.py create mode 100644 src/logger.py diff --git a/scripts/push_to_escada.py b/scripts/push_to_escada.py new file mode 100644 index 0000000..190e143 --- /dev/null +++ b/scripts/push_to_escada.py @@ -0,0 +1,467 @@ +"""Pousse vers Escada les changements de statut effectués dans l'app. + +Usage : + python scripts/push_to_escada.py # tous les changements en attente + python scripts/push_to_escada.py --test # test limité à Poidevin Alexandre / EM-AU 1 + python scripts/push_to_escada.py --count # affiche le nombre de changements en attente + python scripts/push_to_escada.py --no-pull # ne pas récupérer le serveur avant push +""" +from __future__ import annotations + +import json +import subprocess +import sys +from datetime import date +from pathlib import Path + +_root = Path(__file__).resolve().parent.parent +if str(_root) not in sys.path: + sys.path.insert(0, str(_root)) + +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8", errors="replace") +if hasattr(sys.stderr, "reconfigure"): + sys.stderr.reconfigure(encoding="utf-8", errors="replace") + +from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout + +from src.db import Absence, Apprenti, EscadaPending, get_engine, init_db, upsert_escada_pending +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy import select + +# Réutilise les utilitaires de navigation depuis sync_esacada +from scripts.sync_esacada import ( + BASE_URL, CLASSES_URL, PROFILE_DIR, + _log, _ensure_logged_in, _launch_context, + _go_to_absence_page, _cache_load, +) + +# ── Coordonnées du serveur ──────────────────────────────────────────────────── +_SSH_HOST = "julbal@20.199.136.37" +_SSH_REMOTE = "/opt/absences" + + +# ── Interaction avec la page d'absences ─────────────────────────────────────── + +_JS_SET_DROPDOWN = """([nom, prenom, idx, val]) => { + for (const tr of document.querySelectorAll('tr')) { + // Vérifier que nom et prénom apparaissent dans des directs courts + // (les lignes-containers du grid DevExpress ont des très longs + // contenant tous les élèves — on les exclut via la limite 200 chars) + const directTds = Array.from(tr.querySelectorAll(':scope > td')); + if (!directTds.length) continue; + const hasNom = directTds.some(td => { + const t = (td.innerText || td.textContent || '').trim(); + return t.includes(nom) && t.length < 200; + }); + const hasPrenom = directTds.some(td => { + const t = (td.innerText || td.textContent || '').trim(); + return t.includes(prenom) && t.length < 200; + }); + if (!hasNom || !hasPrenom) continue; + + const sels = Array.from(tr.querySelectorAll('select')); + // Exclure les containers (trop de selects = plusieurs élèves fusionnés) + if (sels.length > 25) continue; + if (sels.length <= idx) { + return {ok: false, reason: 'seulement ' + sels.length + ' selects, besoin de ' + (idx+1)}; + } + const prev = sels[idx].value; + sels[idx].value = val; + if (sels[idx].value !== val) { + const opts = Array.from(sels[idx].options).map(o => '"' + o.value + '"="' + o.text.trim() + '"').join(', '); + return {ok: false, reason: 'valeur "' + val + '" absente — options: {' + opts + '}'}; + } + sels[idx].dispatchEvent(new Event('change', {bubbles: true})); + return {ok: true, prev: prev}; + } + return {ok: false, reason: 'ligne introuvable pour ' + nom + ' ' + prenom}; +}""" + + +_JS_SET_DATE = """([dateStr]) => { + // Cherche le DevExpress DateEdit : id se termine par _I + const candidates = [ + document.querySelector("input[id*='kalender_I']"), + document.querySelector("input[id*='DateEdit_I']"), + document.querySelector("input[id*='_Date_I']"), + (() => { + const all = document.querySelectorAll("table input[type='text']"); + return all.length ? all[0] : null; + })(), + ]; + const inp = candidates.find(Boolean); + if (!inp) return {ok: false, reason: 'input introuvable'}; + + inp.value = dateStr; + + // Déclenche l'événement ASPx (postback DevExpress) + const ctrlName = inp.id.endsWith('_I') ? inp.id.slice(0, -2) : inp.id; + try { + if (typeof ASPx !== 'undefined' && ASPx.ETextChanged) { + ASPx.ETextChanged(ctrlName); + return {ok: true, method: 'ASPx', id: inp.id}; + } + } catch(e) {} + + // Fallback : événements DOM standards + inp.dispatchEvent(new Event('change', {bubbles: true})); + inp.dispatchEvent(new Event('input', {bubbles: true})); + return {ok: true, method: 'DOM', id: inp.id}; +}""" + + +def _set_date(page, target_date: date) -> bool: + """Change la date dans le sélecteur DevExpress et attend le rechargement.""" + date_str = target_date.strftime("%d/%m/%Y") + try: + result = page.evaluate(_JS_SET_DATE, [date_str]) + if not result.get("ok"): + _log(f" [set_date] ERR : {result.get('reason', '?')}") + return False + + _log(f" [set_date] date={date_str} via {result.get('method')} id={result.get('id')}") + + page.wait_for_timeout(400) + try: + page.wait_for_load_state("networkidle", timeout=20_000) + except Exception: + pass + page.wait_for_timeout(600) + + # Vérification : comparer la valeur affichée dans l'input (pas innerText) + cur_val = page.evaluate( + "() => { const i = document.getElementById('ContentPlaceHolder_site_kalender_I'); return i ? i.value : ''; }" + ) + if cur_val == date_str: + _log(f" [set_date] OK (input value = {cur_val})") + return True + + # Fallback : interaction directe Playwright + _log(f" [set_date] input value='{cur_val}' ≠ '{date_str}', tentative fill+Tab…") + try: + inp2 = page.locator("input[id*='kalender_I']").first + if not inp2.count(): + inp2 = page.locator("table input[type='text']").first + if inp2.count(): + inp2.click(click_count=3) + inp2.fill(date_str) + page.keyboard.press("Tab") + page.wait_for_timeout(400) + try: + page.wait_for_load_state("networkidle", timeout=20_000) + except Exception: + pass + page.wait_for_timeout(600) + cur_val2 = page.evaluate( + "() => { const i = document.getElementById('ContentPlaceHolder_site_kalender_I'); return i ? i.value : ''; }" + ) + if cur_val2 == date_str: + _log(f" [set_date] OK via fill+Tab (input value = {cur_val2})") + return True + _log(f" [set_date] ERR : input value='{cur_val2}' après fill+Tab") + except Exception as e2: + _log(f" [set_date] fill+Tab ERR : {e2}") + + return False + except Exception as e: + _log(f" [set_date] ERR : {e}") + return False + + +_ESCADA_CODES = {"E": "68", "N": "56", "clear": "0"} + + +def _set_dropdown(page, nom: str, prenom: str, periode: int, action: str) -> bool: + """Positionne le dropdown d'une période pour un apprenti. + + Colonnes selects dans la ligne : Remarques(0) | Journée entière(1) | Excuser(2) | P1(3)…P10(12) + → target_idx = periode + 2 + """ + val = _ESCADA_CODES.get(action, "0") + target_idx = periode + 1 + + try: + result = page.evaluate(_JS_SET_DROPDOWN, [nom, prenom, target_idx, val]) + if result.get("ok"): + prev = result.get("prev", "?") + _log(f" SET {nom} {prenom} P{periode} : '{prev}' → '{val}'") + return True + else: + _log(f" WARN {nom} {prenom} P{periode} : {result.get('reason', '?')}") + return False + except Exception as e: + _log(f" ERR {nom} {prenom} P{periode} : {e}") + return False + + +def _save(page) -> bool: + """Clique sur le bouton Enregistrer via JS (bypass visibilité DevExpress).""" + try: + clicked = page.evaluate("""() => { + // Cherche tous les inputs Enregistrer (visibles ou non) + const btns = Array.from(document.querySelectorAll( + 'input[value="Enregistrer"], input[value*="nregistrer"]' + )); + if (!btns.length) return {ok: false, reason: 'bouton introuvable'}; + // Préférer le visible, sinon prendre le premier + const btn = btns.find(b => b.offsetParent !== null) || btns[0]; + btn.click(); + return {ok: true, id: btn.id}; + }""") + + if not clicked.get("ok"): + _log(f" [save] ERR : {clicked.get('reason')}") + return False + + _log(f" [save] cliqué id={clicked.get('id')}") + page.wait_for_timeout(400) + try: + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception: + pass + page.wait_for_timeout(500) + return True + except Exception as e: + _log(f" [save] ERR : {e}") + return False + + +# ── Synchronisation avec le serveur ────────────────────────────────────────── + +def _pull_from_server(session: Session) -> dict[tuple, int]: + """SSH → serveur, exporte EscadaPending en JSON, upsert en local. + + Retourne un mapping (nom, prenom, classe, date_iso, periode) → server_id + pour permettre le nettoyage côté serveur après push réussi. + """ + _log("PULL Récupération des modifications en attente depuis le serveur…") + cmd = ( + f'ssh {_SSH_HOST} ' + f'"cd {_SSH_REMOTE} && .venv/bin/python scripts/export_pending.py"' + ) + try: + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=30, shell=True + ) + if result.returncode != 0: + _log(f" WARN SSH export_pending échoué : {result.stderr.strip()}") + return {} + raw = result.stdout.strip() + if not raw: + _log(" INFO Aucune modification en attente sur le serveur.") + return {} + entries = json.loads(raw) + except Exception as e: + _log(f" WARN Impossible de récupérer depuis le serveur : {e}") + return {} + + if not entries: + _log(" INFO Aucune modification en attente sur le serveur.") + return {} + + _log(f" {len(entries)} entrée(s) récupérée(s) du serveur") + + server_id_map: dict[tuple, int] = {} + for entry in entries: + ap = session.execute( + select(Apprenti).where( + Apprenti.nom == entry["nom"], + Apprenti.prenom == entry["prenom"], + Apprenti.classe == entry["classe"], + ) + ).scalar_one_or_none() + if ap is None: + _log( + f" WARN apprenti introuvable localement : " + f"{entry['nom']} {entry['prenom']} / {entry['classe']}" + ) + continue + + d = date.fromisoformat(entry["date"]) + upsert_escada_pending(session, ap.id, d, entry["periode"], entry["action"]) + + key = (entry["nom"], entry["prenom"], entry["classe"], + entry["date"], entry["periode"]) + server_id_map[key] = entry["id"] + + session.commit() + _log(f" {len(server_id_map)} entrée(s) fusionnée(s) dans la DB locale") + return server_id_map + + +def _clear_server_pending(server_ids: list[int]) -> None: + """SSH → serveur pour supprimer les EscadaPending par IDs.""" + if not server_ids: + return + ids_str = " ".join(str(i) for i in server_ids) + cmd = ( + f'ssh {_SSH_HOST} ' + f'"cd {_SSH_REMOTE} && .venv/bin/python scripts/clear_pending.py {ids_str}"' + ) + try: + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=30, shell=True + ) + if result.returncode != 0: + _log(f" WARN SSH clear_pending échoué : {result.stderr.strip()}") + else: + _log(f" OK serveur nettoyé ({result.stdout.strip()})") + except Exception as e: + _log(f" WARN Impossible de nettoyer le serveur : {e}") + + +# ── Commande principale ─────────────────────────────────────────────────────── + +def cmd_count(session: Session) -> None: + """Affiche le nombre de changements en attente.""" + n = session.execute(select(EscadaPending)).scalars().all() + _log(f"PENDING_COUNT {len(n)}") + for ep in n: + ap = ep.apprenti + _log(f" {ap.classe} | {ap.nom} {ap.prenom} | {ep.date} P{ep.periode} → {ep.action}") + + +def cmd_push(session: Session, test_mode: bool = False, no_pull: bool = False, debug: bool = False) -> None: + """Pousse tous les changements en attente vers Escada. + + 1. Pull depuis le serveur (sauf --no-pull). + 2. Lecture des EscadaPending locaux. + 3. Navigation Playwright + mise à jour des dropdowns. + 4. Nettoyage côté serveur pour les entrées syncées avec succès. + """ + # ── 1. Pull depuis le serveur ───────────────────────────────────────────── + server_id_map: dict[tuple, int] = {} + if not no_pull: + server_id_map = _pull_from_server(session) + else: + _log("INFO --no-pull : synchronisation serveur ignorée") + + # ── 2. Lecture des EscadaPending locaux ─────────────────────────────────── + q = select(EscadaPending).join(Apprenti, EscadaPending.apprenti_id == Apprenti.id) + if test_mode: + _log("INFO Mode test : Poidevin Alexandre / EM-AU 1 uniquement") + q = q.where(Apprenti.nom == "Poidevin", Apprenti.prenom == "Alexandre") + + pending = session.execute(q).scalars().all() + + if not pending: + _log("INFO Aucun changement en attente.") + return + + # Grouper par (classe, date) + groups: dict[tuple, list] = {} + for ep in pending: + key = (ep.apprenti.classe, ep.date) + groups.setdefault(key, []).append(ep) + + _log(f"TOTAL {len(groups)} groupe(s) à synchroniser ({len(pending)} changement(s))") + + pw, ctx, page = _launch_context() + try: + _cache_load() + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + results = {"ok": [], "err": []} + # EscadaPending IDs locaux syncés avec succès → pour retrouver les server_ids + synced_eps: list[EscadaPending] = [] + + for i, ((classe, target_date), entries) in enumerate(sorted(groups.items()), 1): + _log(f"PROGRESS {i}/{len(groups)} {classe} {target_date}") + + abs_page = _go_to_absence_page(page, classe) + if abs_page is None: + _log(f"ERR {classe} : page absences introuvable") + for ep in entries: + results["err"].append(f"{classe} {target_date} : navigation échouée") + continue + + if not _set_date(abs_page, target_date): + _log(f"ERR {classe} {target_date} : changement de date échoué") + for ep in entries: + results["err"].append(f"{classe} {target_date} : date non changée") + continue + + synced_ids = [] + synced_ep_objs = [] + for ep in entries: + ap = ep.apprenti + ok = _set_dropdown(abs_page, ap.nom, ap.prenom, ep.periode, ep.action) + if ok: + synced_ids.append(ep.id) + synced_ep_objs.append(ep) + else: + results["err"].append( + f"{classe} {target_date} {ap.nom} {ap.prenom} P{ep.periode}" + ) + + if not synced_ids: + _log(f" SKIP {classe} {target_date} : aucun dropdown modifié") + continue + + if _save(abs_page): + for ep in synced_ep_objs: + obj = session.get(EscadaPending, ep.id) + if obj: + session.delete(obj) + # Marquer l'absence locale comme publiée sur Escada + ab = session.execute( + select(Absence).where( + Absence.apprenti_id == ep.apprenti_id, + Absence.date == ep.date, + Absence.periode == ep.periode, + ) + ).scalar_one_or_none() + if ab: + ab.statut = "publiee_escada" + session.commit() + _log(f"OK {classe} {target_date} : {len(synced_ids)} changement(s) sauvegardé(s)") + results["ok"].extend(synced_ids) + synced_eps.extend(synced_ep_objs) + else: + _log(f"ERR {classe} {target_date} : sauvegarde échouée") + results["err"].append(f"{classe} {target_date} : enregistrement échoué") + + _log(f"PUSH_DONE {json.dumps({'ok': len(results['ok']), 'err': results['err']}, ensure_ascii=False)}") + + # ── 4. Nettoyage côté serveur ───────────────────────────────────────── + if server_id_map and synced_eps: + server_ids_to_clear: list[int] = [] + for ep in synced_eps: + ap = ep.apprenti + key = (ap.nom, ap.prenom, ap.classe, ep.date.isoformat(), ep.periode) + srv_id = server_id_map.get(key) + if srv_id is not None: + server_ids_to_clear.append(srv_id) + if server_ids_to_clear: + _clear_server_pending(server_ids_to_clear) + + finally: + ctx.close() + pw.stop() + + +# ── Point d'entrée ──────────────────────────────────────────────────────────── + +if __name__ == "__main__": + import argparse + + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--test", action="store_true", help="Limite au test Poidevin Alexandre") + ap.add_argument("--count", action="store_true", help="Affiche les changements en attente") + ap.add_argument("--no-pull", action="store_true", help="Ne pas récupérer les données du serveur avant push") + ap.add_argument("--pull-only", action="store_true", help="Récupère depuis le serveur sans pousser vers Escada") + ap.add_argument("--debug", action="store_true", help="Pause interactive après ouverture de la page absences") + args = ap.parse_args() + + engine = init_db() + Session_ = sessionmaker(bind=engine) + with Session_() as sess: + if args.count: + cmd_count(sess) + elif args.pull_only: + _pull_from_server(sess) + else: + cmd_push(sess, test_mode=args.test, no_pull=args.no_pull, debug=args.debug) diff --git a/scripts/run_imports.py b/scripts/run_imports.py new file mode 100755 index 0000000..00c3e7d --- /dev/null +++ b/scripts/run_imports.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +""" +Lit sync_all_done.json, lance les imports PDF → DB, écrit sync_last_result.json. +Lancé comme subprocess indépendant par reset_sync (start_new_session=True). +Args: +""" +import json, sys +from pathlib import Path +from datetime import datetime + +_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(_ROOT)) + +DATA_DIR = Path(sys.argv[1]) if len(sys.argv) > 1 else _ROOT / "data" +USERNAME = sys.argv[2] if len(sys.argv) > 2 else "escada" +FORCE_ABS = len(sys.argv) > 3 and sys.argv[3] == "1" +ALL_DONE = DATA_DIR / "sync_all_done.json" +RESULT = DATA_DIR / "sync_last_result.json" + +from src.logger import app_log + +app_log("[run_imports] démarré") + +try: + raw = json.loads(ALL_DONE.read_text(encoding="utf-8")) + payload = raw.get("payload", {}) +except Exception as e: + app_log(f"[run_imports] ERREUR lecture all_done: {e}") + sys.exit(1) + +from src.db import get_session +from src.importer import import_pdf as do_import +from src.importer_bn import import_bn as do_import_bn +from src.importer_matu import import_matu as do_import_matu +from src.importer_notes import import_notes_pdf +from src.db import Apprenti, EscadaPending, upsert_apprenti_fiche, _norm_prenom +from sqlalchemy import select + +abs_pdfs = payload.get("abs", []) +bn_pdfs = payload.get("bn", []) +matu_pdfs = payload.get("matu", []) +notes_pdfs = payload.get("notes", []) +fiches = payload.get("fiches", {}) +errors = list(payload.get("errors", [])) + +res_abs = [] +for pdf_path in abs_pdfs: + sess = get_session() + try: + r = do_import(Path(pdf_path), sess, imported_by=USERNAME, force=FORCE_ABS) + detail = f"{r.nb_absences_nouvelles} nouvelles" + if r.nb_absences_mises_a_jour: + detail += f", {r.nb_absences_mises_a_jour} maj" + res_abs.append({"classe": r.classe, "detail": detail}) + app_log(f"[run_imports] abs {r.classe}: {detail}") + except Exception as e: + errors.append(f"Import abs {Path(pdf_path).name}: {e}") + app_log(f"[run_imports] erreur abs: {e}") + finally: + sess.close() + +res_bn = [] +for pdf_path in bn_pdfs: + sess = get_session() + try: + r = do_import_bn(Path(pdf_path), sess, imported_by=USERNAME) + res_bn.append({"classe": r.classe, "nb": str(r.nb_apprentis)}) + app_log(f"[run_imports] BN {r.classe}: {r.nb_apprentis}") + except Exception as e: + errors.append(f"Import BN {Path(pdf_path).name}: {e}") + app_log(f"[run_imports] erreur BN: {e}") + finally: + sess.close() + +res_notes = [] +for pdf_path in notes_pdfs: + sess = get_session() + try: + r = import_notes_pdf(Path(pdf_path), sess) + res_notes.append({"classe": r["classe"], "nb": str(r["nb"])}) + app_log(f"[run_imports] notes {r['classe']}: {r['nb']} apprenti(s)") + except Exception as e: + errors.append(f"Import notes {Path(pdf_path).name}: {e}") + app_log(f"[run_imports] erreur notes: {e}") + finally: + sess.close() + +res_matu = [] +for pdf_path in matu_pdfs: + sess = get_session() + try: + r, unmatched = do_import_matu(Path(pdf_path), sess, imported_by=USERNAME) + res_matu.append({ + "classe": r.classe_mp or Path(pdf_path).stem, + "nb": str(r.nb_apprentis), + "unmatched": ", ".join(unmatched) if unmatched else "", + }) + app_log(f"[run_imports] matu {r.classe_mp}: {r.nb_apprentis}") + except Exception as e: + errors.append(f"Import matu {Path(pdf_path).name}: {e}") + app_log(f"[run_imports] erreur matu: {e}") + finally: + sess.close() + +if fiches: + sess = get_session() + try: + for cls, fiches_list in fiches.items(): + cls_apprentis = sess.execute( + select(Apprenti).where(Apprenti.classe == cls) + ).scalars().all() + for fiche in fiches_list: + nom_eleve = fiche.get("nom_eleve", "") + if not nom_eleve: + continue + nom_norm = _norm_prenom(nom_eleve) + ap = None + for a in cls_apprentis: + full_norm = _norm_prenom(f"{a.nom} {a.prenom}") + if full_norm == nom_norm or full_norm.startswith(nom_norm + " "): + ap = a + break + if ap: + upsert_apprenti_fiche(sess, ap.id, fiche) + sess.commit() + app_log(f"[run_imports] fiches: {sum(len(v) for v in fiches.values())} entrées") + except Exception as e: + errors.append(f"Import fiches: {e}") + sess.rollback() + app_log(f"[run_imports] erreur fiches: {e}") + finally: + sess.close() + +result = { + "timestamp": datetime.now().isoformat(), + "res_abs": res_abs, + "res_bn": res_bn, + "res_notes": res_notes, + "res_matu": res_matu, + "errors": errors, + "op_log": "", +} + +try: + DATA_DIR.mkdir(parents=True, exist_ok=True) + RESULT.write_text(json.dumps(result, ensure_ascii=False), encoding="utf-8") + app_log(f"[run_imports] résultats sauvegardés — notes={len(res_notes)} abs={len(res_abs)} erreurs={len(errors)}") +except Exception as e: + app_log(f"[run_imports] ERREUR sauvegarde: {e}") + sys.exit(1) + +app_log("[run_imports] terminé OK") diff --git a/scripts/sync_esacada.py b/scripts/sync_esacada.py new file mode 100644 index 0000000..4050a55 --- /dev/null +++ b/scripts/sync_esacada.py @@ -0,0 +1,1782 @@ +"""Synchronisation des PDFs d'absences depuis Escadaweb. + +Usage: + python scripts/sync_esacada.py --list-classes + python scripts/sync_esacada.py --sync "EM-AU 1" "EM-AU 2" + python scripts/sync_esacada.py --sync # toutes les classes + +Le script ouvre une fenêtre Chromium visible. +Si la session est expirée, connectez-vous manuellement (2FA inclus) ; +le script reprend automatiquement dès que vous êtes sur la page des classes. +""" +from __future__ import annotations + +import json +import re +import sys +import time +from pathlib import Path + +_root = Path(__file__).resolve().parent.parent +if str(_root) not in sys.path: + sys.path.insert(0, str(_root)) + +# Force UTF-8 sur stdout/stderr pour éviter les UnicodeEncodeError sur les pipes Windows +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8", errors="replace") +if hasattr(sys.stderr, "reconfigure"): + sys.stderr.reconfigure(encoding="utf-8", errors="replace") + +from playwright.sync_api import Page, sync_playwright, TimeoutError as PWTimeout, Error as PWError + +BASE_URL = "https://escadaweb.vs.ch" +LEHRPERSONEN_URL = f"{BASE_URL}/Lehrpersonen" +CLASSES_URL = f"{BASE_URL}/Lehrpersonen/ViewKlassen.aspx" +EINSTELLUNGEN_URL = f"{BASE_URL}/Lehrpersonen/Dialogs/DlgEinstellungen.aspx" +PROFILE_DIR = _root / "data" / "browser_profile" +PDFS_DIR = _root / "data" / "pdfs" +HREF_CACHE_FILE = _root / "data" / "class_href_cache.json" +CLASSES_CACHE_FILE = _root / "data" / "esacada_classes.json" +LOG_FILE = _root / "data" / "logs" / "operations.log" + +_href_cache: dict[str, str] = {} +_lang_ok = False # True après que la langue a été vérifiée/changée + + +def _cache_load() -> None: + global _href_cache + if HREF_CACHE_FILE.exists(): + try: + _href_cache = json.loads(HREF_CACHE_FILE.read_text(encoding="utf-8")) + except Exception: + _href_cache = {} + + +def _cache_save() -> None: + HREF_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True) + HREF_CACHE_FILE.write_text( + json.dumps(_href_cache, ensure_ascii=False, indent=2), encoding="utf-8" + ) + +_HEADER_SKIP = {"Identifiant", "Classe", "Description", "Type", "Nom", "Désignation", "Designation"} + +# JS finder : trouve le href du premier lien ViewAbsenzenErweitert ou ViewLernende +# dans la ligne du tableau qui contient exactement le nom de la classe. +# Argument : [className, preferredPart, fallbackPart] +_JS_FIND_CLASS_HREF = """([className, pref, fallback]) => { + for (const tr of document.querySelectorAll('tr')) { + const tds = Array.from(tr.querySelectorAll(':scope > td')); + if (!tds.some(td => (td.innerText || td.textContent || '').trim() === className)) + continue; + for (const part of [pref, fallback]) { + for (const td of tds) { + for (const a of td.querySelectorAll('a[href]')) { + if (!a.href.includes(part)) continue; + let p = a.parentElement, nested = false; + while (p && p !== td) { + if (p.tagName === 'TABLE') { nested = true; break; } + p = p.parentElement; + } + if (!nested) return a.getAttribute('href'); + } + } + } + } + return null; +}""" + + +# ── Utilitaires ─────────────────────────────────────────────────────────────── + +def _log(msg: str) -> None: + from datetime import datetime + from zoneinfo import ZoneInfo + ts = datetime.now(tz=ZoneInfo("Europe/Zurich")).strftime("%H:%M:%S") + line = f"[{ts}] {msg}" + print(line, flush=True) + try: + LOG_FILE.parent.mkdir(parents=True, exist_ok=True) + with LOG_FILE.open("a", encoding="utf-8") as _f: + _f.write(line + "\n") + except Exception: + pass + + +def _load_settings() -> dict: + _path = _root / "data" / "settings.json" + if not _path.exists(): + return {} + try: + return json.loads(_path.read_text(encoding="utf-8")) + except Exception: + return {} + + +def _load_totp_secret() -> str | None: + return _load_settings().get("totp_secret") or None + + +def _load_escada_creds() -> tuple[str, str]: + """Retourne (username, password) depuis settings, ou ('', '').""" + s = _load_settings() + return s.get("escada_username", ""), s.get("escada_password", "") + + +def _try_fill_login(page: Page) -> bool: + """Détecte le formulaire Keycloak (edusso.apps.vs.ch) et remplit les identifiants. + + Sélecteurs exacts du formulaire : + + + + Retourne True si le formulaire a été trouvé et soumis. + """ + username, password = _load_escada_creds() + if not username or not password: + return False + try: + # Attendre que le champ username soit visible (jusqu'à 5 s pour le rendu JS) + page.wait_for_selector("input#username", state="visible", timeout=5_000) + page.wait_for_selector("input#password", state="visible", timeout=2_000) + _log(" [LOGIN] Formulaire Keycloak détecté — saisie automatique des identifiants.") + page.locator("input#username").fill(username) + page.locator("input#password").fill(password) + try: + page.locator("input#kc-login").click(timeout=2_000) + except Exception: + page.locator("input#password").press("Enter") + return True + except Exception: + return False + + +def _try_fill_totp(page: Page, secret: str) -> bool: + """Remplit le champ OTP Keycloak via JavaScript (bypass visibilité Playwright).""" + import pyotp + _log(f" [2FA] Tentative remplissage OTP sur: {page.url[:80]}") + try: + code = pyotp.TOTP(secret).now() + result = page.evaluate("""(code) => { + const inp = document.querySelector('#otp') + || document.querySelector('[name="otp"]') + || document.querySelector('[autocomplete="one-time-code"]') + || document.querySelector('input[type="text"]:not([type="hidden"])'); + if (!inp) return 'not_found'; + inp.value = code; + inp.dispatchEvent(new Event('input', {bubbles: true})); + inp.dispatchEvent(new Event('change', {bubbles: true})); + return 'filled'; + }""", code) + _log(f" [2FA] JS fill result: {result}") + if result != "filled": + return False + _log(f" [2FA] OTP saisi via JS — soumission du formulaire.") + # Soumettre : clic sur le bouton submit, sinon form.submit() + submitted = page.evaluate("""() => { + const btn = document.querySelector('input[type="submit"]') + || document.querySelector('button[type="submit"]'); + if (btn) { btn.click(); return 'clicked'; } + const form = document.querySelector('form'); + if (form) { form.submit(); return 'submitted'; } + return 'no_submit'; + }""") + _log(f" [2FA] submit result: {submitted}") + return submitted in ("clicked", "submitted") + except Exception as _e: + _log(f" [2FA] JS fill err: {_e}") + return False + + +def _ensure_french_language(page: Page) -> None: + """Force le français via navigation directe vers DlgEinstellungen. + + URL confirmée par capture HAR : /Lehrpersonen/Dialogs/DlgEinstellungen.aspx + Après Speichern, le script de la page redirige automatiquement vers ViewKlassen.aspx. + Bloquant : sys.exit(1) si le changement échoue — tout le parsing dépend du français. + """ + global _lang_ok + if _lang_ok: + return + try: + _log(" [LANG] Navigation vers DlgEinstellungen…") + page.goto(EINSTELLUNGEN_URL, wait_until="domcontentloaded", timeout=15_000) + try: + page.wait_for_load_state("networkidle", timeout=8_000) + except Exception: + pass + + inp_loc = page.locator("#ContentPlaceHolderSite_DropDownList_sprache_I") + try: + inp_loc.wait_for(state="visible", timeout=8_000) + except Exception: + _log("ERR [LANG] Dropdown langue introuvable dans DlgEinstellungen — arrêt.") + sys.exit(1) + + cur_val = inp_loc.input_value() + _log(f" [LANG] Valeur actuelle: {cur_val!r}") + + if cur_val != "français": + _log(" [LANG] Changement en français") + page.evaluate("""() => { + const inp = document.querySelector('#ContentPlaceHolderSite_DropDownList_sprache_I'); + if (inp) { + inp.value = 'français'; + ASPx.ETextChanged('ContentPlaceHolderSite_DropDownList_sprache'); + } + }""") + page.locator("span.dx-vam:has-text('Speichern')").first.click() + try: + page.wait_for_load_state("networkidle", timeout=10_000) + except Exception: + pass + # Attendre que le grid soit prêt avant de rendre la main. + # Sans ça le premier _go_to_students_page tombe sur le timeout 20s. + try: + page.wait_for_selector( + "a[href*='ViewAbsenzenErweitert']", state="attached", timeout=30_000 + ) + except Exception: + pass + _log(" [LANG] Langue changée en français — grid prêt") + else: + _log(" [LANG] Déjà en français") + page.goto(CLASSES_URL, wait_until="domcontentloaded", timeout=15_000) + + _lang_ok = True + except (SystemExit, KeyboardInterrupt): + raise + except Exception as _e: + _log(f"ERR [LANG] Echec inattendu: {_e} — arrêt.") + sys.exit(1) + + +def _ensure_logged_in(page: Page) -> None: + """Gère la reconnexion : login + TOTP automatiques si identifiants configurés.""" + if "ViewKlassen" in page.url: + _ensure_french_language(page) + return + + _totp_secret = _load_totp_secret() + _username, _password = _load_escada_creds() + _log("SESSION_EXPIRED") + + cur = page.url.lower() + if "login" not in cur and "logon" not in cur and "viewklassen" not in cur: + page.goto(LEHRPERSONEN_URL) + + if _username and _password: + _log(" [LOGIN] Identifiants configurés — connexion automatique en cours.") + else: + _log(" Connectez-vous avec votre identifiant et mot de passe dans la fenetre.") + if _totp_secret: + _log(" [2FA] Secret TOTP configure - code saisi automatiquement quand demande.") + + deadline = time.time() + 300 # 5 min + _last_login = 0.0 + _last_totp = 0.0 + + while time.time() < deadline: + try: + _log(f" [LOGIN] url: {page.url[:100]}") + if "ViewKlassen" in page.url: + _log("LOGIN_OK") + _ensure_french_language(page) + return + + # Tentative login automatique (formulaire Keycloak) toutes les 5 s + if _username and _password and (time.time() - _last_login) > 5: + if _try_fill_login(page): + _last_login = time.time() + # Attendre la redirection (vers TOTP ou ViewKlassen) + try: + page.wait_for_load_state("networkidle", timeout=8_000) + except (PWTimeout, PWError): + pass + + if _totp_secret and (time.time() - _last_totp) > 5: + if _try_fill_totp(page, _totp_secret): + _last_totp = time.time() + try: + page.wait_for_url("**ViewKlassen**", timeout=10_000) + _log("LOGIN_OK") + return + except (PWTimeout, PWError): + pass + + page.wait_for_timeout(800) + except PWError: + if "ViewKlassen" in page.url: + _log("LOGIN_OK") + _ensure_french_language(page) + return + + _log("ERR Delai de connexion depasse (5 min).") + sys.exit(1) + + +def _launch_context(): + PROFILE_DIR.mkdir(parents=True, exist_ok=True) + pw = sync_playwright().start() + ctx = pw.chromium.launch_persistent_context( + str(PROFILE_DIR), + headless=True, + args=[ + "--start-maximized", + "--disable-popup-blocking", + ], + accept_downloads=True, + ) + page = ctx.pages[0] if ctx.pages else ctx.new_page() + return pw, ctx, page + + +# ── Récupération des classes ────────────────────────────────────────────────── + +def _next_page(page: Page, current: int) -> bool: + """Clique sur le lien de pagination DevExpress vers la page suivante. + + DevExpress génère des liens avec le numéro de page + en texte. On itère sur tous ces liens et compare via inner_text(). + """ + next_num = current + 1 + try: + page.wait_for_selector("a.dxp-num", state="attached", timeout=3_000) + except Exception: + _log(f" [pagination] pas de pager sur la page") + return False + links = page.locator("a.dxp-num").all() + for link in links: + try: + if link.inner_text().strip() == str(next_num): + _log(f" [pagination] -> page {next_num}") + link.click() + try: + page.wait_for_selector( + "a[href*='ViewAbsenzenErweitert']", state="attached", timeout=15_000 + ) + page.wait_for_timeout(300) + except Exception: + pass + return True + except Exception: + continue + _log(f" [pagination] aucun lien vers page {next_num}") + return False + + +def _scrape_classes(page: Page) -> list[str]: + """Scrape toutes les pages du tableau (suppose déjà sur ViewKlassen et connecté).""" + classes: list[str] = [] + current_page = 1 + + while True: + try: + page.wait_for_selector( + "a[href*='ViewAbsenzenErweitert']", state="attached", timeout=15_000 + ) + except Exception: + break + rows = page.locator("tr:has(a[href*='ViewAbsenzenErweitert'])").all() + for row in rows: + for cell in row.locator("td").all(): + txt = cell.inner_text().strip() + if txt and 1 < len(txt) <= 20 and txt[0].isalpha() and txt not in _HEADER_SKIP: + classes.append(txt) + break + + if _next_page(page, current_page): + current_page += 1 + else: + break + + return sorted(set(classes)) + + +def _all_classes(page: Page) -> list[str]: + """Navigue vers la liste des classes, gère le login, puis scrape toutes les pages.""" + page.goto(CLASSES_URL) + _ensure_logged_in(page) + return _scrape_classes(page) + + +# ── Navigation et téléchargement ────────────────────────────────────────────── + +def _go_to_class_page(page: Page, class_name: str, cache_type: str = "abs") -> "Page | None": + """Navigation générique vers la page d'une classe. + + Tente d'abord l'URL mise en cache ; si invalide, retombe sur le scraping. + Utilise des locators Playwright (pas page.evaluate) pour rester robuste + même si la session expire entre deux navigations. + """ + cache_key = f"{class_name}:{cache_type}" + + # ── Tentative via cache ─────────────────────────────────────────────────── + cached_url = _href_cache.get(cache_key) + if cached_url: + # Toujours passer par CLASSES_URL avant le cache : réinitialise le contexte + # serveur ASP.NET (nécessaire après un download Notes/BN sur ViewLernende). + try: + page.goto(CLASSES_URL, wait_until="domcontentloaded", timeout=10_000) + except Exception: + pass + try: + page.goto(cached_url, wait_until="domcontentloaded", timeout=15_000) + page.wait_for_load_state("networkidle", timeout=15_000) + cur = page.url + if ("ViewKlassen" not in cur + and "login" not in cur.lower() + and class_name in (page.content() or "")): + _log(f"CACHE {class_name}") + return page + except Exception: + pass + _log(f"CACHE_MISS {class_name} — retour au scraping") + del _href_cache[cache_key] + _cache_save() + + # ── Scraping ────────────────────────────────────────────────────────────── + page.goto(CLASSES_URL) + _ensure_logged_in(page) # gère expiration de session / 2FA + + # Attendre que la grille soit rendue (au moins un lien ViewAbsenzenErweitert visible) + try: + page.wait_for_selector( + "a[href*='ViewAbsenzenErweitert']", state="attached", timeout=20_000 + ) + page.wait_for_timeout(500) + except Exception: + _log(f"WARN {class_name}: grille non chargée après 20s") + return None + + # DevExpress restaure le dernier état du grid (pagination incluse). + # Forcer le retour à la page 1 si un lien "1" est présent dans le pager. + try: + p1 = page.locator("a.dxp-num:has-text('1')").first + if p1.count(): + _log(f" [scan] retour page 1 du grid DevExpress") + p1.click() + page.wait_for_load_state("networkidle", timeout=10_000) + page.wait_for_timeout(300) + except Exception: + pass + + pref, fallback = ( + ("ViewLernende", "ViewAbsenzenErweitert") + if cache_type == "lernende" + else ("ViewAbsenzenErweitert", "ViewLernende") + ) + + current_pg = 1 + while True: + _log(f" [scan page={current_pg}] recherche '{class_name}'…") + + href = None + try: + href = page.evaluate(_JS_FIND_CLASS_HREF, [class_name, pref, fallback]) + except Exception as e: + _log(f" [scan page={current_pg}] evaluate ERR: {e}") + + _log(f" [scan page={current_pg}] -> {'TROUVE' if href else 'pas trouve'}") + + if href: + full_url = ( + href if href.startswith("http") + else f"{BASE_URL}/Lehrpersonen/{href.lstrip('/')}" + ) + _href_cache[cache_key] = full_url + _cache_save() + try: + page.locator(f"a[href='{href}']").first.click() + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception: + page.goto(full_url, wait_until="domcontentloaded") + try: + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception: + pass + _log(f" [nav {cache_type}] url après nav: {page.url[:80]}") + return page + + if not _next_page(page, current_pg): + break + current_pg += 1 + + _log(f"WARN {class_name}: classe introuvable") + return None + + +def _go_to_absence_page(page: Page, class_name: str) -> "Page | None": + """Ouvre la page d'absences de la classe (ViewAbsenzenErweitert ou ViewLernende). + + Le cache abs est toujours effacé avant navigation : les GUIDs Escada sont + propres à chaque session, donc un GUID mis en cache depuis une session + précédente provoque une double-navigation corrompant le contexte ASP.NET. + """ + cache_key = f"{class_name}:abs" + if cache_key in _href_cache: + del _href_cache[cache_key] + _cache_save() + result = _go_to_class_page(page, class_name, cache_type="abs") + if result is not None: + _log(f" [abs nav] url: {page.url[:80]}") + return result + + +def _has_bn_button(page: Page) -> bool: + """Renvoie True si le bouton BN (FR ou DE) est présent dans le ribbon.""" + loc = page.locator( + "a.dxr-item.dxr-buttonItem:has-text('Bulletins de notes')," + "a.dxr-item.dxr-buttonItem:has-text('Zeugnisse')" + ) + if loc.count(): + return True + # Log des boutons disponibles si aucun BN trouvé (aide au diagnostic) + try: + _all = page.locator("a.dxr-item.dxr-buttonItem").all() + if _all: + _log(f" [BN] boutons sur ViewLernende: {[b.inner_text() for b in _all[:8]]}") + except Exception: + pass + return False + + +def _go_to_students_page(page: Page, class_name: str) -> "Page | None": + """Ouvre ViewLernende (liste élèves) en naviguant DIRECTEMENT depuis ViewKlassen. + + Règles strictes : + - Toujours repartir de CLASSES_URL (jamais depuis la page absences). + - Ne retourner la page que si le bouton 'Bulletins de notes' est visible. + - Pas de cache : les GUIDs Escada sont contexte-dépendants (chemin 'Classes→Elèves' + donne un GUID valide ; chemin 'Classes→Absences→Elèves' donne un GUID différent + avec une liste vide et sans bouton BN). + """ + # Purger toute entrée de cache lernende (GUIDs ne sont pas fiables entre sessions) + cache_key = f"{class_name}:lernende" + if cache_key in _href_cache: + del _href_cache[cache_key] + _cache_save() + + # ── Naviguer depuis la liste des classes ────────────────────────────────── + page.goto(CLASSES_URL) + _ensure_logged_in(page) + try: + page.wait_for_selector( + "a[href*='ViewAbsenzenErweitert']", state="attached", timeout=20_000 + ) + page.wait_for_timeout(500) + except Exception: + _log(f"WARN {class_name}: grille ViewKlassen non chargée") + return None + try: + p1 = page.locator("a.dxp-num:has-text('1')").first + if p1.count(): + p1.click() + page.wait_for_load_state("networkidle", timeout=10_000) + page.wait_for_timeout(300) + except Exception: + pass + + current_pg = 1 + while True: + _log(f" [lrn p={current_pg}] '{class_name}'") + + # ── Tentative A : lien href*ViewLernende dans la ligne ──────────────── + # Cherche dans les TDs directs ET dans les sous-tables DevExpress. + lrn_href = page.evaluate("""([className]) => { + for (const tr of document.querySelectorAll('tr')) { + const tds = Array.from(tr.querySelectorAll(':scope > td')); + if (!tds.some(td => (td.innerText || td.textContent || '').trim() === className)) + continue; + for (const td of tds) { + for (const a of td.querySelectorAll('a[href]')) { + if (a.href.includes('ViewLernende')) return a.getAttribute('href'); + } + } + } + return null; + }""", [class_name]) + + if lrn_href: + _log(f" [lrn] lien trouvé : {lrn_href[:70]}") + try: + page.locator(f"a[href='{lrn_href}']").first.click() + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception: + full = (lrn_href if lrn_href.startswith("http") + else f"{BASE_URL}/Lehrpersonen/{lrn_href.lstrip('/')}") + page.goto(full, wait_until="domcontentloaded") + try: + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception: + pass + if "ViewLernende" in page.url and _has_bn_button(page): + _log(f" [lrn] OK bouton BN présent") + return page + _log(f" [lrn] WARN: ViewLernende atteint mais bouton BN absent — URL: {page.url[:80]}") + # Revenir sur ViewKlassen pour tenter l'approche B + page.goto(CLASSES_URL) + try: + page.wait_for_selector( + "a[href*='ViewAbsenzenErweitert']", state="attached", timeout=15_000 + ) + page.wait_for_timeout(300) + except Exception: + pass + try: + p1 = page.locator("a.dxp-num:has-text('1')").first + if p1.count(): + p1.click() + page.wait_for_load_state("networkidle", timeout=10_000) + page.wait_for_timeout(300) + except Exception: + pass + + # ── Tentative B : clic sur le nom de la classe dans la ligne ───────── + # Le nom de classe peut être un lien DevExpress (onclick) sans href direct. + _log(f" [lrn] tentative B: clic sur '{class_name}' dans la grille") + loc = page.locator( + "tr:has(a[href*='ViewAbsenzenErweitert']) td" + ).filter(has_text=class_name).first + if loc.count(): + try: + loc.click() + page.wait_for_load_state("networkidle", timeout=15_000) + if "ViewLernende" in page.url and _has_bn_button(page): + _log(f" [lrn] OK via clic nom classe") + return page + _log(f" [lrn] clic nom classe -> {page.url[:80]} (bouton BN: {_has_bn_button(page)})") + except Exception as e: + _log(f" [lrn] clic nom ERR: {e}") + + if not _next_page(page, current_pg): + break + current_pg += 1 + + _log(f"WARN {class_name}: ViewLernende avec bouton BN introuvable") + return None + + +# ID DevExpress de la grille ViewLernende (stable sur Escada EPTM) +_GRID_ID = "ContentPlaceHolder_site_GridLernende" + + +def _parse_fiche_text(raw: str) -> dict: + """Parse le texte brut d'une ligne de détail Escada en dict fiche.""" + import re + + # Fusionner les colonnes (séparées par |||) en un seul bloc de lignes + text = raw.replace('|||', '\n') + lines = [l.strip() for l in text.splitlines() if l.strip()] + + fiche: dict = {} + section = '' + reEmail = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$') + reTel = re.compile(r'^\+?[\d\s\/\-\.]{7,}$') + reDate = re.compile(r'\d{2}\.\d{2}\.\d{4}') + reCp = re.compile(r'^(\d{4})\s+(.+)$') + reTelPfx = re.compile(r'^(Mobile|Tel|Tél)[^\d+]*', re.I) + + for line in lines: + if re.match(r'^El[eè]ve\s*:', line, re.I): section = 'eleve'; continue + if re.match(r'^Entreprise\s*:', line, re.I): section = 'entreprise'; continue + if re.match(r'^Formateur\s*:', line, re.I): section = 'formateur'; continue + if re.match(r'^Remarques?\s*:', line, re.I): section = ''; continue + + if section == 'eleve': + if 'nom_eleve' not in fiche \ + and not reEmail.match(line) and not reTel.match(line) \ + and not reDate.search(line) and not reCp.match(line) \ + and not re.match(r'^(Mobile|Tel|Tél|Majeur)', line, re.I): + fiche['nom_eleve'] = line; continue + if 'adresse' not in fiche \ + and not reEmail.match(line) and not reTel.match(line) \ + and not reDate.search(line) and not reCp.match(line) \ + and not re.match(r'^(Mobile|Tel|Tél|Majeur)', line, re.I): + fiche['adresse'] = line; continue + m = reCp.match(line) + if m and 'code_postal' not in fiche: + fiche['code_postal'] = m.group(1); fiche['localite'] = m.group(2); continue + if re.match(r'^(Mobile|Tel|Tél)', line, re.I) and 'telephone' not in fiche: + fiche['telephone'] = reTelPfx.sub('', line).strip(); continue + if reEmail.match(line) and 'email' not in fiche: + fiche['email'] = line; continue + dm = reDate.search(line) + if dm and 'date_naissance' not in fiche: + fiche['date_naissance'] = dm.group(0) + if re.search(r'Majeur', line, re.I): + fiche['majeur'] = bool(re.search(r'\boui\b', line, re.I)) + + elif section == 'entreprise': + if 'entreprise_nom' not in fiche \ + and not reEmail.match(line) and not reTel.match(line) \ + and not reCp.match(line) \ + and not re.match(r'^(Mobile|Tel|Tél)', line, re.I): + fiche['entreprise_nom'] = line; continue + if 'entreprise_adresse' not in fiche \ + and not reEmail.match(line) and not reTel.match(line) \ + and not reCp.match(line) \ + and not re.match(r'^(Mobile|Tel|Tél)', line, re.I): + fiche['entreprise_adresse'] = line; continue + m = reCp.match(line) + if m and 'entreprise_code_postal' not in fiche: + fiche['entreprise_code_postal'] = m.group(1) + fiche['entreprise_localite'] = m.group(2); continue + if re.match(r'^(Mobile|Tel|Tél)', line, re.I) and 'entreprise_telephone' not in fiche: + fiche['entreprise_telephone'] = reTelPfx.sub('', line).strip(); continue + if reEmail.match(line) and 'entreprise_email' not in fiche: + fiche['entreprise_email'] = line; continue + + elif section == 'formateur': + if 'formateur_nom' not in fiche and not reEmail.match(line): + fiche['formateur_nom'] = line; continue + if reEmail.match(line) and 'formateur_email' not in fiche: + fiche['formateur_email'] = line; continue + + return fiche + + +def _scrape_student_details(page: Page, class_name: str) -> list[dict]: + """Scrape les fiches depuis ViewLernende (grille DevExpress ASPxGridView). + + Structure connue du DOM Escada : + - Lignes données : tr#_DXDataRow{N} + - Bouton expand : img.dxGridView_gvDetailCollapsedButton_MetropolisBlue + avec onclick="ASPx.GVShowDetailRow(...,N,...)" + - Cellule détail : td#_tcdxdt{N} (colspan=15, chargée par AJAX au clic) + └ inner table → tr → td[0] = Élève | td[2] = Entreprise + Formateur + """ + _log(f" [fiches] scraping {class_name}…") + + try: + page.wait_for_load_state("networkidle", timeout=15_000) + except Exception: + pass + + gid = _GRID_ID + + # Compter les lignes de données via leurs IDs DevExpress + n = page.evaluate( + "(gid) => document.querySelectorAll(`[id^='${gid}_DXDataRow']`).length", + gid + ) + _log(f" [fiches] {n} élève(s) trouvé(s)") + if n == 0: + return [] + + # Déplier + lire une ligne à la fois (Escada ne gère pas les AJAX simultanés) + fiches: list[dict] = [] + for i in range(n): + # Clic sur le bouton expand de la ligne i + clicked = page.evaluate("""([gid, i]) => { + const row = document.getElementById(`${gid}_DXDataRow${i}`); + if (!row) return false; + const img = row.querySelector( + 'img.dxGridView_gvDetailCollapsedButton_MetropolisBlue' + ); + if (!img) return false; + img.click(); + return true; + }""", [gid, i]) + + if not clicked: + _log(f" [fiches] {i}: WARNING bouton expand introuvable") + continue + + # Attendre que la cellule de cette ligne soit chargée (max 15s) + ready = False + for _ in range(15): + page.wait_for_timeout(1_000) + ready = page.evaluate("""([gid, i]) => { + const cell = document.getElementById(`${gid}_tcdxdt${i}`); + if (!cell) return false; + return (cell.innerText || cell.textContent || '').trim().length >= 10; + }""", [gid, i]) + if ready: + break + + if not ready: + _log(f" [fiches] {i}: WARNING cellule non chargée après 15s") + continue + + # Lire la cellule + raw = page.evaluate("""([gid, i]) => { + const cell = document.getElementById(`${gid}_tcdxdt${i}`); + if (!cell) return null; + const inner = cell.querySelector('table tr'); + if (!inner) { + return (cell.innerText || '').trim() || null; + } + const tds = inner.querySelectorAll(':scope > td'); + // td[0] = Élève, td[2] = Entreprise + Formateur + const eleve = tds[0] ? (tds[0].innerText || '').trim() : ''; + const ent = tds[2] ? (tds[2].innerText || '').trim() : ''; + if (!eleve && !ent) return null; + return eleve + (ent ? ('\\n||||\\n' + ent) : ''); + }""", [gid, i]) + + if raw: + fiche = _parse_fiche_text(raw) + if fiche.get('nom_eleve') or fiche.get('entreprise_nom'): + fiches.append(fiche) + _log(f" [fiches] {i}: {fiche.get('nom_eleve', '?')}") + else: + _log(f" [fiches] {i}: WARNING données vides — raw[:80]={raw[:80]!r}") + else: + _log(f" [fiches] {i}: WARNING cellule vide") + + _log(f" [fiches] {len(fiches)} fiche(s) extraite(s)") + return fiches + + +def _download_pdf(page: Page, class_name: str) -> Path | None: + """Clique sur 'Contrôle des absences (apprenants)' et récupère le PDF. + + Le clic ouvre le PDF dans un nouvel onglet. On récupère l'URL, on ferme + l'onglet immédiatement (évite le cache du lecteur PDF Chrome), puis on + re-télécharge via context.request.get() — GET HTTP pur avec les cookies + de session, sans passer par le lecteur PDF du navigateur. + """ + PDFS_DIR.mkdir(parents=True, exist_ok=True) + dest = PDFS_DIR / f"esacada_{class_name.replace(' ', '_')}.pdf" + + _log(f" [abs] page url avant clic: {page.url[:80]}") + # Attendre que le ribbon DevExpress soit rendu + try: + page.wait_for_selector("a.dxr-item.dxr-buttonItem", timeout=15_000) + except Exception: + pass + # Cibler le DevExpress ribbon, pas le intérieur (qui n'a pas le handler JS) + btn = page.locator("a.dxr-item.dxr-buttonItem:has-text('Contrôle des absences (apprenants)')").first + if not btn.count(): + # Fallback allemand (si locale du serveur est DE) + btn = page.locator("a.dxr-item.dxr-buttonItem:has-text('Absenzenkontrolle (Lernende)')").first + if not btn.count(): + try: + _all = page.locator("a.dxr-item.dxr-buttonItem").all() + _texts = [b.inner_text() for b in _all] + _log(f" [abs] boutons disponibles: {_texts}") + except Exception: + pass + _log(f"ERR {class_name}: bouton introuvable") + return None + + # Stratégie 0 : extraire le href du bouton et faire un GET direct. + # Le bouton utilise un nom de fenêtre nommée qui peut + # être bloqué après un download précédent (Notes). Le href contient l'URL + # complète du rapport — on l'utilise directement sans clic. + try: + href_attr = page.evaluate("el => el.getAttribute('href')", btn.element_handle()) + except Exception: + href_attr = None + + if href_attr: + full_pdf_url = ( + href_attr if href_attr.startswith("http") + else f"{BASE_URL}/{href_attr.lstrip('/')}" + ) + _log(f" [abs] GET direct: {full_pdf_url[:80]}") + try: + resp = page.context.request.get(full_pdf_url, timeout=30_000) + if resp.ok: + body = resp.body() + if len(body) > 1_000: + dest.write_bytes(body) + _log(f"OK {class_name} [href size={len(body)}]") + return dest + _log(f" [abs] GET status={resp.status} len={len(resp.body())}") + except Exception as e: + _log(f" [abs] GET err: {e}") + + pages_before = {id(p) for p in page.context.pages} + + try: + # Stratégie 1 : téléchargement direct (Content-Disposition: attachment) + try: + with page.expect_download(timeout=10_000) as dl_info: + btn.click() + dl_info.value.save_as(dest) + _log(f"OK {class_name} [direct size={dest.stat().st_size}]") + return dest + except PWTimeout: + pass + + # Stratégie 2 : le PDF s'ouvre dans un nouvel onglet + page.wait_for_timeout(2_000) + new_tabs = [p for p in page.context.pages + if p is not page and id(p) not in pages_before] + + pdf_url: str | None = None + for tab in new_tabs: + try: + tab.wait_for_load_state("domcontentloaded", timeout=10_000) + except Exception: + pass + url = tab.url + if url.startswith("http") and ("Reports" in url or ".pdf" in url.lower()): + pdf_url = url + elif "chrome-extension" in url and "http" in url: + m = re.search(r"(https?://\S+)", url) + if m: + pdf_url = m.group(1) + try: + tab.close() + except Exception: + pass + + if pdf_url: + resp = page.context.request.get(pdf_url) + if resp.ok: + body = resp.body() + if len(body) > 1_000: + dest.write_bytes(body) + _log(f"OK {class_name} [request size={len(body)}]") + return dest + _log(f"ERR {class_name}: GET HTTP {resp.status}") + return None + + _log(f"ERR {class_name}: aucun onglet PDF trouvé") + return None + + except Exception as e: + _log(f"ERR {class_name}: {e}") + return None + + +def _download_bn_pdf(page: Page, class_name: str) -> Path | None: + """Sur la page liste-élèves, clique 'Bulletins de notes' → 'Impression des bulletins' + et télécharge le PDF BN de la classe. + + Utilise les mêmes deux stratégies de téléchargement que _download_pdf(). + """ + PDFS_DIR.mkdir(parents=True, exist_ok=True) + dest = PDFS_DIR / f"bn_{class_name.replace(' ', '_')}.pdf" + + # Attendre que le ribbon DevExpress soit rendu + try: + page.wait_for_selector("a.dxr-item.dxr-buttonItem", timeout=15_000) + except Exception: + pass + # Bouton DevExpress ribbon — pas d'attribut onclick, ciblé par CSS + texte. + # Le bouton est un sans onclick inline. + btn_bn = page.locator("a.dxr-item.dxr-buttonItem:has-text('Bulletins de notes')").first + if not btn_bn.count(): + btn_bn = page.locator("a.dxr-item.dxr-buttonItem:has-text('Zeugnisse')").first + if not btn_bn.count(): + try: + _all = page.locator("a.dxr-item.dxr-buttonItem").all() + _log(f" [BN] boutons disponibles: {[b.inner_text() for b in _all]}") + except Exception: + pass + _log(f"ERR BN {class_name}: bouton 'Bulletins de notes' introuvable") + return None + + btn_bn.click() + + # Le popup DevExpress charge DlgZeugnisse.aspx dans une iframe. + # On attend que cette iframe apparaisse dans page.frames. + dlg_frame = None + for _ in range(20): + for frame in page.frames: + if "DlgZeugnisse" in frame.url: + dlg_frame = frame + break + if dlg_frame: + break + page.wait_for_timeout(1_000) + + if dlg_frame is None: + _log(f"ERR BN {class_name}: iframe DlgZeugnisse introuvable") + return None + + try: + dlg_frame.wait_for_load_state("networkidle", timeout=20_000) + except Exception: + pass + + # Le div wrapper du bouton DevExpress (l'input type=submit interne est hidden) + try: + dlg_frame.wait_for_selector( + "#ContentPlaceHolderSite_Button_Zeugnisdruck", + state="visible", + timeout=15_000, + ) + except PWTimeout: + _log(f"ERR BN {class_name}: bouton Zeugnisdruck non visible dans DlgZeugnisse") + return None + + # Intercepte window.open dans l'iframe pour capturer l'URL du rapport + # avant que la popup soit ouverte — même stratégie que abs/Notes/MATU. + try: + dlg_frame.evaluate(""" + window.__bnOpenedUrl = null; + const _orig = window.open.bind(window); + window.open = function(url, ...rest) { + window.__bnOpenedUrl = (url || ''); + return _orig(url, ...rest); + }; + """) + except Exception as e: + _log(f" [BN] intercept window.open err: {e}") + + # Écoute aussi les downloads sur la page principale (au cas où le PDF + # arrive via l'iframe parent plutôt que via window.open) + _dl_main = [False] + def _on_main_dl(dl): + try: + dl.save_as(str(dest)) + _dl_main[0] = True + _log(f" [BN] download main page capturé: {dl.suggested_filename}") + except Exception as ex: + _log(f" [BN] main dl err: {ex}") + page.on("download", _on_main_dl) + + try: + dlg_frame.click("#ContentPlaceHolderSite_Button_Zeugnisdruck") + except Exception as e: + page.remove_listener("download", _on_main_dl) + _log(f"ERR BN {class_name}: clic Zeugnisdruck: {e}") + return None + + # Polling 120s : window.open URL, download principal, ou nouvelle page + pages_before = {id(p) for p in page.context.pages} + bn_report_url: str | None = None + + for _i in range(120): + page.wait_for_timeout(1_000) + + if _dl_main[0]: + break + + # Vérifier si window.open a été appelé avec une URL + try: + opened = dlg_frame.evaluate("window.__bnOpenedUrl") + except Exception: + opened = None + if opened and opened.startswith("/"): + bn_report_url = f"{BASE_URL}/{opened.lstrip('/')}" + _log(f" [BN] window.open URL: {bn_report_url[:80]}") + break + if opened and opened.startswith("http"): + bn_report_url = opened + _log(f" [BN] window.open URL: {bn_report_url[:80]}") + break + + # Nouvelles pages créées + new_pages = [p for p in page.context.pages if id(p) not in pages_before] + for np in new_pages: + pages_before.add(id(np)) + try: + np.wait_for_load_state("domcontentloaded", timeout=5_000) + except Exception: + pass + np_url = np.url + _log(f" [BN] nouvelle page: {np_url[:80]}") + if np_url.startswith("http"): + bn_report_url = np_url + try: + np.close() + except Exception: + pass + break + + if bn_report_url: + break + + if _i in (0, 4, 9, 29, 59): + _log(f" [BN] +{_i+1}s attente…") + + page.remove_listener("download", _on_main_dl) + + # Cas 1 : download capturé sur la page principale + if _dl_main[0] and dest.exists() and dest.stat().st_size > 1_000: + _log(f"OK BN {class_name} [main download size={dest.stat().st_size}]") + return dest + + # Cas 2 : URL du rapport récupérée → GET direct + if bn_report_url: + try: + resp = page.context.request.get(bn_report_url, timeout=60_000) + if resp.ok: + body = resp.body() + if len(body) > 1_000: + dest.write_bytes(body) + _log(f"OK BN {class_name} [href size={len(body)}]") + return dest + _log(f"ERR BN {class_name}: GET HTTP {resp.status} len={len(resp.body())}") + except Exception as e: + _log(f"ERR BN {class_name}: GET err: {e}") + return None + + _log(f"ERR BN {class_name}: ni download ni URL après 120s") + return None + + +def _download_ribbon_pdf(page: Page, btn_locator, dest: Path, label: str) -> Path | None: + """Télécharge un PDF déclenché par un bouton ribbon DevExpress. + + Stratégie 0 : href direct sur le (contourne les problèmes target=Blank). + Stratégie 1 : polling 90s — capture Content-Disposition: attachment ET nouveaux onglets. + """ + # Stratégie 0 : href direct (indépendant de l'état de la session) + try: + href_attr = page.evaluate("el => el.getAttribute('href')", btn_locator.element_handle()) + except Exception: + href_attr = None + + if href_attr and href_attr.startswith("/"): + full_url = f"{BASE_URL}/{href_attr.lstrip('/')}" + _log(f" [{label}] href direct: {full_url[:80]}") + try: + resp = page.context.request.get(full_url, timeout=30_000) + if resp.ok: + body = resp.body() + if len(body) > 1_000: + dest.write_bytes(body) + _log(f"OK {label} [href size={len(body)}]") + return dest + _log(f" [{label}] href GET status={resp.status if resp else '?'}") + except Exception as e: + _log(f" [{label}] href GET err: {e}") + + try: + btn_locator.scroll_into_view_if_needed(timeout=3_000) + page.wait_for_timeout(300) + except Exception: + pass + + # Écouteur de téléchargement direct (Content-Disposition: attachment) sur la page courante + _dl_ok = [False] + def _on_download(dl): + try: + dl.save_as(str(dest)) + _dl_ok[0] = True + _log(f" [{label}] download direct capturé") + except Exception as e: + _log(f" [{label}] download save err: {e}") + page.on("download", _on_download) + + pages_before = {id(p) for p in page.context.pages} + + try: + btn_locator.click() + except Exception as e: + page.remove_listener("download", _on_download) + _log(f"ERR {label}: click failed: {e}") + return None + + # Polling 90s — vérifie download direct ET nouveaux onglets + for _i in range(45): + page.wait_for_timeout(2_000) + + # Download direct capturé par l'écouteur + if _dl_ok[0] and dest.exists() and dest.stat().st_size > 1_000: + page.remove_listener("download", _on_download) + _log(f"OK {label} [direct size={dest.stat().st_size}]") + return dest + + # Nouvel onglet ouvert via window.open + new_tabs = [p for p in page.context.pages if id(p) not in pages_before] + for tab in new_tabs: + pages_before.add(id(tab)) + try: + tab.wait_for_load_state("domcontentloaded", timeout=10_000) + except Exception: + pass + url = tab.url + try: + tab.close() + except Exception: + pass + if url.startswith("http"): + try: + resp = page.context.request.get(url, timeout=30_000) + if resp.ok and len(resp.body()) > 1_000: + page.remove_listener("download", _on_download) + dest.write_bytes(resp.body()) + _log(f"OK {label} [new_page size={len(resp.body())}]") + return dest + except Exception: + pass + + # Diagnostic : état de la page 2s et 10s après le clic + if _i in (0, 4): + try: + cur = page.url + extra_frames = [f.url for f in page.frames + if f.url and "about:blank" not in f.url and f.url != cur] + _log(f" [{label}] +{(_i+1)*2}s url={cur[:70]}" + + (f" frames={extra_frames[:2]}" if extra_frames else "")) + except Exception: + pass + + page.remove_listener("download", _on_download) + _log(f"ERR {label}: aucun PDF récupéré après 90s") + return None + + +def _download_notes_pdf(page: Page, class_name: str) -> Path | None: + """Sur la page liste-élèves, clique 'Moyennes des notes d'examen (apprenant)' + et télécharge le PDF de la liste des notes par apprenant. + """ + PDFS_DIR.mkdir(parents=True, exist_ok=True) + dest = PDFS_DIR / f"notes_{class_name.replace(' ', '_')}.pdf" + + try: + page.wait_for_selector("a.dxr-item.dxr-buttonItem", timeout=15_000) + except Exception: + pass + btn = page.locator("a.dxr-item.dxr-buttonItem:has-text('Moyennes des notes')").first + if not btn.count(): + btn = page.locator("a.dxr-item.dxr-buttonItem:has-text('Prüfungsnotenliste (Lernende)')").first + if not btn.count(): + try: + _all = page.locator("a.dxr-item.dxr-buttonItem").all() + _log(f" [NOTES] boutons disponibles: {[b.inner_text() for b in _all]}") + except Exception: + pass + _log(f"ERR NOTES {class_name}: bouton 'Moyennes des notes' introuvable") + return None + return _download_ribbon_pdf(page, btn, dest, f"NOTES {class_name}") + + +def _download_matu_pdf(page: Page, class_name: str) -> Path | None: + """Sur la page liste-élèves d'une classe MP, télécharge la liste de contrôle des notes MP.""" + PDFS_DIR.mkdir(parents=True, exist_ok=True) + dest = PDFS_DIR / f"matu_{class_name.replace(' ', '_')}.pdf" + + try: + page.wait_for_selector("a.dxr-item.dxr-buttonItem", timeout=15_000) + except Exception: + pass + btn = page.locator("a.dxr-item.dxr-buttonItem:has-text('notes MP du bulletin')").first + if not btn.count(): + btn = page.locator("a.dxr-item.dxr-buttonItem:has-text('BM-Zeugnisnoten-Kontrollliste')").first + if not btn.count(): + try: + _all = page.locator("a.dxr-item.dxr-buttonItem").all() + _log(f" [MATU] boutons disponibles: {[b.inner_text() for b in _all]}") + except Exception: + pass + _log(f"ERR MATU {class_name}: bouton 'notes MP du bulletin' introuvable") + return None + return _download_ribbon_pdf(page, btn, dest, f"MATU {class_name}") + + +# ── Commandes principales ───────────────────────────────────────────────────── + +def cmd_list_classes() -> None: + pw, ctx, page = _launch_context() + try: + classes = _all_classes(page) + _log(f"CLASSES_JSON:{json.dumps(classes, ensure_ascii=False)}") + finally: + ctx.close() + pw.stop() + + +def cmd_sync(selected: list[str]) -> None: + pw, ctx, page = _launch_context() + try: + _cache_load() + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + if not selected: + _log("INFO Récupération de toutes les classes...") + selected = _scrape_classes(page) + + _log(f"TOTAL {len(selected)}") + downloaded: list[str] = [] + + for i, cls in enumerate(selected, 1): + _log(f"PROGRESS {i}/{len(selected)} {cls}") + abs_page = _go_to_absence_page(page, cls) + if abs_page is not None: + pdf = _download_pdf(abs_page, cls) + if pdf: + downloaded.append(str(pdf)) + if abs_page is not page: + try: + abs_page.close() + except Exception: + pass + + _log(f"DONE {json.dumps(downloaded, ensure_ascii=False)}") + finally: + ctx.close() + pw.stop() + + +def cmd_sync_bn(selected: list[str]) -> None: + """Télécharge les PDFs des Bulletins de Notes pour les classes sélectionnées.""" + pw, ctx, page = _launch_context() + try: + _cache_load() + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + if not selected: + _log("INFO Récupération de toutes les classes...") + selected = _scrape_classes(page) + + _log(f"TOTAL {len(selected)}") + downloaded: list[str] = [] + + for i, cls in enumerate(selected, 1): + _log(f"PROGRESS {i}/{len(selected)} {cls}") + students_page = _go_to_students_page(page, cls) + if students_page is not None: + pdf = _download_bn_pdf(students_page, cls) + if pdf: + downloaded.append(str(pdf)) + if students_page is not page: + try: + students_page.close() + except Exception: + pass + + _log(f"BN_DONE {json.dumps(downloaded, ensure_ascii=False)}") + finally: + ctx.close() + pw.stop() + + +def cmd_sync_fiches(selected: list[str]) -> None: + """Scrape les fiches détaillées des élèves depuis ViewLernende.""" + pw, ctx, page = _launch_context() + try: + _cache_load() + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + if not selected: + _log("INFO Récupération de toutes les classes...") + selected = _scrape_classes(page) + + _log(f"TOTAL {len(selected)}") + all_fiches: dict[str, list[dict]] = {} + + for i, cls in enumerate(selected, 1): + _log(f"PROGRESS {i}/{len(selected)} {cls}") + students_page = _go_to_students_page(page, cls) + if students_page is not None: + fiches = _scrape_student_details(students_page, cls) + all_fiches[cls] = fiches + if students_page is not page: + try: + students_page.close() + except Exception: + pass + + _log(f"FICHES_DONE {json.dumps(all_fiches, ensure_ascii=False)}") + finally: + ctx.close() + pw.stop() + + +def cmd_sync_notes(selected: list[str]) -> None: + """Télécharge les PDFs des moyennes de notes (apprenant) pour les classes sélectionnées.""" + pw, ctx, page = _launch_context() + try: + _cache_load() + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + if not selected: + _log("INFO Récupération de toutes les classes...") + selected = _scrape_classes(page) + + _log(f"TOTAL {len(selected)}") + downloaded: list[str] = [] + + for i, cls in enumerate(selected, 1): + _log(f"PROGRESS {i}/{len(selected)} {cls}") + students_page = _go_to_students_page(page, cls) + if students_page is not None: + pdf = _download_notes_pdf(students_page, cls) + if pdf: + downloaded.append(str(pdf)) + if students_page is not page: + try: + students_page.close() + except Exception: + pass + + _log(f"NOTES_DONE {json.dumps(downloaded, ensure_ascii=False)}") + finally: + ctx.close() + pw.stop() + + +def cmd_sync_matu(selected: list[str]) -> None: + """Télécharge les listes de contrôle des notes MP pour les classes sélectionnées.""" + pw, ctx, page = _launch_context() + try: + _cache_load() + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + if not selected: + _log("INFO Récupération de toutes les classes MP...") + all_cls = _scrape_classes(page) + selected = [c for c in all_cls if re.match(r"MP\d", c, re.I)] + + _log(f"TOTAL {len(selected)}") + downloaded: list[str] = [] + + for i, cls in enumerate(selected, 1): + _log(f"PROGRESS {i}/{len(selected)} {cls}") + students_page = _go_to_students_page(page, cls) + if students_page is not None: + pdf = _download_matu_pdf(students_page, cls) + if pdf: + downloaded.append(str(pdf)) + if students_page is not page: + try: + students_page.close() + except Exception: + pass + + _log(f"MATU_DONE {json.dumps(downloaded, ensure_ascii=False)}") + finally: + ctx.close() + pw.stop() + + +# ── Debug ──────────────────────────────────────────────────────────────────── + +def cmd_debug_bn(class_name: str) -> None: + """Ouvre la page liste-élèves et affiche tous les éléments cliquables du ribbon.""" + pw, ctx, page = _launch_context() + try: + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + students_page = _go_to_students_page(page, class_name) + if students_page is None: + _log(f"ERR: classe '{class_name}' introuvable") + return + + students_page.wait_for_load_state("networkidle") + + # ── Étape : clic 'Bulletins de notes' → clic 'Impression des bulletins' via JS + # puis dump de tous les boutons visibles + btn_bn = students_page.locator("a.dxr-item.dxr-buttonItem:has-text('Bulletins de notes')").first + if btn_bn.count(): + _log("\n=== Clic sur 'Bulletins de notes' ===") + btn_bn.click() + try: + btn_imp = students_page.wait_for_selector( + "text='Impression des bulletins'", state="visible", timeout=12_000 + ) + _log("'Impression des bulletins' visible — clic via JS") + students_page.evaluate("(el) => el.click()", btn_imp) + students_page.wait_for_timeout(6_000) + except PWTimeout: + _log("Popup 'Impression des bulletins' non visible après 12s") + + visible_btns = students_page.evaluate("""() => { + const results = []; + for (const el of document.querySelectorAll('a, button, input[type=submit], input[type=button]')) { + const r = el.getBoundingClientRect(); + if (r.width > 0 && r.height > 0 && el.offsetParent !== null) { + results.push({ + tag: el.tagName, id: el.id || '', + text: (el.innerText || el.value || '').trim().slice(0, 80), + title: el.title || '', + onclick: el.getAttribute('onclick') || '', + }); + } + } + return results; + }""") + _log(f"\n=== Boutons VISIBLES après clic 'Impression des bulletins' ({len(visible_btns)}) ===") + for el in visible_btns: + _log(f" [{el['tag']}#{el['id']}] text={el['text']!r:40s} title={el['title']!r:20s} onclick={el['onclick']!r}") + input("\nInspecte le navigateur puis appuie sur Entrée pour continuer...") + + # 1. Tous les boutons DevExpress ribbon (classe dxr-) + ribbon_items = students_page.evaluate("""() => { + const results = []; + for (const el of document.querySelectorAll('[class*="dxr-"]')) { + const text = (el.innerText || el.textContent || '').trim().slice(0, 80); + if (!text && !el.getAttribute('title')) continue; + results.push({ + tag: el.tagName, + id: el.id || '', + cls: el.className || '', + text: text, + title: el.getAttribute('title') || '', + onclick: el.getAttribute('onclick') || '', + }); + } + return results; + }""") + + _log(f"=== Boutons ribbon (dxr-) sur la page de '{class_name}' ===") + for el in ribbon_items: + _log(f" [{el['tag']}#{el['id']}] cls={el['cls']!r:35s} text={el['text']!r:35s} title={el['title']!r}") + + # 2. Tous les éléments dont le texte/title contient 'Zeugnis' ou 'Bulletin' ou 'note' + keyword_items = students_page.evaluate("""() => { + const kw = ['zeugnis', 'bulletin', 'impression', 'note']; + const results = []; + for (const el of document.querySelectorAll('*')) { + const text = (el.innerText || el.textContent || '').trim().toLowerCase(); + const title = (el.getAttribute('title') || '').toLowerCase(); + if (kw.some(k => text.includes(k) || title.includes(k))) { + const direct = el.childElementCount === 0 || ['A','BUTTON','INPUT'].includes(el.tagName); + if (!direct) continue; + results.push({ + tag: el.tagName, + id: el.id || '', + cls: el.className || '', + text: (el.innerText || el.textContent || '').trim().slice(0, 80), + title: el.getAttribute('title') || '', + onclick: el.getAttribute('onclick') || '', + href: el.getAttribute('href') || '', + }); + } + } + return results; + }""") + + _log(f"\n=== Éléments contenant 'zeugnis/bulletin/note' ===") + for el in keyword_items: + _log(f" [{el['tag']}#{el['id']}] text={el['text']!r:40s} title={el['title']!r:30s} onclick={el['onclick']!r}") + + input("\nAppuie sur Entrée pour fermer...") + finally: + ctx.close() + pw.stop() + + +# ── Synchro unifiée ─────────────────────────────────────────────────────────── + +def _year_of_class(cls: str) -> "int | None": + """Return the year number embedded in a class name. + + "AUTOMAT 1" → 1, "EM-AU 2" → 2, "MP1-TASV 1A" → 1, "MP1-TASV 2B" → 2 + """ + # MP-style: trailing digit + letter (e.g. "MP1-TASV 1A") + m = re.search(r"\s(\d+)[A-Za-z]\s*$", cls) + if m: + return int(m.group(1)) + # Regular: trailing digit (e.g. "AUTOMAT 1", "EM-AU 2") + m = re.search(r"\s(\d+)\s*$", cls) + if m: + return int(m.group(1)) + return None + + +def cmd_sync_all( + selected: list[str], + skip_abs: bool = False, + skip_bn: bool = False, + skip_fiches: bool = False, + skip_notes: bool = False, + force_abs: bool = False, +) -> None: + """Pour chaque classe sélectionnée : absences → BN → Matu → Notes → fiches. + + skip_abs : ne pas télécharger les PDFs d'absences. + skip_bn : ne pas télécharger les PDFs BN ni les notes Matu. + skip_fiches : ne pas scraper les fiches détaillées des élèves. + skip_notes : ne pas télécharger les PDFs de moyennes de notes. + + Sortie : une ligne ALL_DONE avec les clés abs/bn/matu/notes/fiches/errors. + """ + pw, ctx, page = _launch_context() + try: + _cache_load() + page.goto(CLASSES_URL) + _ensure_logged_in(page) + + if not selected: + _log("INFO Récupération de toutes les classes...") + selected = _scrape_classes(page) + + _log(f"TOTAL {len(selected)}") + + abs_downloaded: list[str] = [] + bn_downloaded: list[str] = [] + matu_downloaded: list[str] = [] + notes_downloaded: list[str] = [] + all_fiches: dict[str, list[dict]] = {} + errors: list[str] = [] + + for i, cls in enumerate(selected, 1): + _log(f"PROGRESS {i}/{len(selected)} {cls}") + + # ── Élèves (Notes + BN + fiches) EN PREMIER ─────────────────────── + # Doit précéder les absences : la visite de ViewAbsenzenErweitert + # corrompt le contexte serveur et rend le bouton Notes inopérant. + if not skip_bn or not skip_fiches or not skip_notes: + try: + sp = _go_to_students_page(page, cls) + if sp is None: + if not skip_bn: + errors.append(f"{cls}: page élèves introuvable (BN)") + if not skip_notes: + errors.append(f"{cls}: page élèves introuvable (Notes)") + if not skip_fiches: + errors.append(f"{cls}: page élèves introuvable (fiches)") + else: + if not skip_notes: + pdf_notes = _download_notes_pdf(sp, cls) + if pdf_notes: + notes_downloaded.append(str(pdf_notes)) + else: + errors.append(f"{cls}: téléchargement Notes échoué") + if not skip_bn: + pdf_bn = _download_bn_pdf(sp, cls) + if pdf_bn: + bn_downloaded.append(str(pdf_bn)) + else: + errors.append(f"{cls}: téléchargement BN échoué") + if not skip_fiches: + try: + fiches = _scrape_student_details(sp, cls) + all_fiches[cls] = fiches + except Exception as e: + _log(f"ERR {cls} [fiches]: {e}") + errors.append(f"{cls} [fiches]: {e}") + except Exception as e: + _log(f"ERR {cls} [bn/notes/fiches]: {e}") + errors.append(f"{cls} [bn/notes/fiches]: {e}") + + # ── Absences ENSUITE ─────────────────────────────────────────────── + if not skip_abs: + try: + abs_page = _go_to_absence_page(page, cls) + if abs_page is None: + errors.append(f"{cls}: page absences introuvable") + else: + pdf = _download_pdf(abs_page, cls) + if pdf: + abs_downloaded.append(str(pdf)) + else: + errors.append(f"{cls}: téléchargement absences échoué") + except Exception as e: + _log(f"ERR {cls} [abs]: {e}") + errors.append(f"{cls} [abs]: {e}") + + # ── Matu : classes MP correspondant aux années présentes dans la sélection ── + if not skip_bn: + years_needed: set[int] = set() + for c in selected: + y = _year_of_class(c) + if y is not None: + years_needed.add(y) + + if years_needed: + all_known: list[str] = [] + if CLASSES_CACHE_FILE.exists(): + try: + all_known = json.loads(CLASSES_CACHE_FILE.read_text(encoding="utf-8")) + except Exception: + pass + + mp_targets = [ + c for c in all_known + if re.match(r"MP\d", c, re.I) and _year_of_class(c) in years_needed + ] + + if mp_targets: + _log(f"MATU classes cibles: {mp_targets}") + for j, mp_cls in enumerate(mp_targets, 1): + _log(f"MATU {j}/{len(mp_targets)} {mp_cls}") + try: + sp_matu = _go_to_students_page(page, mp_cls) + if sp_matu is None: + _log(f"INFO MATU {mp_cls}: page eleves introuvable") + else: + pdf_matu = _download_matu_pdf(sp_matu, mp_cls) + if pdf_matu: + matu_downloaded.append(str(pdf_matu)) + except Exception as e: + _log(f"ERR {mp_cls} [matu]: {e}") + else: + _log(f"INFO Aucune classe MP trouvee pour annees {sorted(years_needed)}") + + from datetime import datetime as _dt + _all_done_payload = {'abs': abs_downloaded, 'bn': bn_downloaded, 'matu': matu_downloaded, 'notes': notes_downloaded, 'fiches': all_fiches, 'errors': errors} + try: + _adf = _root / 'data' / 'sync_all_done.json' + _adf.parent.mkdir(parents=True, exist_ok=True) + _adf.write_text(json.dumps({'timestamp': _dt.now().isoformat(), 'payload': _all_done_payload}, ensure_ascii=False), encoding='utf-8') + _log('sync_all_done.json ecrit par subprocess') + try: + import subprocess as _sp + _imp = _sp.Popen( + [sys.executable, str(_root / 'scripts' / 'run_imports.py'), str(_root / 'data'), 'escada', '1' if force_abs else '0'], + start_new_session=True, stdout=_sp.DEVNULL, stderr=_sp.DEVNULL, + ) + _log(f'run_imports lance (pid={_imp.pid})') + except Exception as _ie: + _log(f'WARN run_imports non lance: {_ie}') + except Exception as _e: + _log(f'WARN sync_all_done.json non ecrit: {_e}') + _log( + f"ALL_DONE {json.dumps(_all_done_payload, ensure_ascii=False)}" + ) + finally: + ctx.close() + pw.stop() + + +# ── Point d'entrée ──────────────────────────────────────────────────────────── + +if __name__ == "__main__": + import argparse + + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--list-classes", action="store_true", + help="Récupère la liste des classes et l'affiche en JSON") + ap.add_argument("--sync-all", nargs="*", metavar="CLASSE", + help="Synchro unifiée : absences + BN + Matu pour chaque classe") + ap.add_argument("--skip-abs", action="store_true", + help="Ne pas télécharger les PDFs d'absences (utilisé avec --sync-all)") + ap.add_argument("--skip-bn", action="store_true", + help="Ne pas télécharger les PDFs BN + Matu (utilisé avec --sync-all)") + ap.add_argument("--skip-fiches", action="store_true", + help="Ne pas scraper les fiches détaillées des élèves (utilisé avec --sync-all)") + ap.add_argument("--skip-notes", action="store_true", + help="Ne pas télécharger les PDFs de moyennes de notes (utilisé avec --sync-all)") + ap.add_argument("--sync", nargs="*", metavar="CLASSE", + help="Télécharge les PDFs d'absences uniquement") + ap.add_argument("--sync-bn", nargs="*", metavar="CLASSE", + help="Télécharge les PDFs de bulletins de notes uniquement") + ap.add_argument("--sync-fiches", nargs="*", metavar="CLASSE", + help="Scrape les fiches détaillées des élèves uniquement") + ap.add_argument("--sync-matu", nargs="*", metavar="CLASSE", + help="Télécharge les listes de contrôle des notes MP uniquement") + ap.add_argument("--sync-notes", nargs="*", metavar="CLASSE", + help="Télécharge les PDFs de moyennes de notes (apprenant) uniquement") + ap.add_argument("--debug-bn", metavar="CLASSE", + help="Affiche les boutons du ribbon sur la page liste-élèves (debug)") + ap.add_argument('--force-abs', action='store_true', + help='Reimporter les absences existantes (efface les EscadaPending)') + args = ap.parse_args() + + if args.list_classes: + cmd_list_classes() + elif args.sync_all is not None: + cmd_sync_all(args.sync_all, skip_abs=args.skip_abs, skip_bn=args.skip_bn, + skip_fiches=args.skip_fiches, skip_notes=args.skip_notes, + force_abs=args.force_abs) + elif args.sync is not None: + cmd_sync(args.sync) + elif args.sync_bn is not None: + cmd_sync_bn(args.sync_bn) + elif args.sync_fiches is not None: + cmd_sync_fiches(args.sync_fiches) + elif args.sync_matu is not None: + cmd_sync_matu(args.sync_matu) + elif args.sync_notes is not None: + cmd_sync_notes(args.sync_notes) + elif args.debug_bn: + cmd_debug_bn(args.debug_bn) + else: + ap.print_help() diff --git a/src/importer_notes.py b/src/importer_notes.py new file mode 100644 index 0000000..b757c9b --- /dev/null +++ b/src/importer_notes.py @@ -0,0 +1,125 @@ +import json +import re +from pathlib import Path +from sqlalchemy import select, delete +from sqlalchemy.orm import Session +from .db import Apprenti, NotesExamen + + +def parse_notes_pdf(pdf_path: Path, nom: str, prenom: str) -> list[dict] | None: + """Parse le PDF de notes d'examen pour un apprenti donné.""" + try: + import pdfplumber as _pp + except ImportError: + return None + + _RE_BR = re.compile( + r"^(.+?)\s+[A-Z][A-Z0-9\-]+(?: \d+)?\s+\((\d+\.\d+)\)\s+(\d+\.\d+)$" + ) + _RE_DT = re.compile(r"^(\d{2}\.\d{2}\.\d{4})\s+(.+)$") + + def _exam(line: str) -> dict | None: + m = _RE_DT.match(line) + if not m: + return None + date, rest = m.group(1), m.group(2).strip() + tok = rest.split() + note = None + disp = False + i = len(tok) - 1 + if i >= 0 and re.match(r"^\d+\.\d+$", tok[i]): + note = float(tok[i]); i -= 1 + elif i >= 0 and tok[i].lower() in ("disp.", "disp"): + note = "disp."; i -= 1 + if i >= 0 and tok[i].lower() == "x": + disp = True; i -= 1 + typ = tok[i] if i >= 0 else ""; i -= 1 + coeff = None + if i >= 0 and re.match(r"^\d+\.\d+$", tok[i]): + coeff = float(tok[i]); i -= 1 + ens = "" + if i >= 0 and re.match(r"^[A-Z]{3,8}$", tok[i]): + ens = tok[i]; i -= 1 + desc = " ".join(tok[: i + 1]) + if coeff is None and not ens: + return None + return { + "date": date, "description": desc, "enseignant": ens, + "coefficient": coeff, "type": typ, "dispensed": disp, "note": note, + } + + _SKIP = { + "Departement", "Service", "Ecole professionnelle", "Chemin", + "Case postale", "Rue", "Monthey", "Sion", "Seite", "Liste interm", + "Classe:", "Absences", "Matiere", "Date Examen", + "Branches de culture", "Branches professionnelles", + } + + try: + nom_l = nom.lower() + prenom_l = prenom.lower() + with _pp.open(str(pdf_path)) as pdf: + pages = [ + p.extract_text(x_tolerance=2) or "" + for p in pdf.pages + if nom_l in (p.extract_text(x_tolerance=2) or "").lower() + and prenom_l in (p.extract_text(x_tolerance=2) or "").lower() + ] + if not pages: + return None + lines = [ln.strip() for ln in "\n".join(pages).splitlines() if ln.strip()] + branches: list[dict] = [] + cur: dict | None = None + for line in lines: + if any(kw in line for kw in _SKIP): + continue + if re.match(r"^\d{4}$", line): + continue + m = _RE_BR.match(line) + if m: + cur = { + "branche": m.group(1).strip(), + "moy_prov": float(m.group(2)), + "moy_arr": float(m.group(3)), + "examens": [], + } + branches.append(cur) + continue + if cur and re.match(r"^\d{2}\.\d{2}\.\d{4}", line): + e = _exam(line) + if e: + cur["examens"].append(e) + return branches or None + except Exception: + return None + + +def import_notes_pdf(pdf_path: Path, sess: Session, classe: str | None = None) -> dict: + """Importe les notes d'examen depuis un PDF pour tous les apprentis de la classe. + + Retourne {"classe": str, "nb": int}. + """ + p = Path(pdf_path) + if classe is None: + classe = p.stem.replace("notes_", "").replace("_", " ") + + apprentis = sess.execute( + select(Apprenti).where(Apprenti.classe == classe) + ).scalars().all() + + ne_ids = [ap.id for ap in apprentis] + if ne_ids: + sess.execute(delete(NotesExamen).where(NotesExamen.apprenti_id.in_(ne_ids))) + + nb = 0 + for ap in apprentis: + branches = parse_notes_pdf(p, ap.nom, ap.prenom) + if branches is None: + continue + sess.add(NotesExamen( + apprenti_id=ap.id, + donnees_json=json.dumps(branches, ensure_ascii=False), + )) + nb += 1 + sess.commit() + return {"classe": classe, "nb": nb} diff --git a/src/logger.py b/src/logger.py new file mode 100644 index 0000000..ea7d647 --- /dev/null +++ b/src/logger.py @@ -0,0 +1,27 @@ +import os +from datetime import datetime +from pathlib import Path +from zoneinfo import ZoneInfo + +_TZ = ZoneInfo("Europe/Zurich") + +_ROOT = Path(__file__).resolve().parent.parent +DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data"))) +_LOG_FILE = DATA_DIR / "logs" / "operations.log" + + +def app_log(msg: str, debug: bool = False) -> None: + """Écrit une entrée dans operations.log. + + PROD (debug=False) : [HH:MM:SS] msg — ligne non indentée, visible en mode PROD + DEBUG (debug=True) : [HH:MM:SS] msg — ligne indentée, visible seulement en mode DEBUG + """ + ts = datetime.now(tz=_TZ).strftime("%H:%M:%S") + indent = " " if debug else "" + line = f"[{ts}] {indent}{msg}" + try: + _LOG_FILE.parent.mkdir(parents=True, exist_ok=True) + with _LOG_FILE.open("a", encoding="utf-8") as f: + f.write(line + "\n") + except Exception: + pass