#!/usr/bin/env python3 """Scrape la liste des classes accessibles à un utilisateur dans Escadaweb. Usage : python scripts/fetch_user_classes.py Lit `escada_username` et `escada_password` depuis `auth.yaml` pour le user. Le code TOTP (6 chiffres) est lu depuis la variable d'environnement TOTP_CODE. Écrit le résultat dans data/sync_user_classes_.json sous la forme : {"ok": true, "classes": [...], "duration_s": 12.3} ou en cas d'échec : {"ok": false, "error": "...", "classes": []} Le browser tourne en mode headless. Profil Chromium éphémère (pas de persistance entre sessions — chaque user a sa propre session indépendante de celle de l'admin). """ from __future__ import annotations import json import os import sys import time import tempfile from pathlib import Path _ROOT = Path(__file__).resolve().parent.parent if str(_ROOT) not in sys.path: sys.path.insert(0, str(_ROOT)) if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") if hasattr(sys.stderr, "reconfigure"): sys.stderr.reconfigure(encoding="utf-8", errors="replace") import yaml from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Error as PWError from scripts.sync_esacada import ( BASE_URL, LEHRPERSONEN_URL, CLASSES_URL, _ensure_french_language, _scrape_classes, ) from src.logger import app_log DATA_DIR = _ROOT / "data" AUTH_FILE = DATA_DIR / "auth.yaml" _USERNAME = "" # set par main() pour préfixer les logs def _log(msg: str) -> None: from datetime import datetime line = f"[{datetime.now().strftime('%H:%M:%S')}] {msg}" print(line, flush=True) # Log aussi dans operations.log (visible en live depuis /logs) try: app_log(f"[fetch_classes:{_USERNAME or '?'}] {msg}") except Exception: pass def _load_user_creds(username: str) -> tuple[str, str]: """Lit (escada_username, escada_password) depuis auth.yaml.""" if not AUTH_FILE.exists(): raise RuntimeError("auth.yaml introuvable") cfg = yaml.safe_load(AUTH_FILE.read_text(encoding="utf-8")) or {} user = cfg.get("credentials", {}).get("usernames", {}).get(username) if not user: raise RuntimeError(f"Utilisateur {username!r} introuvable dans auth.yaml") e_user = (user.get("escada_username") or "").strip() e_pass = (user.get("escada_password") or "").strip() if not e_user or not e_pass: raise RuntimeError( f"Identifiants Escada manquants pour {username!r} " "(escada_username / escada_password)" ) return e_user, e_pass def _fill_login(page, escada_user: str, escada_pass: str) -> bool: """Remplit le formulaire Keycloak avec les creds passés.""" try: page.wait_for_selector("input#username", state="visible", timeout=5_000) page.wait_for_selector("input#password", state="visible", timeout=2_000) _log(" [LOGIN] Formulaire Keycloak détecté") page.locator("input#username").fill(escada_user) page.locator("input#password").fill(escada_pass) try: page.locator("input#kc-login").click(timeout=2_000) except Exception: page.locator("input#password").press("Enter") return True except Exception as e: _log(f" [LOGIN] ERR : {e}") return False def _fill_totp(page, code: str) -> bool: """Saisie du code TOTP via JS (le champ est caché par CSS).""" _log(f" [2FA] Saisie du code") try: result = page.evaluate("""(code) => { const inp = document.querySelector('#otp') || document.querySelector('[name="otp"]') || document.querySelector('[autocomplete="one-time-code"]') || document.querySelector('input[type="text"]:not([type="hidden"])'); if (!inp) return 'not_found'; inp.value = code; inp.dispatchEvent(new Event('input', {bubbles: true})); inp.dispatchEvent(new Event('change', {bubbles: true})); return 'filled'; }""", code) if result != "filled": _log(f" [2FA] champ introuvable ({result})") return False submitted = page.evaluate("""() => { const btn = document.querySelector('input[type="submit"]') || document.querySelector('button[type="submit"]'); if (btn) { btn.click(); return 'clicked'; } const form = document.querySelector('form'); if (form) { form.submit(); return 'submitted'; } return 'no_submit'; }""") return submitted in ("clicked", "submitted") except Exception as e: _log(f" [2FA] err : {e}") return False def fetch_classes(username: str, totp_code: str) -> dict: """Fait login + scrape ViewKlassen et retourne le résultat.""" e_user, e_pass = _load_user_creds(username) t_start = time.time() profile_dir = tempfile.mkdtemp(prefix=f"escada_{username}_") pw = sync_playwright().start() try: ctx = pw.chromium.launch_persistent_context( profile_dir, headless=True, args=["--disable-popup-blocking"], ) page = ctx.pages[0] if ctx.pages else ctx.new_page() try: _log(f"GOTO {CLASSES_URL}") page.goto(CLASSES_URL) # Boucle login + 2FA (timeout 90s) deadline = time.time() + 90 login_done = False totp_done = False last_url = "" stuck_counter = 0 while time.time() < deadline: cur = page.url.lower() if page.url != last_url: _log(f" url: {page.url[:120]}") last_url = page.url stuck_counter = 0 if "viewklassen" in cur: _log("LOGIN_OK") break # Si on est sur une page hors flux (Timeout.aspx, root EPTM, # erreur DevExpress), forcer un goto vers Lehrpersonen pour # déclencher le redirect Keycloak. if not any(k in cur for k in ( "edusso", "login", "authenticate", "logon", "otp", "lehrpersonen/viewklassen", )): _log(f" hors flux ({cur[:80]}…) → goto Lehrpersonen") try: page.goto(LEHRPERSONEN_URL, timeout=15_000) except (PWTimeout, PWError) as _e: _log(f" goto err : {_e}") page.wait_for_timeout(1_000) continue if not login_done: if _fill_login(page, e_user, e_pass): login_done = True _log(" login submitted, wait for redirect…") try: page.wait_for_load_state("networkidle", timeout=8_000) except (PWTimeout, PWError): pass if not totp_done and ( "authenticate" in cur or "otp" in cur or page.locator("input#otp").count() > 0 ): if _fill_totp(page, totp_code): totp_done = True _log(" otp submitted, wait for redirect to ViewKlassen…") try: page.wait_for_url("**ViewKlassen**", timeout=15_000) except (PWTimeout, PWError): _log(f" wait_for_url failed, url={page.url[:120]}") page.wait_for_timeout(800) stuck_counter += 1 # Sortie anticipée si totp validé mais redirect ne vient pas # (probablement code OTP invalide ou expiré) if totp_done and stuck_counter > 15 and "viewklassen" not in cur: _log(f" TOTP submitted mais pas de redirect → code peut-être invalide") break else: # Diagnostic supplémentaire _log(f"TIMEOUT url={page.url[:120]} login_done={login_done} totp_done={totp_done}") try: # Pages d'erreur Keycloak fréquentes body_txt = page.evaluate("() => (document.body && document.body.innerText || '').slice(0, 500)") _log(f" body_preview: {body_txt!r}") except Exception: pass raise RuntimeError( f"Timeout login (90s) — login_done={login_done} totp_done={totp_done} url={page.url[:80]}" ) # Force le français + scrape _ensure_french_language(page) page.goto(CLASSES_URL, wait_until="domcontentloaded", timeout=15_000) try: page.wait_for_load_state("networkidle", timeout=10_000) except (PWTimeout, PWError): pass classes = _scrape_classes(page) # Filtre : exclure les classes MP* (Matu Pro), MI* (Maîtrise), # "Formation*" (modules de formation continue, hors flux régulier). filtered = [ c for c in classes if not ( c.startswith("MP") or c.startswith("MI") or c.lower().startswith("formation") ) ] removed = sorted(set(classes) - set(filtered)) _log(f"DONE {len(filtered)} classes (filtré {len(removed)} : {removed})") return { "ok": True, "classes": filtered, "duration_s": round(time.time() - t_start, 1), } finally: try: ctx.close() except Exception: pass finally: try: pw.stop() except Exception: pass # cleanup du profile temporaire import shutil shutil.rmtree(profile_dir, ignore_errors=True) def main(): global _USERNAME if len(sys.argv) < 2: print("Usage: fetch_user_classes.py ", file=sys.stderr) sys.exit(2) username = sys.argv[1].strip() _USERNAME = username totp_code = (os.getenv("TOTP_CODE") or "").strip() if not totp_code or not totp_code.isdigit() or len(totp_code) != 6: result = {"ok": False, "error": "TOTP_CODE manquant ou invalide (6 chiffres requis)", "classes": []} else: try: result = fetch_classes(username, totp_code) except Exception as e: result = {"ok": False, "error": str(e), "classes": []} out_file = DATA_DIR / f"sync_user_classes_{username}.json" out_file.parent.mkdir(parents=True, exist_ok=True) out_file.write_text(json.dumps(result, ensure_ascii=False), encoding="utf-8") print(json.dumps(result, ensure_ascii=False)) sys.exit(0 if result.get("ok") else 1) if __name__ == "__main__": main()