retenue: avis PDF + notices Escada + mapping profession

- nouvelle page /retenue : sélection apprenti, date retenue, date du
  problème, motif (3 cases mutex), branche (autocomplete + saisie libre
  depuis NotesExamen), remarque. Génération PDF basée sur le template
  AcroForm officiel, séparation des 3 widgets Date partagés en 3 champs
  distincts pour ne remplir que celui de la case cochée. Téléchargement
  ou envoi par email (3 destinataires).
- profession : nouveau champ ApprentiFiche.profession, dérivé du préfixe
  de classe via mapping configurable dans Paramètres
  ("AUTOMAT" → "Automaticien CFC" par défaut). Section dédiée avec
  classes orphelines détectées automatiquement.
- notices Escada : nouvelle table Notice (apprenti, titre, remarque,
  date, status). Checkbox "Ajouter automatiquement une notice sur
  Escada" sur /retenue qui crée une entrée pending. Bloc dédié sur
  /escada listant les pending, bouton "Pousser les notices" qui lance
  scripts/push_notices.py (Playwright : navigation Classes → Élèves →
  Notices → Ajouter, fill date / titre / remarque, vérification post-save,
  suppression DB si OK, marquage failed sinon). Nouveau task_kind "push_notices"
  dans le cron pour exécution planifiée.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Julien Balet 2026-05-11 11:24:15 +02:00
parent 6a69f36e83
commit 6d1b7c8044
13 changed files with 1876 additions and 5 deletions

Binary file not shown.

View file

@ -13,6 +13,7 @@ from .pages.purge import purge_page, PurgeState
from .pages.doc import doc_page, DocState
from .pages.profile import profile_page, ProfileState
from .pages.password_set import password_set_page, PasswordSetState
from .pages.retenue import retenue_page, RetenueState
TITLE = "EPTM Dashboard"
@ -58,5 +59,6 @@ app.add_page(params_page, route="/params", on_load=[AuthState.check_auth,
app.add_page(purge_page, route="/purge", on_load=[AuthState.check_auth, PurgeState.load_data], title=TITLE)
app.add_page(doc_page, route="/doc", on_load=[AuthState.check_auth, DocState.load_data], title=TITLE)
app.add_page(profile_page, route="/profile", on_load=[AuthState.check_auth, ProfileState.load_data], title=TITLE)
app.add_page(retenue_page, route="/retenue", on_load=[AuthState.check_auth, RetenueState.load_data], title=TITLE)
# Page publique (pas de check_auth — accessible via lien email)
app.add_page(password_set_page, route="/password-set", on_load=PasswordSetState.load_data, title=TITLE)

View file

@ -83,7 +83,8 @@ class CronState(AuthState):
"schedule_desc": desc,
"task_kind": job.task_kind,
"task_label": {"push": "Push", "sync": "Sync",
"push_then_sync": "Push + Sync"}.get(job.task_kind, job.task_kind),
"push_then_sync": "Push + Sync",
"push_notices": "Push notices"}.get(job.task_kind, job.task_kind),
"last_run_at": job.last_run_at.strftime("%d.%m.%Y %H:%M") if job.last_run_at else "",
"last_status": job.last_status,
"last_message": job.last_message[:120] if job.last_message else "",
@ -648,13 +649,13 @@ def _form_task_picker() -> rx.Component:
return rx.vstack(
rx.text("Tâche", size="2", font_weight="600"),
rx.radio(
["push", "sync", "push_then_sync"],
["push", "sync", "push_then_sync", "push_notices"],
value=CronState.f_task_kind,
on_change=CronState.set_f_task_kind,
direction="column",
),
rx.cond(
CronState.f_task_kind != "push",
(CronState.f_task_kind != "push") & (CronState.f_task_kind != "push_notices"),
rx.vstack(
rx.text("Données à synchroniser", size="2", font_weight="600",
margin_top="0.5rem"),

View file

@ -21,7 +21,7 @@ def _background(fn):
from ..state import AuthState
from ..sidebar import layout
from src.db import get_session, Apprenti, EscadaPending
from src.db import get_session, Apprenti, EscadaPending, Notice
from src.logger import app_log
_RE_SYNC_PROD = re.compile(r"^\[\d{2}:\d{2}:\d{2}\] ([^ ].*)$")
@ -45,6 +45,7 @@ DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data")))
CLASSES_CACHE = DATA_DIR / "esacada_classes.json"
_SYNC_SCRIPT = _ROOT / "scripts" / "sync_esacada.py"
_PUSH_SCRIPT = _ROOT / "scripts" / "push_to_escada.py"
_PUSH_NOTICES_SCRIPT = _ROOT / "scripts" / "push_notices.py"
_SYNC_RESULT_FILE = DATA_DIR / "sync_last_result.json"
_SYNC_ALL_DONE_FILE = DATA_DIR / "sync_all_done.json"
@ -77,11 +78,19 @@ class EscadaState(AuthState):
pending_count: int = 0
pending_data: list[dict] = []
notices_count: int = 0
notices_data: list[dict] = []
push_done: bool = False
push_ok: int = 0
push_errors: list[str] = []
# Push notices
is_pushing_notices: bool = False
notices_push_ok: int = 0
notices_push_done: bool = False
notices_push_errors: list[str] = []
@rx.var
def selected_count(self) -> int:
return sum(1 for v in self.class_checked.values() if v)
@ -221,9 +230,32 @@ class EscadaState(AuthState):
}
for ep in pending
]
self._reload_notices(sess)
finally:
sess.close()
def _reload_notices(self, sess):
notices = sess.execute(
select(Notice)
.options(joinedload(Notice.apprenti))
.join(Apprenti, Notice.apprenti_id == Apprenti.id)
.where(Notice.status == "pending")
.order_by(Apprenti.classe, Notice.date_event, Apprenti.nom)
).scalars().all()
self.notices_count = len(notices)
self.notices_data = [
{
"id": n.id,
"classe": n.apprenti.classe,
"nom": n.apprenti.nom,
"prenom": n.apprenti.prenom,
"date": n.date_event.strftime("%d.%m.%Y"),
"titre": (n.titre or "")[:80] + ("" if len(n.titre or "") > 80 else ""),
"source": n.source,
}
for n in notices
]
# ── Background: refresh classes ────────────────────────────────────────────
@_background
@ -767,6 +799,121 @@ class EscadaState(AuthState):
except Exception:
pass
# ── Background: push notices vers Escada ──────────────────────────────────
@_background
async def push_notices(self):
async with self:
user = self.username or "?"
self.is_pushing_notices = True
self.notices_push_done = False
self.notices_push_ok = 0
self.notices_push_errors = []
app_log(f"Push notices Escada démarré par {user}")
cmd = [sys.executable, str(_PUSH_NOTICES_SCRIPT)]
lines: list[str] = []
_rc_holder = [0]
def _run() -> None:
_fd, _tmp = tempfile.mkstemp(suffix="_push_notices.log")
os.close(_fd)
try:
with open(_tmp, "wb") as _fout:
_proc = subprocess.Popen(
cmd, stdout=_fout, stderr=subprocess.STDOUT,
env={**os.environ, "PYTHONUNBUFFERED": "1"},
start_new_session=True,
)
_offset, _buf = 0, b""
while True:
_time.sleep(0.5)
try:
with open(_tmp, "rb") as _fin:
_fin.seek(_offset); _chunk = _fin.read(65536)
except Exception:
_chunk = b""
if _chunk:
_buf += _chunk; _offset += len(_chunk)
while b"\n" in _buf:
_raw, _buf = _buf.split(b"\n", 1)
_ln = _raw.decode("utf-8", errors="replace").rstrip()
if _ln:
lines.append(_ln); _log_sync_line(_ln, prefix="push_notices")
if _proc.poll() is not None:
_rc_holder[0] = _proc.wait() or 0
break
except Exception as _exc:
app_log(f"Erreur push notices subprocess : {_exc}")
finally:
try: os.unlink(_tmp)
except Exception: pass
_pool = _cf.ThreadPoolExecutor(max_workers=1)
_fut = _pool.submit(_run)
try:
while not _fut.done():
try:
await asyncio.sleep(1.0)
except asyncio.CancelledError:
_t = asyncio.current_task()
if _t is not None:
for _ in range(_t.cancelling()):
_t.uncancel()
try:
_fut.result()
except Exception as _te:
app_log(f"[push_notices] thread exception : {_te}")
finally:
_pool.shutdown(wait=False)
_rc = _rc_holder[0]
nb_ok = 0
errors: list[str] = []
done = False
for line in lines:
if "PUSH_NOTICES_DONE " in line:
done = True
try:
p = json.loads(line[line.index("PUSH_NOTICES_DONE ") + len("PUSH_NOTICES_DONE "):])
nb_ok = p.get("ok", 0)
errors = p.get("err", [])
except Exception as _e:
app_log(f" Erreur parse PUSH_NOTICES_DONE : {_e}", debug=True)
if done:
app_log(f"Push notices terminé — ok:{nb_ok} erreurs:{len(errors)}")
else:
app_log(f"Push notices : PUSH_NOTICES_DONE non trouvé (code={_rc})")
try:
_t = asyncio.current_task()
if _t is not None:
for _ in range(_t.cancelling()):
_t.uncancel()
async with self:
self.notices_push_done = done
self.notices_push_ok = nb_ok
self.notices_push_errors = errors
self.is_pushing_notices = False
self._reload_pending()
if done:
if errors:
yield rx.toast.warning(
f"Push notices : {nb_ok} OK, {len(errors)} erreur(s)"
)
else:
yield rx.toast.success(f"Push notices terminé — {nb_ok} envoyée(s)")
else:
yield rx.toast.error("Push notices échoué — vérifiez les logs")
except Exception as _e:
app_log(f"Erreur mise à jour état push notices : {_e}")
try:
async with self:
self.is_pushing_notices = False
except Exception:
pass
# ── UI helpers ────────────────────────────────────────────────────────────────
@ -932,6 +1079,18 @@ def _pending_row(item) -> rx.Component:
)
def _notice_row(item) -> rx.Component:
return rx.table.row(
rx.table.cell(item["classe"]),
rx.table.cell(rx.text(item["nom"], " ", item["prenom"])),
rx.table.cell(item["date"]),
rx.table.cell(rx.text(item["titre"], size="1")),
rx.table.cell(
rx.badge(item["source"], color_scheme="blue", variant="soft", size="1"),
),
)
def _sync_progress() -> rx.Component:
"""Indicateurs de progression — remplace l'ancien op_log dans la section sync."""
return rx.vstack(
@ -1361,6 +1520,103 @@ def escada_page() -> rx.Component:
width="100%",
),
# ── Section notices ───────────────────────────────────────────────
rx.box(
rx.text(
"Notices en attente",
size="3", font_weight="700", color="#37474f",
margin_bottom="0.75rem",
),
rx.cond(
EscadaState.notices_count == 0,
rx.text("Aucune notice en attente.", size="2", color="#666"),
rx.vstack(
rx.text(
EscadaState.notices_count,
" notice(s) en attente d'envoi vers Escada.",
size="2", color="#e65100", font_weight="600",
),
rx.box(
rx.table.root(
rx.table.header(
rx.table.row(
rx.table.column_header_cell("Classe"),
rx.table.column_header_cell("Apprenti"),
rx.table.column_header_cell("Date"),
rx.table.column_header_cell("Titre"),
rx.table.column_header_cell("Source"),
)
),
rx.table.body(
rx.foreach(EscadaState.notices_data, _notice_row),
),
width="100%",
size="1",
),
overflow_x="auto",
width="100%",
),
spacing="2",
width="100%",
margin_bottom="0.75rem",
),
),
rx.flex(
rx.button(
rx.cond(
EscadaState.is_pushing_notices,
rx.spinner(size="2"),
rx.icon("send", size=14),
),
rx.cond(
EscadaState.is_pushing_notices,
rx.text("Envoi en cours..."),
rx.text("Pousser les notices"),
),
on_click=EscadaState.push_notices,
disabled=(
EscadaState.is_pushing_notices
| (EscadaState.notices_count == 0)
),
color_scheme="blue",
size="2",
),
gap="1rem", align="center", flex_wrap="wrap",
margin_top="0.75rem",
),
rx.cond(
EscadaState.notices_push_done,
rx.vstack(
rx.cond(
EscadaState.notices_push_ok > 0,
rx.text(
EscadaState.notices_push_ok,
" notice(s) envoyée(s).",
size="2", color="#2e7d32", font_weight="600",
),
),
rx.cond(
EscadaState.notices_push_errors.length() > 0,
rx.vstack(
rx.foreach(
EscadaState.notices_push_errors,
lambda e: rx.text("", e, size="2", color="#c62828"),
),
spacing="1",
),
),
spacing="2",
margin_top="0.75rem",
width="100%",
),
),
padding="1.25rem",
background_color="white",
border_radius="8px",
border="1px solid #e0e0e0",
width="100%",
),
spacing="4",
width="100%",
)

View file

@ -1,9 +1,17 @@
import json
import os
import sys
from pathlib import Path
import reflex as rx
_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_ROOT) not in sys.path:
sys.path.insert(0, str(_ROOT))
from src.profession import load_mapping, save_mapping, find_unmapped_classes, refresh_all_professions # noqa: E402
from src.db import get_session # noqa: E402
from ..sidebar import layout
from ..state import AuthState
@ -67,6 +75,14 @@ class ParamsState(AuthState):
app_base_url: str = ""
save_ok_app: bool = False
# ── Profession mapping ────────────────────────────────────────────────────
prof_mapping: list[dict] = []
prof_unmapped: list[str] = []
new_prefix: str = ""
new_profession: str = ""
save_ok_prof: bool = False
refresh_msg: str = ""
# ── Setters ───────────────────────────────────────────────────────────────
def set_texte_sanction(self, v: str): self.texte_sanction = v
def set_chef_section(self, v: str): self.chef_section = v
@ -81,6 +97,8 @@ class ParamsState(AuthState):
def set_email_subject(self, v: str): self.email_subject = v
def set_email_body(self, v: str): self.email_body = v
def set_app_base_url(self, v: str): self.app_base_url = v
def set_new_prefix(self, v: str): self.new_prefix = v
def set_new_profession(self, v: str): self.new_profession = v
def load_data(self):
if not self.authenticated:
@ -104,6 +122,17 @@ class ParamsState(AuthState):
self.save_ok_escada = False
self.save_ok_template = False
self.save_ok_app = False
self._reload_prof_mapping()
def _reload_prof_mapping(self):
self.prof_mapping = load_mapping()
sess = get_session()
try:
self.prof_unmapped = find_unmapped_classes(sess)
finally:
sess.close()
self.save_ok_prof = False
self.refresh_msg = ""
def save_sanctions(self):
s = _read_settings()
@ -164,6 +193,46 @@ class ParamsState(AuthState):
self.save_ok_escada = False
self.save_ok_template = False
# ── Profession mapping ───────────────────────────────────────────────────
def add_mapping(self):
prefix = self.new_prefix.strip()
prof = self.new_profession.strip()
if not prefix or not prof:
return
cur = list(self.prof_mapping)
# Si le préfixe existe déjà, on met juste à jour la profession
for m in cur:
if m.get("prefix") == prefix:
m["profession"] = prof
break
else:
cur.append({"prefix": prefix, "profession": prof})
save_mapping(cur)
self.new_prefix = ""
self.new_profession = ""
self._reload_prof_mapping()
self.save_ok_prof = True
def remove_mapping(self, prefix: str):
cur = [m for m in self.prof_mapping if m.get("prefix") != prefix]
save_mapping(cur)
self._reload_prof_mapping()
self.save_ok_prof = True
def quick_add_prefix(self, prefix: str):
"""Pré-remplit le formulaire avec une classe orpheline."""
self.new_prefix = prefix
self.new_profession = ""
def apply_mapping_to_db(self):
"""Recalcule la profession pour tous les apprentis avec le mapping actuel."""
sess = get_session()
try:
n = refresh_all_professions(sess)
finally:
sess.close()
self.refresh_msg = f"{n} fiche(s) mise(s) à jour."
# ── UI helpers ────────────────────────────────────────────────────────────────
@ -445,6 +514,127 @@ def _section_template() -> rx.Component:
)
def _mapping_row(m: rx.Var) -> rx.Component:
return rx.flex(
rx.box(
rx.text("Préfixe", size="1", color="var(--gray-10)"),
rx.text(m["prefix"], size="2", weight="medium"),
flex="1", min_width="120px",
),
rx.box(
rx.text("Profession", size="1", color="var(--gray-10)"),
rx.text(m["profession"], size="2"),
flex="2", min_width="200px",
),
rx.button(
rx.icon("trash-2", size=14),
on_click=ParamsState.remove_mapping(m["prefix"]),
color_scheme="red", variant="ghost", size="1",
),
gap="0.75rem", align="center", flex_wrap="wrap",
padding="0.4rem 0.6rem",
border="1px solid var(--gray-5)",
border_radius="6px",
background_color="white",
width="100%",
)
def _unmapped_chip(classe: rx.Var) -> rx.Component:
return rx.button(
rx.icon("plus", size=12),
classe,
on_click=ParamsState.quick_add_prefix(classe),
color_scheme="amber", variant="soft", size="1",
)
def _section_profession() -> rx.Component:
return _section(
"Correspondances classe → profession",
rx.text(
"Lors de l'import des données apprentis, la profession est dérivée "
"du préfixe de la classe (ex. classe « AUTOMAT 1 » → profession "
"« Automaticien CFC »). Utilisée notamment dans les avis de retenue.",
size="1", color="var(--gray-11)",
),
# Tableau des correspondances
rx.cond(
ParamsState.prof_mapping.length() > 0,
rx.vstack(
rx.foreach(ParamsState.prof_mapping, _mapping_row),
spacing="2", width="100%",
),
rx.text("Aucune correspondance configurée.", size="2", color="var(--gray-10)"),
),
# Classes orphelines
rx.cond(
ParamsState.prof_unmapped.length() > 0,
rx.box(
rx.text(
"Classes sans correspondance (clique pour ajouter) :",
size="2", weight="medium", color="#92400e", margin_bottom="0.4rem",
),
rx.flex(
rx.foreach(ParamsState.prof_unmapped, _unmapped_chip),
gap="0.35rem", flex_wrap="wrap",
),
padding="0.75rem",
background_color="#fef3c7",
border="1px solid #fcd34d",
border_radius="6px",
width="100%",
),
rx.fragment(),
),
# Ajout d'une nouvelle correspondance
rx.divider(),
rx.text("Ajouter / modifier une correspondance", size="2", weight="medium"),
rx.flex(
_field(
"Préfixe de classe",
rx.input(
value=ParamsState.new_prefix,
on_change=ParamsState.set_new_prefix,
placeholder="ex. AUTOMAT",
width="100%",
),
),
_field(
"Profession",
rx.input(
value=ParamsState.new_profession,
on_change=ParamsState.set_new_profession,
placeholder="ex. Automaticien CFC",
width="100%",
),
),
gap="0.75rem", flex_wrap="wrap", width="100%",
),
rx.flex(
rx.button(
rx.icon("plus", size=16),
"Ajouter / mettre à jour",
on_click=ParamsState.add_mapping,
color_scheme="blue", size="2",
),
rx.button(
rx.icon("refresh-cw", size=14),
"Appliquer aux fiches existantes",
on_click=ParamsState.apply_mapping_to_db,
color_scheme="gray", variant="soft", size="2",
),
_save_ok_callout(ParamsState.save_ok_prof),
rx.cond(
ParamsState.refresh_msg != "",
rx.text(ParamsState.refresh_msg, size="1", color="#15803d"),
rx.fragment(),
),
gap="0.5rem", align="center", flex_wrap="wrap",
),
)
def _section_app() -> rx.Component:
return _section(
"Application",
@ -482,6 +672,7 @@ def params_page() -> rx.Component:
rx.vstack(
rx.heading("Paramètres", size="7"),
_section_app(),
_section_profession(),
_section_sanction(),
_section_smtp(),
_section_escada(),

View file

@ -0,0 +1,778 @@
"""Page /retenue — génération et envoi d'avis de retenue."""
from __future__ import annotations
import json
import os
import sys
from datetime import date as _date
from pathlib import Path
from typing import Optional
import reflex as rx
from sqlalchemy import select
_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_ROOT) not in sys.path:
sys.path.insert(0, str(_ROOT))
from src.db import get_session, Apprenti, ApprentiFiche, NotesExamen, Notice # noqa: E402
from src.user_access import get_allowed_classes, is_class_allowed # noqa: E402
from src.profession import resolve_profession # noqa: E402
from src.retenue_pdf import generate_retenue_pdf # noqa: E402
from src.email_sender import send_email # noqa: E402
from src.logger import app_log # noqa: E402
from ..state import AuthState
from ..sidebar import layout
from ..components import empty_state
DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data")))
_SETTINGS_FILE = DATA_DIR / "settings.json"
def _load_settings() -> dict:
if _SETTINGS_FILE.exists():
try:
return json.loads(_SETTINGS_FILE.read_text(encoding="utf-8"))
except Exception:
return {}
return {}
# ── State ─────────────────────────────────────────────────────────────────────
class RetenueState(AuthState):
# Sélecteur apprenti
apprenti_labels: list[str] = []
apprenti_ids: list[int] = []
selected_label: str = ""
selected_id: int = 0
has_apprentis: bool = False
apprenti_search: str = ""
apprenti_select_open: bool = False
# Données de l'apprenti sélectionné
sel_classe: str = ""
sel_profession: str = ""
sel_fiche_email_appr: str = ""
sel_fiche_email_form: str = ""
sel_fiche_email_entr: str = ""
sel_fiche_nom_entr: str = ""
# Cache des branches (récupérées des notes d'examen)
branches_cache: list[str] = []
branche_search: str = ""
branche_open: bool = False
# Formulaire
retenue_date: str = "" # ISO date "YYYY-MM-DD"
probleme_date: str = ""
case: str = "devoir" # "devoir" | "comportement" | "retard"
branche: str = ""
remarque: str = ""
# Email
email_dest: str = "apprenti"
email_custom: str = ""
# Option : créer une notice Escada à la génération
add_notice: bool = False
# États
form_error: str = ""
@rx.var
def filtered_apprenti_labels(self) -> list[str]:
q = self.apprenti_search.lower().strip()
if not q:
return self.apprenti_labels
return [l for l in self.apprenti_labels if q in l.lower()]
@rx.var
def filtered_branches(self) -> list[str]:
q = self.branche_search.lower().strip()
if not q:
return self.branches_cache
return [b for b in self.branches_cache if q in b.lower()]
# ── Setters ──────────────────────────────────────────────────────────────
def set_apprenti_search(self, v: str): self.apprenti_search = v
def set_apprenti_select_open(self, v: bool):
self.apprenti_select_open = v
if not v:
self.apprenti_search = ""
def set_branche_search(self, v: str): self.branche_search = v
def set_branche_open(self, v: bool):
self.branche_open = v
if not v:
self.branche_search = ""
def set_retenue_date(self, v: str): self.retenue_date = v
def set_probleme_date(self, v: str): self.probleme_date = v
def set_case(self, v: str): self.case = v
def set_branche(self, v: str): self.branche = v
def set_remarque(self, v: str): self.remarque = v
def set_profession(self, v: str): self.sel_profession = v
def set_email_dest(self, v: str): self.email_dest = v
def set_email_custom(self, v: str): self.email_custom = v
def set_add_notice(self, v: bool): self.add_notice = v
def load_data(self):
if not self.authenticated:
return rx.redirect("/login")
sess = get_session()
try:
allowed = get_allowed_classes(self.username)
q = select(Apprenti).order_by(Apprenti.nom, Apprenti.prenom)
if allowed is not None:
q = q.where(Apprenti.classe.in_(allowed))
apprentis = sess.execute(q).scalars().all()
if not apprentis:
self.has_apprentis = False
self.apprenti_labels = []
self.apprenti_ids = []
return
self.has_apprentis = True
self.apprenti_labels = [
f"{a.nom} {a.prenom} ({a.classe})" for a in apprentis
]
self.apprenti_ids = [a.id for a in apprentis]
# Toujours partir d'une sélection vide à l'arrivée sur la page
self.selected_id = 0
self.selected_label = ""
self.sel_classe = ""
self.sel_profession = ""
self.sel_fiche_email_appr = ""
self.sel_fiche_email_form = ""
self.sel_fiche_email_entr = ""
self.sel_fiche_nom_entr = ""
self._load_branches(sess)
finally:
sess.close()
# Dates par défaut = aujourd'hui
today = _date.today().isoformat()
if not self.retenue_date:
self.retenue_date = today
if not self.probleme_date:
self.probleme_date = today
def _load_apprenti(self):
if not self.selected_id:
return
sess = get_session()
try:
ap = sess.get(Apprenti, self.selected_id)
if not ap:
return
self.sel_classe = ap.classe
fiche = ap.fiche
if fiche:
self.sel_profession = fiche.profession or resolve_profession(ap.classe)
self.sel_fiche_email_appr = fiche.email or ""
self.sel_fiche_email_form = fiche.formateur_email or ""
self.sel_fiche_email_entr = fiche.entreprise_email or ""
self.sel_fiche_nom_entr = fiche.entreprise_nom or ""
else:
self.sel_profession = resolve_profession(ap.classe)
self.sel_fiche_email_appr = ""
self.sel_fiche_email_form = ""
self.sel_fiche_email_entr = ""
self.sel_fiche_nom_entr = ""
finally:
sess.close()
def _load_branches(self, sess):
"""Construit le cache des branches uniques depuis NotesExamen."""
rows = sess.execute(select(NotesExamen.donnees_json)).scalars().all()
seen: set[str] = set()
for raw in rows:
try:
d = json.loads(raw)
except Exception:
continue
if isinstance(d, list):
for br in d:
name = (br.get("branche") or "").strip()
if name:
seen.add(name)
self.branches_cache = sorted(seen)
def handle_select_apprenti(self, label: str):
self.selected_label = label
try:
idx = self.apprenti_labels.index(label)
self.selected_id = self.apprenti_ids[idx]
except ValueError:
pass
self.apprenti_select_open = False
self.apprenti_search = ""
self._load_apprenti()
def apprenti_search_keydown(self, key: str):
if key == "Enter":
results = self.filtered_apprenti_labels
if results:
return RetenueState.handle_select_apprenti(results[0])
elif key == "Escape":
self.apprenti_select_open = False
self.apprenti_search = ""
def select_branche(self, b: str):
self.branche = b
self.branche_open = False
self.branche_search = ""
def branche_keydown(self, key: str):
if key == "Enter":
# Si une seule branche filtrée : la sélectionne. Sinon prend la saisie libre.
results = self.filtered_branches
if len(results) == 1:
return RetenueState.select_branche(results[0])
elif self.branche_search:
self.branche = self.branche_search.strip()
self.branche_open = False
self.branche_search = ""
elif key == "Escape":
self.branche_open = False
self.branche_search = ""
# ── Actions ──────────────────────────────────────────────────────────────
_CASE_LABELS = {
"devoir": "N'a pas remis ses tâches scolaires dans les délais",
"comportement": "A manifesté un comportement répréhensible",
"retard": "Est arrivé en retard aux cours",
}
def _build_notice_titre(self) -> str:
label = self._CASE_LABELS.get(self.case, "")
if self.case == "devoir" and self.branche.strip():
return f"{label} en {self.branche.strip()}"
return label
def _create_notice_if_requested(self):
"""Crée une Notice en DB si la checkbox add_notice est cochée."""
if not self.add_notice or not self.selected_id:
return
sess = get_session()
try:
sess.add(Notice(
apprenti_id=self.selected_id,
date_event=_date.today(),
titre=self._build_notice_titre(),
remarque=(self.remarque or "").strip() or None,
type_notice=None,
matiere=None,
source="retenue",
status="pending",
created_by=self.username or None,
))
sess.commit()
app_log(
f"[notice] {self.username or '?'} : création (retenue) pour "
f"{self.selected_label} — case={self.case}"
)
except Exception as e:
sess.rollback()
app_log(f"[notice] échec création : {e}")
finally:
sess.close()
def _build_pdf(self) -> Optional[bytes]:
if not self.selected_id:
self.form_error = "Aucun apprenti sélectionné."
return None
if not is_class_allowed(self.username, self.sel_classe):
self.form_error = "Accès refusé pour cette classe."
return None
if self.case == "devoir" and not self.branche.strip():
self.form_error = "Veuillez préciser la branche."
return None
try:
r_date = _date.fromisoformat(self.retenue_date)
p_date = _date.fromisoformat(self.probleme_date)
except Exception:
self.form_error = "Date invalide."
return None
self.form_error = ""
sess = get_session()
try:
return generate_retenue_pdf(
sess, self.selected_id,
profession=self.sel_profession,
retenue_date=r_date,
probleme_date=p_date,
case=self.case,
branche=self.branche.strip(),
remarque=self.remarque,
prof_name=self.name or self.username,
)
finally:
sess.close()
def _filename(self) -> str:
sess = get_session()
try:
ap = sess.get(Apprenti, self.selected_id)
if not ap:
return "Avis_retenue.pdf"
safe_nom = "".join(c if c.isalnum() else "_" for c in ap.nom)
safe_prenom = "".join(c if c.isalnum() else "_" for c in ap.prenom)
return f"Avis_retenue_{safe_nom}_{safe_prenom}.pdf"
finally:
sess.close()
def download_pdf(self):
data = self._build_pdf()
if data is None:
if self.form_error:
return rx.toast.error(self.form_error)
return rx.toast.error("Impossible de générer le PDF.")
app_log(
f"[retenue] {self.username or '?'} : avis téléchargé pour "
f"{self.selected_label} (case={self.case})"
)
self._create_notice_if_requested()
return rx.download(data=data, filename=self._filename())
def send_email_action(self):
data = self._build_pdf()
if data is None:
if self.form_error:
return rx.toast.error(self.form_error)
return rx.toast.error("Impossible de générer le PDF.")
# Destinataire
if self.email_dest == "apprenti":
to = self.sel_fiche_email_appr
elif self.email_dest == "formateur":
to = self.sel_fiche_email_form
else:
to = self.email_custom.strip()
if not to or "@" not in to:
return rx.toast.error("Adresse email invalide ou manquante.")
s = _load_settings()
smtp_host = s.get("smtp_host")
smtp_port = int(s.get("smtp_port") or 587)
smtp_login = s.get("smtp_login")
smtp_password = s.get("smtp_password")
smtp_sender = s.get("smtp_sender")
if not (smtp_host and smtp_login and smtp_password and smtp_sender):
return rx.toast.error("Configuration SMTP incomplète (Paramètres).")
subject = f"Avis de retenue — {self.selected_label}"
body = (
f"Bonjour,\n\nVeuillez trouver en pièce jointe l'avis de retenue concernant "
f"{self.selected_label}.\n\nCordialement,\n{self.name or self.username}\n"
)
try:
send_email(
smtp_host=smtp_host, smtp_port=smtp_port,
smtp_login=smtp_login, smtp_password=smtp_password,
smtp_sender=smtp_sender,
to_email=to, subject=subject, body=body,
attachments=[(data, self._filename())],
)
except Exception as e:
return rx.toast.error(f"Échec d'envoi : {e}")
app_log(
f"[retenue] {self.username or '?'} : avis envoyé à {to} pour "
f"{self.selected_label}"
)
self._create_notice_if_requested()
return rx.toast.success(f"Avis envoyé à {to}")
# ── UI ────────────────────────────────────────────────────────────────────────
def _apprenti_option(label: rx.Var) -> rx.Component:
return rx.box(
rx.text(label, size="2"),
padding="0.45rem 0.75rem",
cursor="pointer",
on_click=RetenueState.handle_select_apprenti(label),
_hover={"background_color": "var(--gray-3)"},
width="100%",
)
def _apprenti_selector() -> rx.Component:
return rx.popover.root(
rx.popover.trigger(
rx.box(
rx.flex(
rx.cond(
RetenueState.selected_label != "",
rx.text(RetenueState.selected_label, size="2"),
rx.text("Sélectionner un apprenti…", size="2", color="var(--gray-9)"),
),
rx.spacer(),
rx.icon("chevron-down", size=18, color="var(--gray-9)"),
align="center",
width="100%",
),
padding="0.5rem 0.75rem",
border="1px solid var(--gray-7)",
border_radius="6px",
background_color="white",
cursor="pointer",
width="100%",
custom_attrs={"data-shortcut": "apprenti-search"},
),
),
rx.popover.content(
rx.vstack(
rx.input(
placeholder="Rechercher un apprenti…",
value=RetenueState.apprenti_search,
on_change=RetenueState.set_apprenti_search,
on_key_down=RetenueState.apprenti_search_keydown,
size="2",
width="100%",
auto_focus=True,
),
rx.cond(
RetenueState.filtered_apprenti_labels.length() > 0,
rx.box(
rx.foreach(RetenueState.filtered_apprenti_labels, _apprenti_option),
max_height="280px",
overflow_y="auto",
width="100%",
),
rx.box(
rx.text("Aucun résultat", size="2", color="var(--gray-9)"),
padding="0.5rem 0.75rem",
),
),
spacing="2",
width="100%",
),
min_width="320px",
max_width="500px",
padding="0.5rem",
),
open=RetenueState.apprenti_select_open,
on_open_change=RetenueState.set_apprenti_select_open,
)
def _branche_option(b: rx.Var) -> rx.Component:
return rx.box(
rx.text(b, size="2"),
padding="0.45rem 0.75rem",
cursor="pointer",
on_click=RetenueState.select_branche(b),
_hover={"background_color": "var(--gray-3)"},
width="100%",
)
def _branche_selector() -> rx.Component:
return rx.popover.root(
rx.popover.trigger(
rx.box(
rx.flex(
rx.cond(
RetenueState.branche != "",
rx.text(RetenueState.branche, size="2"),
rx.text("Choisir / taper une branche…", size="2", color="var(--gray-9)"),
),
rx.spacer(),
rx.icon("chevron-down", size=18, color="var(--gray-9)"),
align="center",
width="100%",
),
padding="0.5rem 0.75rem",
border="1px solid var(--gray-7)",
border_radius="6px",
background_color="white",
cursor="pointer",
width="100%",
),
),
rx.popover.content(
rx.vstack(
rx.input(
placeholder="Rechercher ou saisir une branche libre…",
value=RetenueState.branche_search,
on_change=RetenueState.set_branche_search,
on_key_down=RetenueState.branche_keydown,
size="2",
width="100%",
auto_focus=True,
),
rx.cond(
RetenueState.filtered_branches.length() > 0,
rx.box(
rx.foreach(RetenueState.filtered_branches, _branche_option),
max_height="280px",
overflow_y="auto",
width="100%",
),
rx.text(
"Appuyez sur Entrée pour valider votre saisie libre.",
size="1", color="var(--gray-9)",
padding="0.5rem 0.75rem",
),
),
spacing="2",
width="100%",
),
min_width="320px",
max_width="500px",
padding="0.5rem",
),
open=RetenueState.branche_open,
on_open_change=RetenueState.set_branche_open,
)
def _profession_warning() -> rx.Component:
# Affiché uniquement si un apprenti est sélectionné ET que sa profession est vide
return rx.cond(
(RetenueState.selected_id != 0) & (RetenueState.sel_profession == ""),
rx.callout.root(
rx.callout.icon(rx.icon("triangle-alert", size=16)),
rx.callout.text(
"Profession non définie pour ",
RetenueState.sel_classe,
". Renseigne-la ci-dessous, ou ajoute la correspondance dans ",
rx.link("Paramètres", href="/params", color="#1565c0"),
" pour qu'elle soit pré-remplie automatiquement.",
),
color_scheme="amber", variant="soft", size="1",
),
rx.fragment(),
)
def _form() -> rx.Component:
return rx.vstack(
# Apprenti
rx.vstack(
rx.text("Apprenti", size="2", weight="medium", color="var(--gray-11)"),
_apprenti_selector(),
spacing="1", width="100%",
),
_profession_warning(),
# Profession (éditable)
rx.vstack(
rx.text("Profession", size="2", weight="medium", color="var(--gray-11)"),
rx.input(
value=RetenueState.sel_profession,
on_change=RetenueState.set_profession,
placeholder="ex. Automaticien CFC",
width="100%",
),
spacing="1", width="100%",
),
# Dates
rx.flex(
rx.vstack(
rx.text("Date de retenue", size="2", weight="medium", color="var(--gray-11)"),
rx.input(
type="date",
value=RetenueState.retenue_date,
on_change=RetenueState.set_retenue_date,
width="100%",
),
spacing="1", flex="1", min_width="200px",
),
rx.vstack(
rx.text("Date du problème", size="2", weight="medium", color="var(--gray-11)"),
rx.input(
type="date",
value=RetenueState.probleme_date,
on_change=RetenueState.set_probleme_date,
width="100%",
),
spacing="1", flex="1", min_width="200px",
),
gap="0.75rem", flex_wrap="wrap", width="100%",
),
# Motif (radio)
rx.vstack(
rx.text("Motif de la retenue", size="2", weight="medium", color="var(--gray-11)"),
rx.radio_group.root(
rx.vstack(
rx.radio_group.item(
rx.text("N'a pas remis ses tâches scolaires dans les délais", size="2"),
value="devoir",
),
rx.radio_group.item(
rx.text("A manifesté un comportement répréhensible", size="2"),
value="comportement",
),
rx.radio_group.item(
rx.text("Est arrivé en retard aux cours", size="2"),
value="retard",
),
spacing="2",
),
value=RetenueState.case,
on_change=RetenueState.set_case,
),
spacing="2", width="100%",
),
# Branche (visible seulement si case devoir)
rx.cond(
RetenueState.case == "devoir",
rx.vstack(
rx.text("Branche", size="2", weight="medium", color="var(--gray-11)"),
_branche_selector(),
spacing="1", width="100%",
),
rx.fragment(),
),
# Remarque
rx.vstack(
rx.text("Remarque éventuelle de l'école", size="2", weight="medium", color="var(--gray-11)"),
rx.text_area(
value=RetenueState.remarque,
on_change=RetenueState.set_remarque,
rows="4",
width="100%",
resize="vertical",
),
spacing="1", width="100%",
),
# Erreur
rx.cond(
RetenueState.form_error != "",
rx.callout.root(
rx.callout.icon(rx.icon("triangle-alert", size=16)),
rx.callout.text(RetenueState.form_error),
color_scheme="red", variant="soft", size="1",
),
rx.fragment(),
),
# Option : créer une notice Escada
rx.flex(
rx.checkbox(
checked=RetenueState.add_notice,
on_change=RetenueState.set_add_notice,
size="2",
),
rx.text(
"Ajouter automatiquement une notice sur Escada",
size="2", color="var(--gray-12)",
),
gap="0.5rem", align="center",
padding="0.5rem 0.65rem",
background_color="#f8f9fa",
border="1px solid #e5e7eb",
border_radius="6px",
),
# Actions : télécharger
rx.button(
rx.icon("file-down", size=16),
"Télécharger l'avis",
on_click=RetenueState.download_pdf,
color_scheme="red", size="2",
disabled=RetenueState.selected_id == 0,
),
spacing="4",
width="100%",
)
def _email_section() -> rx.Component:
return rx.box(
rx.vstack(
rx.flex(
rx.icon("mail", size=16, color="#37474f"),
rx.text("Envoyer par email", size="3", weight="bold", color="#37474f"),
gap="0.5rem", align="center",
),
rx.divider(),
rx.text("Destinataire", size="2", weight="medium", color="var(--gray-11)"),
rx.radio_group.root(
rx.vstack(
rx.radio_group.item(
rx.cond(
RetenueState.sel_fiche_email_appr != "",
rx.text("Apprenti — ", RetenueState.sel_fiche_email_appr, size="2"),
rx.text("Apprenti (email inconnu)", size="2", color="var(--gray-9)"),
),
value="apprenti",
disabled=RetenueState.sel_fiche_email_appr == "",
),
rx.radio_group.item(
rx.cond(
RetenueState.sel_fiche_email_form != "",
rx.text("Formateur — ", RetenueState.sel_fiche_email_form, size="2"),
rx.text("Formateur (email inconnu)", size="2", color="var(--gray-9)"),
),
value="formateur",
disabled=RetenueState.sel_fiche_email_form == "",
),
rx.radio_group.item(
rx.text("Autre adresse", size="2"),
value="autre",
),
spacing="2",
),
value=RetenueState.email_dest,
on_change=RetenueState.set_email_dest,
),
rx.cond(
RetenueState.email_dest == "autre",
rx.input(
placeholder="email@domaine.ch",
value=RetenueState.email_custom,
on_change=RetenueState.set_email_custom,
type="email",
width="100%",
),
rx.fragment(),
),
rx.button(
rx.icon("send", size=16),
"Envoyer l'avis par email",
on_click=RetenueState.send_email_action,
color_scheme="blue", size="2",
disabled=RetenueState.selected_id == 0,
),
spacing="3", width="100%",
),
padding="1.25rem",
background_color="white",
border_radius="8px",
border="1px solid #e0e0e0",
width="100%",
)
def retenue_page() -> rx.Component:
return layout(
rx.vstack(
rx.heading("Avis de retenue", size="6"),
rx.cond(
RetenueState.has_apprentis,
rx.vstack(
rx.box(
_form(),
padding="1.25rem",
background_color="white",
border_radius="8px",
border="1px solid #e0e0e0",
width="100%",
),
_email_section(),
spacing="4", width="100%",
),
empty_state(
icon="users",
title="Aucun apprenti",
description="Importe les classes depuis Escadaweb pour générer des avis.",
action_label="Lancer un import",
action_href="/escada",
),
),
spacing="4",
width="100%",
max_width="780px",
)
)

View file

@ -24,6 +24,7 @@ _PAGES = [
("Tableau de bord", "/accueil", "layout-dashboard"),
("Apprentis", "/fiche", "user"),
("Classes", "/classe", "users"),
("Avis de retenue", "/retenue", "file-warning"),
]
_ADMIN_PAGES = [

View file

@ -1,3 +1,4 @@
reflex==0.9.2
markdown==3.10.2
pikepdf==10.5.1

View file

@ -52,6 +52,7 @@ except Exception:
SCRIPT_SYNC = _ROOT / "scripts" / "sync_esacada.py"
SCRIPT_PUSH = _ROOT / "scripts" / "push_to_escada.py"
SCRIPT_PUSH_NOTICES = _ROOT / "scripts" / "push_notices.py"
DATA_DIR = _ROOT / "data"
# Marqueur écrit par run_imports.py à la fin des imports en DB
@ -327,6 +328,8 @@ def run_job(job: CronJob, sess) -> None:
("Push Escada", _build_push_cmd(job)),
("Sync Escada", _build_sync_cmd(job)),
]
elif job.task_kind == "push_notices":
steps = [("Push notices", [sys.executable, str(SCRIPT_PUSH_NOTICES)])]
else:
fp.write(f"[error] task_kind inconnu : {job.task_kind}\n")
overall_rc = 99

251
scripts/push_notices.py Executable file
View file

@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""Push des notices en attente vers Escadaweb.
Workflow par notice :
Classes Élèves (de la classe) Notices (de l'apprenti) → Ajouter
Date / Titre / Remarques Mettre à jour retour Élèves
Réutilise les helpers de `sync_esacada.py` :
- `_launch_context()` : navigateur headless avec profil persistant
- `_ensure_logged_in(page)` : login SSO + 2FA + langue FR
- `_go_to_students_page(page, class_name)` : ouvre ViewLernende d'une classe
Sortie standard (parsée par `cron_tick.py` et la page /escada) :
PUSH_NOTICES_DONE {"ok": N, "err": [...], "remaining": N}
Behaviour DB :
- status='pending' tentative
- succès suppression de la Notice de la DB
- échec status='failed' + error_msg
"""
from __future__ import annotations
import json
import sys
import traceback
from pathlib import Path
_ROOT = Path(__file__).resolve().parent.parent
if str(_ROOT) not in sys.path:
sys.path.insert(0, str(_ROOT))
from sqlalchemy import select # noqa: E402
from playwright.sync_api import Page # noqa: E402
from src.db import get_session, Notice # noqa: E402
from src.logger import app_log # noqa: E402
from scripts.sync_esacada import ( # noqa: E402
_launch_context, _ensure_logged_in, _go_to_students_page, _log,
CLASSES_URL,
)
def _fill_date(page: Page, date_str: str) -> None:
"""Remplit le champ Date du formulaire notice (DevExpress).
On vise l'input texte directement (`id$="_DXEditor1_I"`), plus stable que
le calendrier popup.
"""
date_input = page.locator("input[id$='_DXEditor1_I']").first
date_input.wait_for(state="visible", timeout=10_000)
date_input.click()
# Sélectionne tout l'ancien contenu (date pré-remplie d'aujourd'hui) puis tape
date_input.press("Control+A")
date_input.type(date_str)
date_input.press("Tab") # commit la valeur
def _push_one_notice(page: Page, notice: Notice, students_url: str) -> tuple[bool, str]:
"""Pousse une notice. Renvoie (ok, error_message).
Pré : `page` est sur la liste Élèves de la classe de l'apprenti.
Post (succès ou échec) : `page` est de retour sur la liste Élèves.
"""
ap = notice.apprenti
nom = ap.nom
prenom = ap.prenom
# 1. Trouver la ligne de l'apprenti et cliquer "Notices"
try:
# On filtre par nom ET prénom pour éviter les homonymes
student_row = page.locator("tr").filter(has_text=nom).filter(has_text=prenom).first
if not student_row.count():
return False, f"Apprenti '{nom} {prenom}' introuvable dans la grille"
student_row.get_by_role("link", name="Notices").first.click()
page.wait_for_load_state("networkidle", timeout=15_000)
except Exception as e:
return False, f"Navigation Notices : {e}"
# 2. Cliquer "Ajouter"
try:
page.locator("a").filter(has_text="Ajouter").first.click()
page.wait_for_timeout(800)
except Exception as e:
return False, f"Bouton Ajouter introuvable : {e}"
# 3. Remplir Date / Titre / Remarques
try:
_fill_date(page, notice.date_event.strftime("%d.%m.%Y"))
except Exception as e:
return False, f"Remplissage date : {e}"
try:
page.get_by_role("textbox", name="Titre:").fill(notice.titre)
except Exception as e:
return False, f"Remplissage titre : {e}"
if notice.remarque:
try:
page.get_by_role("textbox", name="Remarques:").fill(notice.remarque)
except Exception:
pass # Non bloquant
# 4. Sauver
try:
page.get_by_role("link", name="Mettre à jour").click()
page.wait_for_load_state("networkidle", timeout=15_000)
page.wait_for_timeout(500) # laisse le temps à la grille de se rafraîchir
except Exception as e:
return False, f"Échec Mettre à jour : {e}"
# 5. Vérifier que la notice est bien dans la grille de l'apprenti
try:
# On cherche le titre dans la grille des notices (max 30 chars pour éviter
# les soucis de longueur / wrapping).
needle = (notice.titre or "").strip()[:30]
if needle:
cell = page.locator("td").filter(has_text=needle).first
cell.wait_for(state="visible", timeout=8_000)
except Exception:
# Vérification échouée — on retourne quand même à la liste Élèves
# avant de signaler l'échec.
try:
page.goto(students_url)
page.wait_for_load_state("networkidle", timeout=15_000)
except Exception:
pass
return False, "Notice non retrouvée dans la grille après save (échec probable)"
# 6. Retour à la liste Élèves de la même classe (option a : navigation directe)
try:
page.goto(students_url)
page.wait_for_load_state("networkidle", timeout=15_000)
except Exception as e:
return False, f"Retour grille élèves : {e}"
return True, ""
def main():
sess = get_session()
ok_count = 0
errors: list[str] = []
try:
notices = sess.execute(
select(Notice).where(Notice.status == "pending")
).scalars().all()
app_log(f"[push_notices] {len(notices)} notice(s) en attente")
if not notices:
print(
'PUSH_NOTICES_DONE '
+ json.dumps({"ok": 0, "err": [], "remaining": 0}),
flush=True,
)
return
# Groupe par classe pour minimiser les navigations
by_class: dict[str, list[Notice]] = {}
for n in notices:
by_class.setdefault(n.apprenti.classe, []).append(n)
pw, ctx, page = _launch_context()
try:
# Navigation initiale vers ViewKlassen : redirige vers le login
# si la session est expirée, et permet à _ensure_logged_in
# de détecter le succès (ViewKlassen dans l'URL).
page.goto(CLASSES_URL)
_ensure_logged_in(page)
for classe, class_notices in by_class.items():
_log(f"[push_notices] classe={classe} ({len(class_notices)} notices)")
try:
students_page = _go_to_students_page(page, classe)
except Exception as e:
students_page = None
_log(f"[push_notices] erreur navigation {classe}: {e}")
if not students_page:
msg = f"classe '{classe}' introuvable sur Escada"
for n in class_notices:
n.status = "failed"
n.error_msg = msg
errors.append(
f"id={n.id} ({n.apprenti.nom} {n.apprenti.prenom}): {msg}"
)
sess.commit()
continue
students_url = students_page.url
for notice in class_notices:
label = f"{notice.apprenti.nom} {notice.apprenti.prenom}"
try:
ok, err = _push_one_notice(students_page, notice, students_url)
if ok:
sess.delete(notice)
sess.commit()
ok_count += 1
_log(f"[push_notices] OK id={notice.id} ({label})")
else:
notice.status = "failed"
notice.error_msg = err[:500]
sess.commit()
errors.append(f"id={notice.id} ({label}): {err}")
_log(f"[push_notices] FAIL id={notice.id}: {err}")
# Si on est paumé, tenter un retour propre
try:
students_page.goto(students_url)
students_page.wait_for_load_state(
"networkidle", timeout=10_000
)
except Exception:
break # impossible de recover, on passe à la classe suivante
except Exception as e:
notice.status = "failed"
notice.error_msg = str(e)[:500]
sess.commit()
errors.append(f"id={notice.id} ({label}): {e}")
_log(f"[push_notices] EX id={notice.id}: {e}\n{traceback.format_exc()}")
finally:
try: ctx.close()
except Exception: pass
try: pw.stop()
except Exception: pass
finally:
# Compte les notices encore pending (n'incluant pas les "failed")
try:
remaining = sess.execute(
select(Notice).where(Notice.status == "pending")
).all()
remaining_count = len(remaining)
except Exception:
remaining_count = 0
sess.close()
print(
'PUSH_NOTICES_DONE '
+ json.dumps({
"ok": ok_count,
"err": errors,
"remaining": remaining_count,
}, ensure_ascii=False),
flush=True,
)
if __name__ == "__main__":
main()

View file

@ -189,6 +189,9 @@ class ApprentiFiche(Base):
formateur_nom: Mapped[Optional[str]] = mapped_column(String, nullable=True)
formateur_email: Mapped[Optional[str]] = mapped_column(String, nullable=True)
# Profession dérivée du préfixe de classe (mapping dans data/settings.json)
profession: Mapped[Optional[str]] = mapped_column(String, nullable=True)
updated_at: Mapped[datetime] = mapped_column(default=datetime.now, onupdate=datetime.now)
apprenti: Mapped["Apprenti"] = relationship(back_populates="fiche")
@ -206,6 +209,30 @@ class NotesExamen(Base):
apprenti: Mapped["Apprenti"] = relationship(back_populates="notes_examen")
class Notice(Base):
"""Note à pousser sur Escada (liée à un apprenti).
Créée notamment lors de la génération d'un avis de retenue (si la case
correspondante est cochée). Supprimée après push réussi.
"""
__tablename__ = "notices"
id: Mapped[int] = mapped_column(primary_key=True)
apprenti_id: Mapped[int] = mapped_column(ForeignKey("apprentis.id"))
date_event: Mapped[date]
titre: Mapped[str] = mapped_column(Text)
remarque: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
type_notice: Mapped[Optional[str]] = mapped_column(String, nullable=True)
matiere: Mapped[Optional[str]] = mapped_column(String, nullable=True)
source: Mapped[str] = mapped_column(default="manual") # "retenue" pour le moment
status: Mapped[str] = mapped_column(default="pending") # "pending" | "failed"
created_at: Mapped[datetime] = mapped_column(default=datetime.now)
created_by: Mapped[Optional[str]] = mapped_column(String, nullable=True)
error_msg: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
apprenti: Mapped["Apprenti"] = relationship()
class SanctionExport(Base):
__tablename__ = "sanctions_export"
@ -286,6 +313,21 @@ def init_db(engine=None):
for stmt in (
"ALTER TABLE sanctions_export ADD COLUMN nb_absences INTEGER",
"ALTER TABLE cron_jobs ADD COLUMN notify_level TEXT DEFAULT 'normal'",
"ALTER TABLE apprenti_fiches ADD COLUMN profession TEXT",
"""CREATE TABLE IF NOT EXISTS notices (
id INTEGER PRIMARY KEY,
apprenti_id INTEGER NOT NULL REFERENCES apprentis(id),
date_event DATE NOT NULL,
titre TEXT NOT NULL,
remarque TEXT,
type_notice TEXT,
matiere TEXT,
source TEXT NOT NULL DEFAULT 'manual',
status TEXT NOT NULL DEFAULT 'pending',
created_at DATETIME NOT NULL,
created_by TEXT,
error_msg TEXT
)""",
"""CREATE TABLE IF NOT EXISTS escada_pending (
id INTEGER PRIMARY KEY,
apprenti_id INTEGER NOT NULL REFERENCES apprentis(id),
@ -315,6 +357,7 @@ def upsert_apprenti_fiche(session: Session, apprenti_id: int, data: dict) -> Non
"entreprise_nom", "entreprise_adresse", "entreprise_code_postal",
"entreprise_localite", "entreprise_telephone", "entreprise_email",
"formateur_nom", "formateur_email",
"profession",
]
if existing:
for f in fields:

113
src/profession.py Normal file
View file

@ -0,0 +1,113 @@
"""Helper pour la résolution `classe → profession` via mapping configurable.
Mapping stocké dans `data/settings.json` sous la clé `class_profession_mapping`,
forme : `[{"prefix": "AUTOMAT", "profession": "Automaticien CFC"}, ...]`.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Optional
from sqlalchemy import select
from sqlalchemy.orm import Session
from src.db import Apprenti, ApprentiFiche, upsert_apprenti_fiche
_ROOT = Path(__file__).resolve().parent.parent
_DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data")))
_SETTINGS_PATH = _DATA_DIR / "settings.json"
_DEFAULT_MAPPING = [
{"prefix": "AUTOMAT", "profession": "Automaticien CFC"},
{"prefix": "MONTAUT", "profession": "Monteur Automaticien CFC"},
{"prefix": "EM-AU", "profession": "Automaticien CFC"},
]
def _load_settings() -> dict:
if _SETTINGS_PATH.exists():
try:
return json.loads(_SETTINGS_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
return {}
def _save_settings(s: dict) -> None:
_SETTINGS_PATH.write_text(json.dumps(s, ensure_ascii=False, indent=2), encoding="utf-8")
def load_mapping() -> list[dict]:
"""Renvoie la liste des correspondances [{prefix, profession}, ...]."""
s = _load_settings()
return list(s.get("class_profession_mapping", _DEFAULT_MAPPING))
def save_mapping(mapping: list[dict]) -> None:
"""Sauve le mapping (filtre les entrées vides)."""
cleaned = [
{"prefix": (m.get("prefix") or "").strip(), "profession": (m.get("profession") or "").strip()}
for m in mapping
]
cleaned = [m for m in cleaned if m["prefix"] and m["profession"]]
s = _load_settings()
s["class_profession_mapping"] = cleaned
_save_settings(s)
def resolve_profession(classe: str, mapping: Optional[list[dict]] = None) -> str:
"""Renvoie la profession matchant le préfixe de la classe, ou '' si aucun."""
if not classe:
return ""
if mapping is None:
mapping = load_mapping()
# On préfère le préfixe le plus long en cas de chevauchement
for entry in sorted(mapping, key=lambda m: -len(m.get("prefix", ""))):
prefix = entry.get("prefix", "")
if prefix and classe.startswith(prefix):
return entry.get("profession", "")
return ""
def find_unmapped_classes(session: Session) -> list[str]:
"""Liste les classes en DB sans correspondance dans le mapping.
Exclut MP/MI (déjà filtrées partout dans l'app).
"""
mapping = load_mapping()
classes = session.execute(
select(Apprenti.classe).distinct().order_by(Apprenti.classe)
).scalars().all()
out = []
for c in classes:
if not c or c.startswith(("MP", "MI")):
continue
if not resolve_profession(c, mapping):
out.append(c)
return out
def refresh_all_professions(session: Session) -> int:
"""Recalcule `profession` pour tous les apprentis en base.
Renvoie le nombre de fiches mises à jour. Utile :
- une fois à l'init après ajout du champ
- après modification du mapping dans Paramètres
- après une sync Escada
"""
mapping = load_mapping()
apprentis = session.execute(select(Apprenti)).scalars().all()
n = 0
for ap in apprentis:
prof = resolve_profession(ap.classe, mapping)
if not prof:
# Pas de mapping → on laisse la valeur existante si présente
continue
# upsert : crée la fiche si elle n'existe pas, sinon met à jour profession
upsert_apprenti_fiche(session, ap.id, {"profession": prof})
n += 1
session.commit()
return n

231
src/retenue_pdf.py Normal file
View file

@ -0,0 +1,231 @@
"""Génération d'avis de retenue à partir du template AcroForm.
Template : `data/templates/GF_FO_Avis_de_retenue.pdf`. Le champ `Date` du
template a 3 widgets-enfants partagés (un par ligne du formulaire). On les
sépare en 3 champs distincts (`Date_devoir`, `Date_comportement`, `Date_retard`)
puis on remplit uniquement celui correspondant à la case cochée.
Le PDF généré reste éditable (formulaire préservé).
"""
from __future__ import annotations
import io
import os
from datetime import date as _date
from pathlib import Path
from typing import Optional
import pypdf
from sqlalchemy.orm import Session
from src.db import Apprenti, ApprentiFiche
_ROOT = Path(__file__).resolve().parent.parent
_DATA_DIR = Path(os.getenv("DATA_DIR", str(_ROOT / "data")))
_TEMPLATE_PATH = _DATA_DIR / "templates" / "GF_FO_Avis_de_retenue.pdf"
_MOIS_FR = [
"janvier", "février", "mars", "avril", "mai", "juin",
"juillet", "août", "septembre", "octobre", "novembre", "décembre",
]
# Mapping case → suffixe + index (ordre des widgets enfants triés par Y desc)
_CASE_TO_SUFFIX = {
"devoir": ("Date_devoir", 0),
"comportement": ("Date_comportement", 1),
"retard": ("Date_retard", 2),
}
def format_date_long(d: _date) -> str:
"""Formate une date en 'jour mois année' (ex: '12 mars 2026')."""
return f"{d.day} {_MOIS_FR[d.month - 1]} {d.year}"
def generate_retenue_pdf(
sess: Session,
apprenti_id: int,
*,
profession: str,
retenue_date: _date,
probleme_date: _date,
case: str, # "devoir" | "comportement" | "retard"
branche: str = "",
remarque: str = "",
prof_name: str = "",
) -> Optional[bytes]:
"""Pré-remplit le template puis aplatit le PDF. Renvoie les bytes du PDF aplati."""
if not _TEMPLATE_PATH.exists():
return None
apprenti = sess.get(Apprenti, apprenti_id)
if apprenti is None:
return None
fiche: Optional[ApprentiFiche] = apprenti.fiche
classe_full = (
f"{profession.strip()} {apprenti.classe}".strip()
if profession else apprenti.classe
)
npa_ville = ""
if fiche:
cp = (fiche.entreprise_code_postal or "").strip()
loc = (fiche.entreprise_localite or "").strip()
npa_ville = f"{cp} {loc}".strip()
# 1. Lecture template + clone
reader = pypdf.PdfReader(str(_TEMPLATE_PATH))
writer = pypdf.PdfWriter(clone_from=reader)
# 2. Séparer les 3 widgets du champ Date en 3 champs distincts.
# Après cette opération, on peut remplir chaque Date_xxx individuellement.
_split_date_field(writer)
# 3. Remplit les champs texte (Date_xxx inclus pour la case sélectionnée)
target_date_field = _CASE_TO_SUFFIX.get(case, (None, None))[0]
field_values: dict[str, str] = {
"NomApprenti": f"{apprenti.prenom} {apprenti.nom}".strip(),
"Classe": classe_full,
"NomEntreprise": (fiche.entreprise_nom if fiche else "") or "",
"Adresse": (fiche.entreprise_adresse if fiche else "") or "",
"NPA-Ville": npa_ville,
"RetenueDateHeure": retenue_date.strftime("%d.%m.%Y"),
"Branche": branche if case == "devoir" else "",
"Remarque": remarque,
"DateAvis": format_date_long(_date.today()),
"Profs": prof_name or "",
}
if target_date_field:
field_values[target_date_field] = probleme_date.strftime("%d.%m.%Y")
for page in writer.pages:
try:
writer.update_page_form_field_values(
page, field_values, auto_regenerate=False,
)
except Exception:
pass
# 4. Checkboxes
case_to_field = {
"devoir": "CaseDevoir",
"comportement": "CaseComportement",
"retard": "CaseRetard",
}
target_check = case_to_field.get(case)
for fname in case_to_field.values():
try:
_set_checkbox(writer, fname, fname == target_check)
except Exception:
pass
# 5. Force NeedAppearances pour que les viewers redessinent les valeurs
try:
root = writer._root_object
if "/AcroForm" in root:
root["/AcroForm"].update({
pypdf.generic.NameObject("/NeedAppearances"):
pypdf.generic.BooleanObject(True)
})
except Exception:
pass
# 6. Écriture (formulaire préservé éditable)
buf = io.BytesIO()
writer.write(buf)
return buf.getvalue()
def _split_date_field(writer: pypdf.PdfWriter) -> None:
"""Sépare le champ `Date` (avec 3 widgets enfants) en 3 champs indépendants.
Renomme les widgets selon leur position Y (ordre du haut vers le bas) :
kid #0 (haut) → Date_devoir
kid #1 (milieu) → Date_comportement
kid #2 (bas) → Date_retard
"""
NameObject = pypdf.generic.NameObject
acroform_ref = writer._root_object.get("/AcroForm")
if not acroform_ref:
return
acroform = acroform_ref.get_object() if hasattr(acroform_ref, "get_object") else acroform_ref
fields = acroform.get("/Fields") or []
date_field = None
date_ref = None
for f in fields:
if f.get_object().get("/T") == "Date":
date_field = f.get_object()
date_ref = f
break
if date_field is None:
return
kids = date_field.get("/Kids") or []
if not kids:
return
# Trier les enfants par Y descendant
indexed = []
for kid in kids:
ko = kid.get_object()
rect = ko.get("/Rect")
y = float(rect[1]) if rect else 0.0
indexed.append((y, kid, ko))
indexed.sort(key=lambda t: -t[0])
# Promouvoir chaque enfant en champ indépendant
new_fields = []
suffixes_by_order = ["Date_devoir", "Date_comportement", "Date_retard"]
for i, (_y, kid_ref, kid_obj) in enumerate(indexed):
# Renomme : donne un /T propre à l'ancien widget enfant
kid_obj[NameObject("/T")] = pypdf.generic.create_string_object(
suffixes_by_order[i]
)
# Hériter du /FT, /DA, /Q du parent si manquant sur l'enfant
for prop in ("/FT", "/DA", "/Q"):
if prop not in kid_obj and prop in date_field:
kid_obj[NameObject(prop)] = date_field[prop]
# Détacher du parent
if "/Parent" in kid_obj:
del kid_obj[NameObject("/Parent")]
new_fields.append(kid_ref)
# Retirer l'ancien champ Date de /Fields, ajouter les 3 nouveaux
new_field_list = [f for f in fields if f is not date_ref] + new_fields
acroform[NameObject("/Fields")] = pypdf.generic.ArrayObject(new_field_list)
def _find_field(writer: pypdf.PdfWriter, name: str):
acroform = writer._root_object.get("/AcroForm")
if not acroform:
return None
for f in acroform.get("/Fields") or []:
obj = f.get_object()
if obj.get("/T") == name:
return obj
return None
def _set_checkbox(writer: pypdf.PdfWriter, field_name: str, checked: bool) -> None:
"""Coche/décoche une checkbox AcroForm, gère les widgets enfants sans /T."""
NameObject = pypdf.generic.NameObject
field = _find_field(writer, field_name)
if field is None:
return
kids = field.get("/Kids")
widgets = [k.get_object() for k in kids] if kids else [field]
on_value = "/Yes"
for widget in widgets:
ap = widget.get("/AP") or field.get("/AP")
if ap is not None:
n_ap = ap.get("/N") if hasattr(ap, "get") else None
if n_ap is not None:
for k in n_ap.keys():
ks = str(k)
if ks not in ("/Off", "Off"):
on_value = ks if ks.startswith("/") else f"/{ks}"
break
new_val = NameObject(on_value if checked else "/Off")
widget[NameObject("/AS")] = new_val
field[NameObject("/V")] = NameObject(on_value if checked else "/Off")