#!/usr/bin/env python3 """Pull des notices depuis Escadaweb pour les apprentis des classes données. Usage : python pull_notices.py CLASSE1 CLASSE2 ... Pour chaque classe : 1. Navigue vers la liste Élèves (ViewLernende) 2. Pour chaque apprenti de la classe : - Clic "Notices" dans sa ligne - Scrape la grille (pagination gérée) - Wipe + insert les notices dans ApprentiNotice - Retour à la liste Élèves 3. Passe à la classe suivante Sortie standard (parsable) : PULL_NOTICES_DONE {"ok": N_apprentis_ok, "imported": N_notices, "err": [...]} """ from __future__ import annotations import json import re import sys import traceback from datetime import date, datetime from pathlib import Path _ROOT = Path(__file__).resolve().parent.parent if str(_ROOT) not in sys.path: sys.path.insert(0, str(_ROOT)) from sqlalchemy import select, delete # noqa: E402 from playwright.sync_api import Page, TimeoutError as PWTimeout # noqa: E402 from src.db import get_session, Apprenti, ApprentiNotice # noqa: E402 from src.logger import app_log # noqa: E402 from scripts.sync_esacada import ( # noqa: E402 _launch_context, _ensure_logged_in, _go_to_students_page, _log, CLASSES_URL, ) _DATE_RE = re.compile(r"(\d{2})\.(\d{2})\.(\d{4})") def _parse_date(s: str) -> date | None: if not s: return None m = _DATE_RE.search(s) if not m: return None try: return date(int(m.group(3)), int(m.group(2)), int(m.group(1))) except Exception: return None def _scrape_notices_grid(page: Page) -> list[dict]: """Scrape toutes les pages de la grille des notices. Ordre des colonnes attendu (basé sur la structure DevExpress observée) : 0 Editer (icône) | 1 Date | 2 Type | 3 Auteur | 4 Titre | 5 Remarques 6 Visible classe (checkbox) | 7 Matière | 8-11 visibilités (ignorées) """ notices: list[dict] = [] seen_rows: set[str] = set() # éviter de re-scraper après navigation pagination current_pg = 1 while True: try: page.wait_for_selector( "table[id$='gridNotizen_DXMainTable']", state="attached", timeout=10_000, ) except PWTimeout: _log(f" [notices p={current_pg}] grille non chargée") break # Récupérer toutes les lignes de données via JS pour fiabilité rows_data = page.evaluate("""() => { const out = []; const rows = document.querySelectorAll("tr[id*='gridNotizen_DXDataRow']"); for (const tr of rows) { const cells = tr.querySelectorAll(":scope > td"); const texts = Array.from(cells).map(td => (td.innerText || td.textContent || '').trim()); // Détection checkbox "Visible classe" : présence d'une image cochée const cb = cells[6] ? cells[6].querySelector("img") : null; const vis = cb ? !(cb.src || '').toLowerCase().includes('unchecked') : null; out.push({texts, visible: vis, id: tr.id}); } return out; }""") added = 0 for row in rows_data: if row["id"] in seen_rows: continue seen_rows.add(row["id"]) t = row["texts"] # Index défensif (cas où le DOM diffère légèrement) def col(i: int) -> str: return t[i] if i < len(t) else "" notices.append({ "date": _parse_date(col(1)), "type": col(2) or None, "auteur": col(3) or None, "titre": col(4) or None, "remarque": col(5) or None, "matiere": col(7) or None, "visible_classe": row.get("visible"), }) added += 1 _log(f" [notices p={current_pg}] +{added} ligne(s)") # Pagination : aller à la page suivante si dispo try: next_link = page.locator( f"a.dxp-num:has-text('{current_pg + 1}')" ).first if next_link.count() == 0: break next_link.click() page.wait_for_load_state("networkidle", timeout=10_000) page.wait_for_timeout(400) current_pg += 1 except Exception: break return notices def _student_rows(page: Page) -> list[dict]: """Liste des lignes Élèves avec nom, prénom, et drapeau "a des notices". Structure de la grille Lernende (cellules) : [0] Detail expand [1] Notes link icon [2] Edit button [3] **Nom** [4] **Prénom** [5] Entreprise [6] MP / [7] Disp. CG / [8] Abs. excu / [9] Abs. non excu / [10] Remarque [11] Compensation / [12] Documents [13] **Notices link** (icône : note_pinned = vide, note_text = avec) [14] History / [15] Tasks Format : [{row_id, nom, prenom, has_notices, notices_href}]. Gère la pagination. """ out: list[dict] = [] seen: set[str] = set() current_pg = 1 while True: rows = page.evaluate("""() => { const out = []; const trs = document.querySelectorAll("tr[id*='GridLernende_DXDataRow']"); for (const tr of trs) { const cells = tr.querySelectorAll(":scope > td"); const txt = (i) => cells[i] ? (cells[i].innerText || cells[i].textContent || '').trim() : ''; const nom = txt(3); const prenom = txt(4); // Lien Notices = cellule 13 (peut varier si Escada change l'ordre) let hasNotices = false; let noticesHref = null; // Cherche dans toute la ligne le lien Notices via son title const noticeLink = tr.querySelector("a[title='Notices']"); if (noticeLink) { noticesHref = noticeLink.getAttribute('href'); const img = noticeLink.querySelector('img'); if (img) { const src = (img.getAttribute('src') || '').toLowerCase(); hasNotices = src.includes('note_text'); } } out.push({ id: tr.id, nom: nom, prenom: prenom, has_notices: hasNotices, notices_href: noticesHref, }); } return out; }""") added = 0 for r in rows: if r["id"] in seen: continue seen.add(r["id"]) out.append({ "row_id": r["id"], "nom": r["nom"], "prenom": r["prenom"], "has_notices": r["has_notices"], "notices_href": r["notices_href"], }) added += 1 _log(f" [élèves p={current_pg}] +{added}") # Page suivante ? try: next_link = page.locator( f"a.dxp-num:has-text('{current_pg + 1}')" ).first if next_link.count() == 0: break next_link.click() page.wait_for_load_state("networkidle", timeout=10_000) page.wait_for_timeout(400) current_pg += 1 except Exception: break return out def _pull_one_row( page: Page, sess, row: dict, classe: str, students_url: str, db_apprentis: list, ) -> tuple[int, str | None, Apprenti | None]: """Pour une ligne Élève avec notices, scrape la grille et insert en DB. `row` est le dict produit par `_student_rows` : {row_id, nom, prenom, has_notices, notices_href} Retourne (nb_importées, err, apprenti_match). """ nom = (row.get("nom") or "").strip() prenom = (row.get("prenom") or "").strip() # 1. Recherche match dans la liste DB de la classe (avant navigation). # Plusieurs stratégies en cascade pour tolérer les différences de # découpage nom/prénom (ex: "Loureiro" + "de Menezes Tiago" en DB vs # "Loureiro de Menezes" + "Tiago" sur Escada). import unicodedata def _norm(s: str) -> str: nfkd = unicodedata.normalize("NFKD", s or "") return " ".join( nfkd.encode("ascii", "ignore").decode("ascii").lower().split() ) full_escada = _norm(f"{nom} {prenom}") match: Apprenti | None = None # Stratégie A : match nom strict + premier mot du prénom for a in db_apprentis: db_nom = (a.nom or "").strip() db_pre1 = (a.prenom or "").strip().split(maxsplit=1)[0] if a.prenom else "" if db_nom == nom and prenom and ( prenom.startswith(db_pre1) or db_pre1.startswith(prenom.split(maxsplit=1)[0]) ): match = a break # Stratégie B : match nom strict seul if not match: for a in db_apprentis: if (a.nom or "").strip() == nom: match = a break # Stratégie C : match par nom complet normalisé (sans accents, casse insensible) if not match and full_escada: for a in db_apprentis: full_db = _norm(f"{a.nom} {a.prenom}") if full_db == full_escada: match = a break if not match: return 0, f"apprenti '{nom} {prenom}' non trouvé en DB pour {classe}", None # 2. Navigation vers la page Notices : on utilise href si dispo (plus rapide), # sinon clic sur le lien Notices de la ligne. href = row.get("notices_href") try: if href: # href peut être relatif (ex: "ViewNotizen.aspx?id=...") — on résout via JS target_url = page.evaluate( "(h) => new URL(h, document.baseURI).href", href ) page.goto(target_url) else: page.locator(f"#{row['row_id']}").get_by_role("link", name="Notices").first.click() page.wait_for_load_state("networkidle", timeout=15_000) except Exception as e: return 0, f"navigation Notices : {e}", match # 3. Scrape grille try: notices = _scrape_notices_grid(page) except Exception as e: try: page.goto(students_url) page.wait_for_load_state("networkidle", timeout=10_000) except Exception: pass return 0, f"scrape grille : {e}", match # 4. Insert (le wipe global a déjà été fait au début de la classe) try: for n in notices: if not n["date"]: continue sess.add(ApprentiNotice( apprenti_id = match.id, date_event = n["date"], type_notice = n.get("type"), auteur = n.get("auteur"), titre = n.get("titre"), remarque = n.get("remarque"), matiere = n.get("matiere"), visible_classe = n.get("visible_classe"), imported_at = datetime.now(), )) sess.commit() except Exception as e: sess.rollback() return 0, f"DB insert : {e}", match # 5. Retour à la liste élèves try: page.goto(students_url) page.wait_for_load_state("networkidle", timeout=15_000) except Exception: pass return len(notices), None, match def main(): if len(sys.argv) < 2: print("Usage : pull_notices.py CLASSE1 [CLASSE2 ...]", file=sys.stderr) sys.exit(2) target_classes = [c for c in sys.argv[1:] if c.strip()] sess = get_session() ok_count = 0 total_imported = 0 errors: list[str] = [] try: app_log(f"[pull_notices] démarrage — {len(target_classes)} classe(s)") pw, ctx, page = _launch_context() try: page.goto(CLASSES_URL) _ensure_logged_in(page) for classe in target_classes: _log(f"[pull_notices] classe={classe}") # 1. Wipe global des notices existantes pour les apprentis de cette classe db_apprentis = sess.execute( select(Apprenti).where(Apprenti.classe == classe) ).scalars().all() if db_apprentis: appr_ids = [a.id for a in db_apprentis] sess.execute( delete(ApprentiNotice).where(ApprentiNotice.apprenti_id.in_(appr_ids)) ) sess.commit() _log(f" [{classe}] wipe ApprentiNotice : {len(appr_ids)} apprenti(s)") # 2. Navigue vers la liste Élèves try: students_page = _go_to_students_page(page, classe) except Exception as e: students_page = None _log(f" ERR navigation : {e}") if not students_page: errors.append(f"classe '{classe}' : page Élèves introuvable") continue students_url = students_page.url # 3. Liste des lignes (avec drapeau has_notices) try: rows = _student_rows(students_page) except Exception as e: errors.append(f"classe '{classe}' : scrape liste élèves : {e}") continue nb_with = sum(1 for r in rows if r["has_notices"]) _log(f" [{classe}] {len(rows)} élève(s), {nb_with} avec notice(s)") # 4. Pour chaque ligne ayant des notices : pull for r in rows: label = f"{r.get('nom','?')} {r.get('prenom','?')}" if not r["has_notices"]: continue try: n, err, match = _pull_one_row( students_page, sess, r, classe, students_url, db_apprentis, ) if err: errors.append(f"{label} ({classe}) : {err}") try: students_page.goto(students_url) students_page.wait_for_load_state("networkidle", timeout=10_000) except Exception: break else: ok_count += 1 total_imported += n _log(f" OK {label} : {n} notice(s)") except Exception as e: errors.append(f"{label} ({classe}) : {e}") _log(f" EX {label} : {e}\n{traceback.format_exc()}") finally: try: ctx.close() except Exception: pass try: pw.stop() except Exception: pass finally: sess.close() print( 'PULL_NOTICES_DONE ' + json.dumps({ "ok": ok_count, "imported": total_imported, "err": errors, }, ensure_ascii=False), flush=True, ) app_log( f"[pull_notices] terminé — {ok_count} apprenti(s) OK, " f"{total_imported} notice(s) importée(s), {len(errors)} erreur(s)" ) if __name__ == "__main__": main()