From 7f220a9b65e164d1cd3d0c59aeee8a87d89be968 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 7 May 2026 00:13:39 +0000 Subject: [PATCH] feat(orchestrator): Faktencheck vor Lagebild mit Fallback (sequenziell) Bislang liefen factcheck + analyze parallel via asyncio.gather. Folge: Lagebild konnte Aussagen treffen, die der Faktencheck im selben Refresh als contradicted markiert. Inkonsistenz zwischen Lagebild-Tab und Faktencheck- Tab; im PDF/DOCX-Export schon kritisch. Variante 1 aus der Diskussion: strikt sequenziell, mit Fallback bei Faktencheck-Fail (Refresh bricht NICHT ab, Lagebild laeuft dann ohne Faktenkontext wie bisher, ein Logeintrag dokumentiert den Fallback). Aenderungen: - analyzer.build_fact_context_block(): neuer Helper, baut den GEPRUEFTE-FAKTEN-Block aus existing_facts + neuen/aktualisierten Fakten. Status-Domaenen adhoc/research vereinheitlicht zu Bestaetigt / Umstritten / Unbestaetigt / Entwicklung. Max 20 Fakten, sortiert nach Status-Prioritaet desc und sources_count desc. Bei leerer Eingabe leerer String -> Fallback-Pfad. - analyzer.analyze() / analyze_incremental(): neuer Optional-Parameter fact_context_block (default leer, Backward-Compat). 4 Prompt-Templates bekommen {fact_context_block}-Platzhalter sowie eine AUSSAGE-DISZIPLIN- Sektion: bestaetigte Fakten als Geruest, Umstrittenes explizit machen, Unbestaetigtes klar einordnen, kein Spekulieren ueber ungedecktes. - orchestrator: asyncio.gather durch sequenzielle Logik ersetzt. Faktencheck zuerst, Pipeline-Step 6 done direkt nach dem Aufruf (count_value ist Schaetzung; finale DB-Zahlen stehen spaeter). Lagebild danach (Step 7) mit fact_context_block. _do_analysis-Closure um den Parameter erweitert, kein toter Inline-Block. - spaeteres _pipe_done(factcheck) entfernt -- der Step wird jetzt frueher geschlossen, der spaetere Persistierungsblock laesst ihn unberuehrt. UI-Pipeline zeigt automatisch sequenzielle Aktivitaet statt beide Steps gleichzeitig -- keine Frontend-Aenderung noetig. Latenz pro Refresh steigt um die factcheck-Dauer. Bewusst akzeptiert: Konsistenz vor Geschwindigkeit. --- src/agents/analyzer.py | 119 +++++++++++++++++++++++++++++++++++-- src/agents/orchestrator.py | 76 ++++++++++++++++++----- 2 files changed, 175 insertions(+), 20 deletions(-) diff --git a/src/agents/analyzer.py b/src/agents/analyzer.py index 8a067af..9bb45e6 100644 --- a/src/agents/analyzer.py +++ b/src/agents/analyzer.py @@ -16,7 +16,7 @@ WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschre VORFALL: {title} KONTEXT: {description} -VORHANDENE MELDUNGEN: +{fact_context_block}VORHANDENE MELDUNGEN: {articles_text} AUFTRAG: @@ -59,7 +59,7 @@ WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschre THEMA: {title} KONTEXT: {description} -VORLIEGENDE QUELLEN: +{fact_context_block}VORLIEGENDE QUELLEN: {articles_text} AUFTRAG: @@ -118,7 +118,7 @@ BISHERIGES LAGEBILD: BISHERIGE QUELLEN: {previous_sources_text} -NEUE MELDUNGEN SEIT DEM LETZTEN UPDATE: +{fact_context_block}NEUE MELDUNGEN SEIT DEM LETZTEN UPDATE: {new_articles_text} AUFTRAG: @@ -165,7 +165,7 @@ BISHERIGES BRIEFING: BISHERIGE QUELLEN: {previous_sources_text} -NEUE QUELLEN SEIT DEM LETZTEN UPDATE: +{fact_context_block}NEUE QUELLEN SEIT DEM LETZTEN UPDATE: {new_articles_text} AUFTRAG: @@ -264,6 +264,112 @@ Antworte AUSSCHLIESSLICH als JSON-Objekt — KEINE Erklärung, KEINE Einleitung: {{"relevant_ids": [1, 3, 7]}}""" + + +# Status-Gruppen fuer den Fakten-Kontext im Analyse-Prompt. +# adhoc nutzt confirmed/unconfirmed/contradicted/developing, +# research nutzt established/unverified/disputed/developing — beide Domaenen +# werden in dieselben vier Anzeige-Gruppen abgebildet. +_FACT_STATUS_GROUPS = [ + ("Bestätigt (mehrere unabhängige Quellen oder durch Faktencheck als gesichert eingestuft):", + {"confirmed", "established"}), + ("Umstritten (Quellen widersprechen sich oder Faktencheck hat Widersprüche dokumentiert):", + {"contradicted", "disputed"}), + ("Unbestätigt (nur eine einzelne Quelle, eine unabhängige Bestätigung steht aus):", + {"unconfirmed", "unverified"}), + ("In Entwicklung (laufender Sachverhalt, Stand offen):", + {"developing"}), +] + +_FACT_STATUS_PRIORITY = { + "confirmed": 5, "established": 5, + "contradicted": 4, "disputed": 4, + "unconfirmed": 3, "unverified": 3, + "developing": 1, +} + + +def build_fact_context_block( + existing_facts: list[dict] | None, + new_or_updated_facts: list[dict] | None, + incident_type: str, + max_total: int = 20, +) -> str: + """Baut den 'GEPRUEFTE FAKTEN'-Block fuer den Analyse-Prompt. + + Wird vom Orchestrator zwischen Faktencheck und Lagebild aufgerufen, damit + das Lagebild auf gepruefter Faktenbasis schreibt und Unklarheiten explizit + benennt. Bei leerer Faktenliste wird ein leerer String zurueckgegeben — der + Prompt laeuft dann ohne Fakten-Kontext (Fallback bei Faktencheck-Fail oder + bei Lagen ohne bisherige Fakten). + """ + existing_facts = existing_facts or [] + new_or_updated_facts = new_or_updated_facts or [] + if not existing_facts and not new_or_updated_facts: + return "" + + seen_claims: set[str] = set() + merged: list[dict] = [] + # Neue/aktualisierte Fakten zuerst (Status ist aktueller Stand). + for f in new_or_updated_facts: + c = (f.get("claim") or "").strip().lower() + if not c or c in seen_claims: + continue + seen_claims.add(c) + merged.append(f) + # Dann alte unveraenderte Fakten. + for f in existing_facts: + c = (f.get("claim") or "").strip().lower() + if not c or c in seen_claims: + continue + seen_claims.add(c) + merged.append(f) + + if not merged: + return "" + + merged.sort(key=lambda f: ( + -_FACT_STATUS_PRIORITY.get((f.get("status") or "").lower(), 0), + -(f.get("sources_count") or 0), + )) + merged = merged[:max_total] + + grouped: dict[str, list[dict]] = {label: [] for label, _ in _FACT_STATUS_GROUPS} + for f in merged: + s = (f.get("status") or "").lower() + for label, codes in _FACT_STATUS_GROUPS: + if s in codes: + grouped[label].append(f) + break + + if not any(grouped.values()): + return "" + + lines: list[str] = [] + lines.append("GEPRÜFTE FAKTEN (Stand nach dem Faktencheck dieses Refresh, max. {n} priorisiert):".format(n=max_total)) + for label, _codes in _FACT_STATUS_GROUPS: + items = grouped[label] + if not items: + continue + lines.append("") + lines.append(label) + for f in items: + claim = (f.get("claim") or "").strip() + sc = f.get("sources_count") or 0 + sc_text = f" ({sc} {'Quellen' if sc != 1 else 'Quelle'})" if sc else "" + lines.append(f"- {claim}{sc_text}") + + lines.append("") + lines.append("AUSSAGE-DISZIPLIN für das Lagebild:") + lines.append("- Bestätigte Fakten als Grundgerüst nehmen, ohne Hedging.") + lines.append("- Umstrittene Punkte explizit als umstritten kennzeichnen, beide Seiten knapp benennen.") + lines.append("- Unbestätigtes klar einordnen ('Eine einzelne Quelle berichtet ...', 'Eine unabhängige Bestätigung steht aus.').") + lines.append("- Bei Aussagen, die durch keinen geprüften Fakt gedeckt sind und auch nicht direkt aus einer der vorliegenden Meldungen hervorgehen: NICHT spekulieren — entweder weglassen oder als unklar kennzeichnen.") + lines.append("- Triff KEINE Aussagen, die mit den oben gelisteten geprüften Fakten in Widerspruch stehen.") + lines.append("") + return "\n".join(lines) + + class AnalyzerAgent: """Analysiert und übersetzt Meldungen über Claude CLI.""" @@ -290,7 +396,7 @@ class AnalyzerAgent: articles_text += f"Inhalt: {content[:800]}\n" return articles_text - async def analyze(self, title: str, description: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[dict | None, ClaudeUsage | None]: + async def analyze(self, title: str, description: str, articles: list[dict], incident_type: str = "adhoc", fact_context_block: str = "") -> tuple[dict | None, ClaudeUsage | None]: """Erstanalyse: Analysiert alle Meldungen zu einem Vorfall (erster Refresh).""" if not articles: return None, None @@ -306,6 +412,7 @@ class AnalyzerAgent: articles_text=articles_text, today=today, output_language=OUTPUT_LANGUAGE, + fact_context_block=fact_context_block, ) try: @@ -327,6 +434,7 @@ class AnalyzerAgent: previous_summary: str, previous_sources_json: str | None, incident_type: str = "adhoc", + fact_context_block: str = "", ) -> tuple[dict | None, ClaudeUsage | None]: """Inkrementelle Analyse: Aktualisiert das Lagebild mit nur den neuen Artikeln. @@ -369,6 +477,7 @@ class AnalyzerAgent: new_articles_text=new_articles_text, today=today, output_language=OUTPUT_LANGUAGE, + fact_context_block=fact_context_block, ) try: diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 225a666..e8bb457 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -1299,18 +1299,22 @@ class AgentOrchestrator: except Exception as e: logger.warning("Bias-Anreicherung fehlgeschlagen (Pipeline laeuft weiter): %s", e) - # --- Analyse-Task --- - async def _do_analysis(): + # --- Analyse-Task (wird nach _do_factcheck mit fact_context_block aufgerufen) --- + async def _do_analysis(fact_context_block: str = ""): analyzer = AnalyzerAgent() if previous_summary and new_count > 0: logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild") return await analyzer.analyze_incremental( title, description, new_articles_for_analysis, previous_summary, previous_sources_json, incident_type, + fact_context_block=fact_context_block, ) else: logger.info("Erstanalyse: Alle Artikel werden analysiert") - return await analyzer.analyze(title, description, all_articles_preloaded, incident_type) + return await analyzer.analyze( + title, description, all_articles_preloaded, incident_type, + fact_context_block=fact_context_block, + ) # --- Faktencheck-Task --- async def _do_factcheck(): @@ -1344,20 +1348,61 @@ class AgentOrchestrator: articles_for_check = [dict(row) for row in await cursor.fetchall()] return await factchecker.check(title, articles_for_check, incident_type) - # Pipeline-Schritte 6+7: Lagebild verfassen + Fakten prüfen (Start, parallel) - await _pipe_start("summary") + # Pipeline-Schritt 6: Faktencheck zuerst (sequenziell). Liefert den + # Faktenkontext fuer das Lagebild, damit dieses auf geprueftem Stand + # schreibt und Unklarheiten explizit benennt. Variante 1: bei + # Faktencheck-Fehler faellt das Lagebild auf den alten Pfad ohne + # Faktenkontext zurueck (Refresh bricht NICHT ab). await _pipe_start("factcheck") + factcheck_result: tuple = ([], None) + fact_context_block = "" + factcheck_failed_reason: str | None = None + try: + factcheck_result = await _do_factcheck() + except Exception as fc_err: + factcheck_failed_reason = str(fc_err) + logger.warning( + "Faktencheck fehlgeschlagen, Lagebild laeuft ohne Faktenkontext: %s", + fc_err, exc_info=True, + ) - # Beide Tasks PARALLEL starten - logger.info("Starte Analyse und Faktencheck parallel...") - analysis_result, factcheck_result = await asyncio.gather( - _do_analysis(), - _do_factcheck(), + fact_checks, fc_usage = factcheck_result if factcheck_result else ([], None) + + # Pipeline-Schritt 6 done direkt nach dem Aufruf — die finale + # DB-Persistierung passiert weiter unten, aber fuer die UI ist + # der Faktencheck-Aufruf hier abgeschlossen. Der count_value + # ist eine Schaetzung (echte Zahl steht spaeter in der DB). + _fc_estimated_new = max(0, len(fact_checks or []) - len(existing_facts or [])) + await _pipe_done( + "factcheck", + count_value=_fc_estimated_new, + count_secondary=len(fact_checks) if fact_checks else 0, ) + # Faktenkontext fuer das Lagebild bauen. + try: + from agents.analyzer import build_fact_context_block as _build_fc_ctx + fact_context_block = _build_fc_ctx( + existing_facts or [], fact_checks or [], incident_type, + ) + if fact_context_block: + logger.info( + "Faktenkontext fuer Lagebild: %d Zeichen, basierend auf %d alten + %d neuen Fakten", + len(fact_context_block), len(existing_facts or []), len(fact_checks or []), + ) + except Exception as ctx_err: + logger.warning("build_fact_context_block fehlgeschlagen: %s", ctx_err, exc_info=True) + fact_context_block = "" + + # Pipeline-Schritt 7: Lagebild verfassen (jetzt mit Faktenkontext) + await _pipe_start("summary") + logger.info( + "Starte Lagebild (sequenziell nach Faktencheck%s)", + " — OHNE Faktenkontext (Fallback)" if factcheck_failed_reason else "", + ) + analysis_result = await _do_analysis(fact_context_block) + analysis, analysis_usage = analysis_result - fact_checks, fc_usage = factcheck_result - # Pipeline-Schritt 6: Lagebild verfassen (fertig, keine Zahl, nur Status) await _pipe_done("summary", count_value=None, count_secondary=None) # --- Analyse-Ergebnisse verarbeiten --- @@ -1656,9 +1701,10 @@ class AgentOrchestrator: await db.commit() - # Pipeline-Schritt 7: Fakten prüfen (fertig) - _new_facts_count = max(0, len(fact_checks) - len(existing_facts)) - await _pipe_done("factcheck", count_value=_new_facts_count, count_secondary=len(fact_checks) if fact_checks else 0) + # Pipeline-Schritt 7 (Fakten pruefen) wurde bereits frueher als done + # markiert (siehe weiter oben — direkt nach dem _do_factcheck-Aufruf, + # bevor das Lagebild generiert wurde). Hier nur noch die DB- + # Persistierung der Fakten, ohne den Step erneut zu schliessen. # Pipeline-Schritt 8: Qualitätscheck (Start, ohne Zahlen) await _pipe_start("qc")