#!/usr/bin/env python3
|
|
"""Pull des notices depuis Escadaweb pour les apprentis des classes données.
|
|
|
|
Usage : python pull_notices.py CLASSE1 CLASSE2 ...
|
|
|
|
Pour chaque classe :
|
|
1. Navigue vers la liste Élèves (ViewLernende)
|
|
2. Pour chaque apprenti de la classe :
|
|
- Clic "Notices" dans sa ligne
|
|
- Scrape la grille (pagination gérée)
|
|
- Wipe + insert les notices dans ApprentiNotice
|
|
- Retour à la liste Élèves
|
|
3. Passe à la classe suivante
|
|
|
|
Sortie standard (parsable) :
|
|
PULL_NOTICES_DONE {"ok": N_apprentis_ok, "imported": N_notices, "err": [...]}
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import sys
|
|
import traceback
|
|
from datetime import date, datetime
|
|
from pathlib import Path
|
|
|
|
_ROOT = Path(__file__).resolve().parent.parent
|
|
if str(_ROOT) not in sys.path:
|
|
sys.path.insert(0, str(_ROOT))
|
|
|
|
from sqlalchemy import select, delete # noqa: E402
|
|
from playwright.sync_api import Page, TimeoutError as PWTimeout # noqa: E402
|
|
|
|
from src.db import get_session, Apprenti, ApprentiNotice # noqa: E402
|
|
from src.logger import app_log # noqa: E402
|
|
|
|
from scripts.sync_esacada import ( # noqa: E402
|
|
_launch_context, _ensure_logged_in, _go_to_students_page, _log,
|
|
CLASSES_URL,
|
|
)
|
|
|
|
|
|
_DATE_RE = re.compile(r"(\d{2})\.(\d{2})\.(\d{4})")
|
|
|
|
|
|
def _parse_date(s: str) -> date | None:
|
|
if not s:
|
|
return None
|
|
m = _DATE_RE.search(s)
|
|
if not m:
|
|
return None
|
|
try:
|
|
return date(int(m.group(3)), int(m.group(2)), int(m.group(1)))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _scrape_notices_grid(page: Page) -> list[dict]:
    """Scrape every page of the notices grid.

    Expected column order (based on the observed DevExpress structure):
    0 Edit (icon) | 1 Date | 2 Type | 3 Author | 4 Title | 5 Remarks
    6 Visible-to-class (checkbox) | 7 Subject | 8-11 visibility flags (ignored)

    Returns a list of dicts with keys: date (datetime.date or None), type,
    auteur, titre, remarque, matiere (each str or None), visible_classe
    (bool or None when no checkbox image was found in the row).
    """
    notices: list[dict] = []
    # Row ids already scraped — avoids re-adding rows after pagination
    # navigation re-renders part of the grid.
    seen_rows: set[str] = set()

    current_pg = 1
    while True:
        # Wait until the DevExpress main table of the notices grid is attached.
        try:
            page.wait_for_selector(
                "table[id$='gridNotizen_DXMainTable']",
                state="attached", timeout=10_000,
            )
        except PWTimeout:
            _log(f" [notices p={current_pg}] grille non chargée")
            break

        # Collect all data rows in one JS evaluation for reliability
        # (a single DOM pass instead of many Playwright round-trips).
        rows_data = page.evaluate("""() => {
            const out = [];
            const rows = document.querySelectorAll("tr[id*='gridNotizen_DXDataRow']");
            for (const tr of rows) {
                const cells = tr.querySelectorAll(":scope > td");
                const texts = Array.from(cells).map(td => (td.innerText || td.textContent || '').trim());
                // Détection checkbox "Visible classe" : présence d'une image cochée
                const cb = cells[6] ? cells[6].querySelector("img") : null;
                const vis = cb ? !(cb.src || '').toLowerCase().includes('unchecked') : null;
                out.push({texts, visible: vis, id: tr.id});
            }
            return out;
        }""")

        added = 0
        for row in rows_data:
            if row["id"] in seen_rows:
                continue
            seen_rows.add(row["id"])
            t = row["texts"]

            # Defensive indexing, in case the DOM layout differs slightly.
            def col(i: int) -> str:
                return t[i] if i < len(t) else ""

            notices.append({
                "date": _parse_date(col(1)),
                "type": col(2) or None,
                "auteur": col(3) or None,
                "titre": col(4) or None,
                "remarque": col(5) or None,
                "matiere": col(7) or None,
                "visible_classe": row.get("visible"),
            })
            added += 1
        _log(f" [notices p={current_pg}] +{added} ligne(s)")

        # Pagination: go to the next page if its number link exists.
        try:
            next_link = page.locator(
                f"a.dxp-num:has-text('{current_pg + 1}')"
            ).first
            if next_link.count() == 0:
                break
            next_link.click()
            page.wait_for_load_state("networkidle", timeout=10_000)
            # Small settle delay after the DevExpress callback rerenders.
            page.wait_for_timeout(400)
            current_pg += 1
        except Exception:
            # Any pagination failure ends the scrape with what we have.
            break

    return notices
|
|
|
|
|
|
def _student_rows(page: Page) -> list[dict]:
    """List the student rows with last name, first name, and a "has notices" flag.

    Structure of the Lernende grid (cells):
    [0] Detail expand
    [1] Notes link icon
    [2] Edit button
    [3] **Last name (nom)**
    [4] **First name (prénom)**
    [5] Company (entreprise)
    [6] MP / [7] Disp. CG / [8] Excused abs. / [9] Unexcused abs. / [10] Remark
    [11] Compensation / [12] Documents
    [13] **Notices link** (icon: note_pinned = empty, note_text = has notices)
    [14] History / [15] Tasks

    Returns: [{row_id, nom, prenom, has_notices, notices_href}].
    Handles pagination across all grid pages.
    """
    out: list[dict] = []
    # Row ids already collected — avoids duplicates after pagination.
    seen: set[str] = set()
    current_pg = 1
    while True:
        # One JS pass over all visible data rows of the current page.
        rows = page.evaluate("""() => {
            const out = [];
            const trs = document.querySelectorAll("tr[id*='GridLernende_DXDataRow']");
            for (const tr of trs) {
                const cells = tr.querySelectorAll(":scope > td");
                const txt = (i) => cells[i] ? (cells[i].innerText || cells[i].textContent || '').trim() : '';
                const nom = txt(3);
                const prenom = txt(4);
                // Lien Notices = cellule 13 (peut varier si Escada change l'ordre)
                let hasNotices = false;
                let noticesHref = null;
                // Cherche dans toute la ligne le lien Notices via son title
                const noticeLink = tr.querySelector("a[title='Notices']");
                if (noticeLink) {
                    noticesHref = noticeLink.getAttribute('href');
                    const img = noticeLink.querySelector('img');
                    if (img) {
                        const src = (img.getAttribute('src') || '').toLowerCase();
                        hasNotices = src.includes('note_text');
                    }
                }
                out.push({
                    id: tr.id,
                    nom: nom,
                    prenom: prenom,
                    has_notices: hasNotices,
                    notices_href: noticesHref,
                });
            }
            return out;
        }""")
        added = 0
        for r in rows:
            if r["id"] in seen:
                continue
            seen.add(r["id"])
            out.append({
                "row_id": r["id"],
                "nom": r["nom"],
                "prenom": r["prenom"],
                "has_notices": r["has_notices"],
                "notices_href": r["notices_href"],
            })
            added += 1
        _log(f" [élèves p={current_pg}] +{added}")
        # Next page? Same dxp-num pagination scheme as the notices grid.
        try:
            next_link = page.locator(
                f"a.dxp-num:has-text('{current_pg + 1}')"
            ).first
            if next_link.count() == 0:
                break
            next_link.click()
            page.wait_for_load_state("networkidle", timeout=10_000)
            # Small settle delay after the DevExpress callback rerenders.
            page.wait_for_timeout(400)
            current_pg += 1
        except Exception:
            # Pagination failure: return what was collected so far.
            break
    return out
|
|
|
|
|
|
def _pull_one_row(
    page: Page, sess, row: dict, classe: str, students_url: str,
    db_apprentis: list,
) -> tuple[int, str | None, Apprenti | None]:
    """For one student row with notices, scrape the grid and insert into the DB.

    `row` is a dict produced by `_student_rows`:
    {row_id, nom, prenom, has_notices, notices_href}

    Returns (nb_imported, err, matched_apprenti).  On error, `err` is a
    human-readable French message and nb_imported is 0; `matched_apprenti`
    is still returned when the match succeeded before the failure.
    """
    nom = (row.get("nom") or "").strip()
    prenom = (row.get("prenom") or "").strip()

    # 1. Find a matching Apprenti in the class's DB list (before navigating
    #    away).  Several cascading strategies tolerate differences in how
    #    the surname/first-name split was done (e.g. "Loureiro" +
    #    "de Menezes Tiago" in DB vs "Loureiro de Menezes" + "Tiago" on
    #    Escada).
    import unicodedata

    def _norm(s: str) -> str:
        # ASCII-fold (drop accents), lowercase, collapse whitespace runs.
        nfkd = unicodedata.normalize("NFKD", s or "")
        return " ".join(
            nfkd.encode("ascii", "ignore").decode("ascii").lower().split()
        )

    full_escada = _norm(f"{nom} {prenom}")

    match: Apprenti | None = None
    # Strategy A: exact surname + prefix match on the first given name word.
    # NOTE(review): if a DB prenom is empty, startswith("") is always True,
    # so this degrades to a surname-only match — confirm that is intended.
    for a in db_apprentis:
        db_nom = (a.nom or "").strip()
        db_pre1 = (a.prenom or "").strip().split(maxsplit=1)[0] if a.prenom else ""
        if db_nom == nom and prenom and (
            prenom.startswith(db_pre1) or db_pre1.startswith(prenom.split(maxsplit=1)[0])
        ):
            match = a
            break
    # Strategy B: exact surname match alone (first DB hit wins).
    if not match:
        for a in db_apprentis:
            if (a.nom or "").strip() == nom:
                match = a
                break
    # Strategy C: normalized full-name match (accent- and case-insensitive).
    if not match and full_escada:
        for a in db_apprentis:
            full_db = _norm(f"{a.nom} {a.prenom}")
            if full_db == full_escada:
                match = a
                break

    if not match:
        return 0, f"apprenti '{nom} {prenom}' non trouvé en DB pour {classe}", None

    # 2. Navigate to the Notices page: use the href when available (faster),
    #    otherwise click the row's Notices link.
    href = row.get("notices_href")
    try:
        if href:
            # href may be relative (e.g. "ViewNotizen.aspx?id=...") —
            # resolve it against the page's base URI via JS.
            target_url = page.evaluate(
                "(h) => new URL(h, document.baseURI).href", href
            )
            page.goto(target_url)
        else:
            page.locator(f"#{row['row_id']}").get_by_role("link", name="Notices").first.click()
        page.wait_for_load_state("networkidle", timeout=15_000)
    except Exception as e:
        return 0, f"navigation Notices : {e}", match

    # 3. Scrape the notices grid.
    try:
        notices = _scrape_notices_grid(page)
    except Exception as e:
        # Best effort: get back to the student list so the caller can
        # continue with the next row; swallow any secondary failure.
        try:
            page.goto(students_url)
            page.wait_for_load_state("networkidle", timeout=10_000)
        except Exception:
            pass
        return 0, f"scrape grille : {e}", match

    # 4. Insert rows (the global wipe was already done per class in main()).
    try:
        for n in notices:
            if not n["date"]:
                # Rows without a parseable date are skipped entirely.
                continue
            sess.add(ApprentiNotice(
                apprenti_id=match.id,
                date_event=n["date"],
                type_notice=n.get("type"),
                auteur=n.get("auteur"),
                titre=n.get("titre"),
                remarque=n.get("remarque"),
                matiere=n.get("matiere"),
                visible_classe=n.get("visible_classe"),
                imported_at=datetime.now(),
            ))
        sess.commit()
    except Exception as e:
        sess.rollback()
        return 0, f"DB insert : {e}", match

    # 5. Return to the student list (best effort; failure is non-fatal
    #    because main() also re-navigates on error).
    try:
        page.goto(students_url)
        page.wait_for_load_state("networkidle", timeout=15_000)
    except Exception:
        pass

    return len(notices), None, match
|
|
|
|
|
|
def main():
    """CLI entry point: pull notices for each class given on the command line.

    Exits with status 2 when no class argument is provided.  Always prints
    a final machine-parsable `PULL_NOTICES_DONE {...}` JSON line on stdout,
    even after errors (emitted from the outer finally block).
    """
    if len(sys.argv) < 2:
        print("Usage : pull_notices.py CLASSE1 [CLASSE2 ...]", file=sys.stderr)
        sys.exit(2)

    target_classes = [c for c in sys.argv[1:] if c.strip()]
    sess = get_session()
    ok_count = 0         # students whose notices were pulled successfully
    total_imported = 0   # total number of notices inserted
    errors: list[str] = []

    try:
        app_log(f"[pull_notices] démarrage — {len(target_classes)} classe(s)")
        pw, ctx, page = _launch_context()
        try:
            page.goto(CLASSES_URL)
            _ensure_logged_in(page)

            for classe in target_classes:
                _log(f"[pull_notices] classe={classe}")

                # 1. Global wipe of existing notices for this class's
                #    apprentices (re-import replaces everything).
                db_apprentis = sess.execute(
                    select(Apprenti).where(Apprenti.classe == classe)
                ).scalars().all()
                if db_apprentis:
                    appr_ids = [a.id for a in db_apprentis]
                    sess.execute(
                        delete(ApprentiNotice).where(ApprentiNotice.apprenti_id.in_(appr_ids))
                    )
                    sess.commit()
                    _log(f" [{classe}] wipe ApprentiNotice : {len(appr_ids)} apprenti(s)")

                # 2. Navigate to the student list for this class.
                try:
                    students_page = _go_to_students_page(page, classe)
                except Exception as e:
                    students_page = None
                    _log(f" ERR navigation : {e}")
                if not students_page:
                    errors.append(f"classe '{classe}' : page Élèves introuvable")
                    continue
                students_url = students_page.url

                # 3. Row list (with the has_notices flag per student).
                try:
                    rows = _student_rows(students_page)
                except Exception as e:
                    errors.append(f"classe '{classe}' : scrape liste élèves : {e}")
                    continue
                nb_with = sum(1 for r in rows if r["has_notices"])
                _log(f" [{classe}] {len(rows)} élève(s), {nb_with} avec notice(s)")

                # 4. Pull each row that has notices.
                for r in rows:
                    label = f"{r.get('nom','?')} {r.get('prenom','?')}"
                    if not r["has_notices"]:
                        continue
                    try:
                        n, err, match = _pull_one_row(
                            students_page, sess, r, classe, students_url, db_apprentis,
                        )
                        if err:
                            errors.append(f"{label} ({classe}) : {err}")
                            # Try to recover the student list; if even that
                            # fails, give up on the rest of this class.
                            try:
                                students_page.goto(students_url)
                                students_page.wait_for_load_state("networkidle", timeout=10_000)
                            except Exception:
                                break
                        else:
                            ok_count += 1
                            total_imported += n
                            _log(f" OK {label} : {n} notice(s)")
                    except Exception as e:
                        # Unexpected failure for this student only; log the
                        # traceback and continue with the next row.
                        errors.append(f"{label} ({classe}) : {e}")
                        _log(f" EX {label} : {e}\n{traceback.format_exc()}")
        finally:
            # Always tear down the browser context and Playwright driver.
            try: ctx.close()
            except Exception: pass
            try: pw.stop()
            except Exception: pass
    finally:
        sess.close()
        # Machine-parsable summary line (consumed by the calling process).
        print(
            'PULL_NOTICES_DONE '
            + json.dumps({
                "ok": ok_count,
                "imported": total_imported,
                "err": errors,
            }, ensure_ascii=False),
            flush=True,
        )
        app_log(
            f"[pull_notices] terminé — {ok_count} apprenti(s) OK, "
            f"{total_imported} notice(s) importée(s), {len(errors)} erreur(s)"
        )
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|