eptm_dashboard/scripts/fetch_user_classes.py
2026-05-12 15:30:28 +02:00

278 lines
11 KiB
Python

#!/usr/bin/env python3
"""Scrape la liste des classes accessibles à un utilisateur dans Escadaweb.
Usage :
python scripts/fetch_user_classes.py <username>
Lit `escada_username` et `escada_password` depuis `auth.yaml` pour le user.
Le code TOTP (6 chiffres) est lu depuis la variable d'environnement TOTP_CODE.
Écrit le résultat dans data/sync_user_classes_<username>.json sous la forme :
{"ok": true, "classes": [...], "duration_s": 12.3}
ou en cas d'échec :
{"ok": false, "error": "...", "classes": []}
Le browser tourne en mode headless. Profil Chromium éphémère (pas de
persistance entre sessions — chaque user a sa propre session indépendante
de celle de l'admin).
"""
from __future__ import annotations
import json
import os
import sys
import time
import tempfile
from pathlib import Path
# Project root = two levels above this script; it must be importable so the
# `scripts.*` and `src.*` imports further down resolve when run as a script.
_ROOT = Path(__file__).resolve().parent.parent
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

# Switch both standard streams to UTF-8 with replacement of unencodable
# characters, when the stream object supports reconfiguration.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8", errors="replace")
del _stream
import yaml
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout, Error as PWError
from scripts.sync_esacada import (
BASE_URL, LEHRPERSONEN_URL, CLASSES_URL,
_ensure_french_language, _scrape_classes,
)
from src.logger import app_log
# Directory holding auth.yaml and the JSON result file written by main().
DATA_DIR = _ROOT / "data"
# YAML file read by _load_user_creds() to find the per-user Escada credentials.
AUTH_FILE = DATA_DIR / "auth.yaml"
_USERNAME = ""  # set by main(); used by _log() to prefix the forwarded log lines
def _log(msg: str) -> None:
    """Print *msg* with an HH:MM:SS prefix and forward it to the app log.

    The line is written to stdout (flushed immediately) and, on a best-effort
    basis, passed to ``app_log`` tagged with the current ``_USERNAME`` (or
    ``?`` when it is not set yet). Any failure of the forwarding step is
    ignored so that logging can never abort the scrape.
    """
    from datetime import datetime

    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
    # Also mirrored to operations.log (per the original note: viewable live
    # from /logs).
    try:
        tag = _USERNAME or "?"
        app_log(f"[fetch_classes:{tag}] {msg}")
    except Exception:
        pass
def _load_user_creds(username: str) -> tuple[str, str]:
    """Return ``(escada_username, escada_password)`` for *username*.

    The values are read from ``auth.yaml`` under
    ``credentials -> usernames -> <username>`` and stripped of surrounding
    whitespace.

    Raises:
        RuntimeError: if ``auth.yaml`` does not exist, if the user entry is
            absent (including when an intermediate section is empty or not a
            mapping), or if either Escada credential is missing or blank.
    """
    if not AUTH_FILE.exists():
        raise RuntimeError("auth.yaml introuvable")
    cfg = yaml.safe_load(AUTH_FILE.read_text(encoding="utf-8")) or {}

    # Walk the tree defensively: a YAML key with no value loads as None and a
    # malformed file may hold a list or scalar, both of which would make a
    # chained ``.get(...).get(...)`` fail with an opaque AttributeError.
    user = cfg
    for key in ("credentials", "usernames", username):
        user = user.get(key) if isinstance(user, dict) else None
    if not user or not isinstance(user, dict):
        raise RuntimeError(f"Utilisateur {username!r} introuvable dans auth.yaml")

    e_user = (user.get("escada_username") or "").strip()
    e_pass = (user.get("escada_password") or "").strip()
    if not e_user or not e_pass:
        raise RuntimeError(
            f"Identifiants Escada manquants pour {username!r} "
            "(escada_username / escada_password)"
        )
    return e_user, e_pass
def _fill_login(page, escada_user: str, escada_pass: str) -> bool:
    """Fill and submit the Keycloak login form with the given credentials.

    Waits for the username field (5 s) and the password field (2 s) to become
    visible, fills both, then clicks the ``kc-login`` button; if that click
    fails, the form is submitted by pressing Enter in the password field.

    Returns True once the form has been submitted, False (after logging the
    error) if any step raised.
    """
    user_sel = "input#username"
    pass_sel = "input#password"
    try:
        page.wait_for_selector(user_sel, state="visible", timeout=5_000)
        page.wait_for_selector(pass_sel, state="visible", timeout=2_000)
        _log(" [LOGIN] Formulaire Keycloak détecté")
        page.locator(user_sel).fill(escada_user)
        page.locator(pass_sel).fill(escada_pass)
        try:
            page.locator("input#kc-login").click(timeout=2_000)
        except Exception:
            # Button missing or not clickable: submit via the keyboard instead.
            page.locator(pass_sel).press("Enter")
    except Exception as exc:
        _log(f" [LOGIN] ERR : {exc}")
        return False
    else:
        return True
def _fill_totp(page, code: str) -> bool:
    """Enter the TOTP code and submit the 2FA form using in-page JavaScript.

    The input is filled from JS rather than through Playwright's ``fill``
    (the original author's note: the field is hidden by CSS). The first
    script looks for ``#otp``, then ``[name="otp"]``, then an
    ``autocomplete="one-time-code"`` element, then any text input, sets its
    value and fires ``input``/``change`` events. The second script clicks the
    first submit control, or falls back to ``form.submit()``.

    Returns True if a submit button was clicked or a form was submitted;
    False if no input was found, nothing could be submitted, or an exception
    was raised (the exception is logged).
    """
    # The two JS sources below are runtime text evaluated in the page and are
    # kept verbatim.
    fill_js = """(code) => {
const inp = document.querySelector('#otp')
|| document.querySelector('[name="otp"]')
|| document.querySelector('[autocomplete="one-time-code"]')
|| document.querySelector('input[type="text"]:not([type="hidden"])');
if (!inp) return 'not_found';
inp.value = code;
inp.dispatchEvent(new Event('input', {bubbles: true}));
inp.dispatchEvent(new Event('change', {bubbles: true}));
return 'filled';
}"""
    submit_js = """() => {
const btn = document.querySelector('input[type="submit"]')
|| document.querySelector('button[type="submit"]');
if (btn) { btn.click(); return 'clicked'; }
const form = document.querySelector('form');
if (form) { form.submit(); return 'submitted'; }
return 'no_submit';
}"""

    _log(" [2FA] Saisie du code")
    try:
        fill_status = page.evaluate(fill_js, code)
        if fill_status != "filled":
            _log(f" [2FA] champ introuvable ({fill_status})")
            return False
        submit_status = page.evaluate(submit_js)
        return submit_status in ("clicked", "submitted")
    except Exception as exc:
        _log(f" [2FA] err : {exc}")
        return False
def _is_regular_class(name: str) -> bool:
    """Return False for MP* (Matu Pro), MI* (Maîtrise) and "Formation*" classes."""
    return not (
        name.startswith(("MP", "MI"))
        or name.lower().startswith("formation")
    )


def _await_login(page, e_user: str, e_pass: str, totp_code: str) -> None:
    """Drive the Keycloak login + TOTP flow until the ViewKlassen page is reached.

    Polls the page for up to 90 seconds, submitting the login form once and
    the TOTP code once as the corresponding pages show up.

    Raises:
        RuntimeError: if the 90 s budget is exhausted, or if the TOTP code was
            submitted and the URL then stayed unchanged for more than 15
            polling rounds without reaching ViewKlassen (most likely an
            invalid or expired code).
    """
    deadline = time.monotonic() + 90
    login_done = False
    totp_done = False
    last_url = ""
    stuck_counter = 0

    while time.monotonic() < deadline:
        cur = page.url.lower()
        if page.url != last_url:
            _log(f" url: {page.url[:120]}")
            last_url = page.url
            stuck_counter = 0

        if "viewklassen" in cur:
            _log("LOGIN_OK")
            return

        # On a page outside the login flow (Timeout.aspx, EPTM root,
        # DevExpress error page), force a navigation to Lehrpersonen to
        # trigger the Keycloak redirect.
        if not any(k in cur for k in ("edusso", "login", "authenticate", "logon", "otp")):
            _log(f" hors flux ({cur[:80]}…) → goto Lehrpersonen")
            try:
                page.goto(LEHRPERSONEN_URL, timeout=15_000)
            except (PWTimeout, PWError) as _e:
                _log(f" goto err : {_e}")
            page.wait_for_timeout(1_000)
            continue

        if not login_done and _fill_login(page, e_user, e_pass):
            login_done = True
            _log(" login submitted, wait for redirect…")
            try:
                page.wait_for_load_state("networkidle", timeout=8_000)
            except (PWTimeout, PWError):
                pass

        on_otp_page = (
            "authenticate" in cur
            or "otp" in cur
            or page.locator("input#otp").count() > 0
        )
        if not totp_done and on_otp_page and _fill_totp(page, totp_code):
            totp_done = True
            _log(" otp submitted, wait for redirect to ViewKlassen…")
            try:
                page.wait_for_url("**ViewKlassen**", timeout=15_000)
            except (PWTimeout, PWError):
                _log(f" wait_for_url failed, url={page.url[:120]}")

        page.wait_for_timeout(800)
        stuck_counter += 1

        # Early exit when the TOTP was submitted but no redirect follows.
        # This must be an error, not a plain `break`: breaking out would skip
        # the timeout handling and let the caller scrape while logged out.
        # The URL is re-read here because `cur` predates the waits above.
        if totp_done and stuck_counter > 15 and "viewklassen" not in page.url.lower():
            _log(" TOTP submitted mais pas de redirect → code peut-être invalide")
            raise RuntimeError(
                "Pas de redirection après soumission du code TOTP "
                f"(code invalide ou expiré ?) — url={page.url[:80]}"
            )

    # Deadline reached: log extra diagnostics before failing.
    _log(f"TIMEOUT url={page.url[:120]} login_done={login_done} totp_done={totp_done}")
    try:
        # Keycloak error pages usually explain the failure in the body text.
        body_txt = page.evaluate("() => (document.body && document.body.innerText || '').slice(0, 500)")
        _log(f" body_preview: {body_txt!r}")
    except Exception:
        pass
    raise RuntimeError(
        f"Timeout login (90s) — login_done={login_done} totp_done={totp_done} url={page.url[:80]}"
    )


def fetch_classes(username: str, totp_code: str) -> dict:
    """Log in to Escadaweb as *username* and scrape the ViewKlassen class list.

    A headless Chromium is started on a throw-away profile directory, the
    Keycloak login and TOTP steps are performed, the UI is switched to French
    and the class list is scraped. Classes starting with ``MP``, ``MI`` or
    (case-insensitively) ``formation`` are dropped from the result.

    Args:
        username: dashboard user whose Escada credentials are looked up in
            ``auth.yaml``.
        totp_code: the one-time code to submit on the 2FA page.

    Returns:
        ``{"ok": True, "classes": [...], "duration_s": <float>}``.

    Raises:
        RuntimeError: if the credentials cannot be loaded or the login does
            not reach ViewKlassen (timeout, or TOTP submitted without
            redirect).
        Other exceptions raised by Playwright or the scraping helpers
        propagate unchanged.
    """
    import shutil

    e_user, e_pass = _load_user_creds(username)
    t_start = time.monotonic()
    profile_dir = tempfile.mkdtemp(prefix=f"escada_{username}_")
    pw = None
    ctx = None
    try:
        # Started inside the try so the temporary profile is removed even if
        # Playwright itself fails to start.
        pw = sync_playwright().start()
        ctx = pw.chromium.launch_persistent_context(
            profile_dir,
            headless=True,
            args=["--disable-popup-blocking"],
        )
        page = ctx.pages[0] if ctx.pages else ctx.new_page()

        _log(f"GOTO {CLASSES_URL}")
        page.goto(CLASSES_URL)
        _await_login(page, e_user, e_pass, totp_code)

        # Force French, then reload the class list and scrape it.
        _ensure_french_language(page)
        page.goto(CLASSES_URL, wait_until="domcontentloaded", timeout=15_000)
        try:
            page.wait_for_load_state("networkidle", timeout=10_000)
        except (PWTimeout, PWError):
            pass
        classes = _scrape_classes(page)

        # Keep only the regular classes (see _is_regular_class).
        filtered = [c for c in classes if _is_regular_class(c)]
        removed = sorted(set(classes) - set(filtered))
        _log(f"DONE {len(filtered)} classes (filtré {len(removed)} : {removed})")
        return {
            "ok": True,
            "classes": filtered,
            "duration_s": round(time.monotonic() - t_start, 1),
        }
    finally:
        # Best-effort teardown: a failure to close must not mask the result
        # or the original exception.
        if ctx is not None:
            try:
                ctx.close()
            except Exception:
                pass
        if pw is not None:
            try:
                pw.stop()
            except Exception:
                pass
        # Remove the temporary browser profile.
        shutil.rmtree(profile_dir, ignore_errors=True)
def main():
    """Command-line entry point: ``fetch_user_classes.py <username>``.

    Reads the TOTP code from the ``TOTP_CODE`` environment variable, runs
    ``fetch_classes`` and writes the resulting JSON object both to
    ``data/sync_user_classes_<username>.json`` and to stdout. Any exception
    raised by ``fetch_classes`` is converted into an
    ``{"ok": false, "error": ..., "classes": []}`` result.

    Exit status: 2 when the username argument is missing, 0 when the result
    has ``ok`` set, 1 otherwise.
    """
    global _USERNAME
    if len(sys.argv) < 2:
        print("Usage: fetch_user_classes.py <username>", file=sys.stderr)
        sys.exit(2)
    username = sys.argv[1].strip()
    _USERNAME = username

    totp_code = (os.getenv("TOTP_CODE") or "").strip()
    # str.isdigit() alone also accepts non-ASCII digits (e.g. Arabic-Indic or
    # superscript digits), so the ASCII check is required as well.
    totp_valid = (
        len(totp_code) == 6
        and totp_code.isascii()
        and totp_code.isdigit()
    )
    if not totp_valid:
        result = {"ok": False, "error": "TOTP_CODE manquant ou invalide (6 chiffres requis)", "classes": []}
    else:
        try:
            result = fetch_classes(username, totp_code)
        except Exception as e:
            result = {"ok": False, "error": str(e), "classes": []}

    # Serialise once; the same text goes to the result file and to stdout.
    payload = json.dumps(result, ensure_ascii=False)
    out_file = DATA_DIR / f"sync_user_classes_{username}.json"
    out_file.parent.mkdir(parents=True, exist_ok=True)
    out_file.write_text(payload, encoding="utf-8")
    print(payload)
    sys.exit(0 if result.get("ok") else 1)


if __name__ == "__main__":
    main()