feat(orchestrator): Faktencheck vor Lagebild mit Fallback (sequenziell)

Bislang liefen factcheck + analyze parallel via asyncio.gather. Folge:
Lagebild konnte Aussagen treffen, die der Faktencheck im selben Refresh als
contradicted markiert. Inkonsistenz zwischen Lagebild-Tab und Faktencheck-
Tab; im PDF/DOCX-Export schon kritisch.

Variante 1 aus der Diskussion: strikt sequenziell, mit Fallback bei
Faktencheck-Fail (Refresh bricht NICHT ab, Lagebild laeuft dann ohne
Faktenkontext wie bisher, ein Logeintrag dokumentiert den Fallback).

Aenderungen:
- analyzer.build_fact_context_block(): neuer Helper, baut den
  GEPRUEFTE-FAKTEN-Block aus existing_facts + neuen/aktualisierten
  Fakten. Status-Domaenen adhoc/research vereinheitlicht zu Bestaetigt /
  Umstritten / Unbestaetigt / Entwicklung. Max 20 Fakten, sortiert nach
  Status-Prioritaet desc und sources_count desc. Bei leerer Eingabe
  leerer String -> Fallback-Pfad.
- analyzer.analyze() / analyze_incremental(): neuer Optional-Parameter
  fact_context_block (default leer, Backward-Compat). 4 Prompt-Templates
  bekommen {fact_context_block}-Platzhalter sowie eine AUSSAGE-DISZIPLIN-
  Sektion: bestaetigte Fakten als Geruest, Umstrittenes explizit machen,
  Unbestaetigtes klar einordnen, kein Spekulieren ueber ungedecktes.
- orchestrator: asyncio.gather durch sequenzielle Logik ersetzt.
  Faktencheck zuerst, Pipeline-Step 6 done direkt nach dem Aufruf
  (count_value ist Schaetzung; finale DB-Zahlen stehen spaeter). Lagebild
  danach (Step 7) mit fact_context_block. _do_analysis-Closure um den
  Parameter erweitert, kein toter Inline-Block.
- spaeteres _pipe_done(factcheck) entfernt -- der Step wird jetzt frueher
  geschlossen, der spaetere Persistierungsblock laesst ihn unberuehrt.

UI-Pipeline zeigt automatisch sequenzielle Aktivitaet statt beide Steps
gleichzeitig -- keine Frontend-Aenderung noetig.

Latenz pro Refresh steigt um die factcheck-Dauer. Bewusst akzeptiert:
Konsistenz vor Geschwindigkeit.
Dieser Commit ist enthalten in:
Claude Code
2026-05-07 00:13:39 +00:00
Ursprung f4c0c930b8
Commit 7f220a9b65
2 geänderte Dateien mit 175 neuen und 20 gelöschten Zeilen

Datei anzeigen

@@ -16,7 +16,7 @@ WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschre
VORFALL: {title} VORFALL: {title}
KONTEXT: {description} KONTEXT: {description}
VORHANDENE MELDUNGEN: {fact_context_block}VORHANDENE MELDUNGEN:
{articles_text} {articles_text}
AUFTRAG: AUFTRAG:
@@ -59,7 +59,7 @@ WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschre
THEMA: {title} THEMA: {title}
KONTEXT: {description} KONTEXT: {description}
VORLIEGENDE QUELLEN: {fact_context_block}VORLIEGENDE QUELLEN:
{articles_text} {articles_text}
AUFTRAG: AUFTRAG:
@@ -118,7 +118,7 @@ BISHERIGES LAGEBILD:
BISHERIGE QUELLEN: BISHERIGE QUELLEN:
{previous_sources_text} {previous_sources_text}
NEUE MELDUNGEN SEIT DEM LETZTEN UPDATE: {fact_context_block}NEUE MELDUNGEN SEIT DEM LETZTEN UPDATE:
{new_articles_text} {new_articles_text}
AUFTRAG: AUFTRAG:
@@ -165,7 +165,7 @@ BISHERIGES BRIEFING:
BISHERIGE QUELLEN: BISHERIGE QUELLEN:
{previous_sources_text} {previous_sources_text}
NEUE QUELLEN SEIT DEM LETZTEN UPDATE: {fact_context_block}NEUE QUELLEN SEIT DEM LETZTEN UPDATE:
{new_articles_text} {new_articles_text}
AUFTRAG: AUFTRAG:
@@ -264,6 +264,112 @@ Antworte AUSSCHLIESSLICH als JSON-Objekt — KEINE Erklärung, KEINE Einleitung:
{{"relevant_ids": [1, 3, 7]}}""" {{"relevant_ids": [1, 3, 7]}}"""
# Status-Gruppen fuer den Fakten-Kontext im Analyse-Prompt.
# adhoc nutzt confirmed/unconfirmed/contradicted/developing,
# research nutzt established/unverified/disputed/developing — beide Domaenen
# werden in dieselben vier Anzeige-Gruppen abgebildet.
_FACT_STATUS_GROUPS = [
("Bestätigt (mehrere unabhängige Quellen oder durch Faktencheck als gesichert eingestuft):",
{"confirmed", "established"}),
("Umstritten (Quellen widersprechen sich oder Faktencheck hat Widersprüche dokumentiert):",
{"contradicted", "disputed"}),
("Unbestätigt (nur eine einzelne Quelle, eine unabhängige Bestätigung steht aus):",
{"unconfirmed", "unverified"}),
("In Entwicklung (laufender Sachverhalt, Stand offen):",
{"developing"}),
]
_FACT_STATUS_PRIORITY = {
"confirmed": 5, "established": 5,
"contradicted": 4, "disputed": 4,
"unconfirmed": 3, "unverified": 3,
"developing": 1,
}
def build_fact_context_block(
existing_facts: list[dict] | None,
new_or_updated_facts: list[dict] | None,
incident_type: str,
max_total: int = 20,
) -> str:
"""Baut den 'GEPRUEFTE FAKTEN'-Block fuer den Analyse-Prompt.
Wird vom Orchestrator zwischen Faktencheck und Lagebild aufgerufen, damit
das Lagebild auf gepruefter Faktenbasis schreibt und Unklarheiten explizit
benennt. Bei leerer Faktenliste wird ein leerer String zurueckgegeben — der
Prompt laeuft dann ohne Fakten-Kontext (Fallback bei Faktencheck-Fail oder
bei Lagen ohne bisherige Fakten).
"""
existing_facts = existing_facts or []
new_or_updated_facts = new_or_updated_facts or []
if not existing_facts and not new_or_updated_facts:
return ""
seen_claims: set[str] = set()
merged: list[dict] = []
# Neue/aktualisierte Fakten zuerst (Status ist aktueller Stand).
for f in new_or_updated_facts:
c = (f.get("claim") or "").strip().lower()
if not c or c in seen_claims:
continue
seen_claims.add(c)
merged.append(f)
# Dann alte unveraenderte Fakten.
for f in existing_facts:
c = (f.get("claim") or "").strip().lower()
if not c or c in seen_claims:
continue
seen_claims.add(c)
merged.append(f)
if not merged:
return ""
merged.sort(key=lambda f: (
-_FACT_STATUS_PRIORITY.get((f.get("status") or "").lower(), 0),
-(f.get("sources_count") or 0),
))
merged = merged[:max_total]
grouped: dict[str, list[dict]] = {label: [] for label, _ in _FACT_STATUS_GROUPS}
for f in merged:
s = (f.get("status") or "").lower()
for label, codes in _FACT_STATUS_GROUPS:
if s in codes:
grouped[label].append(f)
break
if not any(grouped.values()):
return ""
lines: list[str] = []
lines.append("GEPRÜFTE FAKTEN (Stand nach dem Faktencheck dieses Refresh, max. {n} priorisiert):".format(n=max_total))
for label, _codes in _FACT_STATUS_GROUPS:
items = grouped[label]
if not items:
continue
lines.append("")
lines.append(label)
for f in items:
claim = (f.get("claim") or "").strip()
sc = f.get("sources_count") or 0
sc_text = f" ({sc} {'Quellen' if sc != 1 else 'Quelle'})" if sc else ""
lines.append(f"- {claim}{sc_text}")
lines.append("")
lines.append("AUSSAGE-DISZIPLIN für das Lagebild:")
lines.append("- Bestätigte Fakten als Grundgerüst nehmen, ohne Hedging.")
lines.append("- Umstrittene Punkte explizit als umstritten kennzeichnen, beide Seiten knapp benennen.")
lines.append("- Unbestätigtes klar einordnen ('Eine einzelne Quelle berichtet ...', 'Eine unabhängige Bestätigung steht aus.').")
lines.append("- Bei Aussagen, die durch keinen geprüften Fakt gedeckt sind und auch nicht direkt aus einer der vorliegenden Meldungen hervorgehen: NICHT spekulieren — entweder weglassen oder als unklar kennzeichnen.")
lines.append("- Triff KEINE Aussagen, die mit den oben gelisteten geprüften Fakten in Widerspruch stehen.")
lines.append("")
return "\n".join(lines)
class AnalyzerAgent: class AnalyzerAgent:
"""Analysiert und übersetzt Meldungen über Claude CLI.""" """Analysiert und übersetzt Meldungen über Claude CLI."""
@@ -290,7 +396,7 @@ class AnalyzerAgent:
articles_text += f"Inhalt: {content[:800]}\n" articles_text += f"Inhalt: {content[:800]}\n"
return articles_text return articles_text
async def analyze(self, title: str, description: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[dict | None, ClaudeUsage | None]: async def analyze(self, title: str, description: str, articles: list[dict], incident_type: str = "adhoc", fact_context_block: str = "") -> tuple[dict | None, ClaudeUsage | None]:
"""Erstanalyse: Analysiert alle Meldungen zu einem Vorfall (erster Refresh).""" """Erstanalyse: Analysiert alle Meldungen zu einem Vorfall (erster Refresh)."""
if not articles: if not articles:
return None, None return None, None
@@ -306,6 +412,7 @@ class AnalyzerAgent:
articles_text=articles_text, articles_text=articles_text,
today=today, today=today,
output_language=OUTPUT_LANGUAGE, output_language=OUTPUT_LANGUAGE,
fact_context_block=fact_context_block,
) )
try: try:
@@ -327,6 +434,7 @@ class AnalyzerAgent:
previous_summary: str, previous_summary: str,
previous_sources_json: str | None, previous_sources_json: str | None,
incident_type: str = "adhoc", incident_type: str = "adhoc",
fact_context_block: str = "",
) -> tuple[dict | None, ClaudeUsage | None]: ) -> tuple[dict | None, ClaudeUsage | None]:
"""Inkrementelle Analyse: Aktualisiert das Lagebild mit nur den neuen Artikeln. """Inkrementelle Analyse: Aktualisiert das Lagebild mit nur den neuen Artikeln.
@@ -369,6 +477,7 @@ class AnalyzerAgent:
new_articles_text=new_articles_text, new_articles_text=new_articles_text,
today=today, today=today,
output_language=OUTPUT_LANGUAGE, output_language=OUTPUT_LANGUAGE,
fact_context_block=fact_context_block,
) )
try: try:

Datei anzeigen

@@ -1299,18 +1299,22 @@ class AgentOrchestrator:
except Exception as e: except Exception as e:
logger.warning("Bias-Anreicherung fehlgeschlagen (Pipeline laeuft weiter): %s", e) logger.warning("Bias-Anreicherung fehlgeschlagen (Pipeline laeuft weiter): %s", e)
# --- Analyse-Task --- # --- Analyse-Task (wird nach _do_factcheck mit fact_context_block aufgerufen) ---
async def _do_analysis(): async def _do_analysis(fact_context_block: str = ""):
analyzer = AnalyzerAgent() analyzer = AnalyzerAgent()
if previous_summary and new_count > 0: if previous_summary and new_count > 0:
logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild") logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild")
return await analyzer.analyze_incremental( return await analyzer.analyze_incremental(
title, description, new_articles_for_analysis, title, description, new_articles_for_analysis,
previous_summary, previous_sources_json, incident_type, previous_summary, previous_sources_json, incident_type,
fact_context_block=fact_context_block,
) )
else: else:
logger.info("Erstanalyse: Alle Artikel werden analysiert") logger.info("Erstanalyse: Alle Artikel werden analysiert")
return await analyzer.analyze(title, description, all_articles_preloaded, incident_type) return await analyzer.analyze(
title, description, all_articles_preloaded, incident_type,
fact_context_block=fact_context_block,
)
# --- Faktencheck-Task --- # --- Faktencheck-Task ---
async def _do_factcheck(): async def _do_factcheck():
@@ -1344,20 +1348,61 @@ class AgentOrchestrator:
articles_for_check = [dict(row) for row in await cursor.fetchall()] articles_for_check = [dict(row) for row in await cursor.fetchall()]
return await factchecker.check(title, articles_for_check, incident_type) return await factchecker.check(title, articles_for_check, incident_type)
# Pipeline-Schritte 6+7: Lagebild verfassen + Fakten prüfen (Start, parallel) # Pipeline-Schritt 6: Faktencheck zuerst (sequenziell). Liefert den
await _pipe_start("summary") # Faktenkontext fuer das Lagebild, damit dieses auf geprueftem Stand
# schreibt und Unklarheiten explizit benennt. Variante 1: bei
# Faktencheck-Fehler faellt das Lagebild auf den alten Pfad ohne
# Faktenkontext zurueck (Refresh bricht NICHT ab).
await _pipe_start("factcheck") await _pipe_start("factcheck")
factcheck_result: tuple = ([], None)
fact_context_block = ""
factcheck_failed_reason: str | None = None
try:
factcheck_result = await _do_factcheck()
except Exception as fc_err:
factcheck_failed_reason = str(fc_err)
logger.warning(
"Faktencheck fehlgeschlagen, Lagebild laeuft ohne Faktenkontext: %s",
fc_err, exc_info=True,
)
# Beide Tasks PARALLEL starten fact_checks, fc_usage = factcheck_result if factcheck_result else ([], None)
logger.info("Starte Analyse und Faktencheck parallel...")
analysis_result, factcheck_result = await asyncio.gather( # Pipeline-Schritt 6 done direkt nach dem Aufruf — die finale
_do_analysis(), # DB-Persistierung passiert weiter unten, aber fuer die UI ist
_do_factcheck(), # der Faktencheck-Aufruf hier abgeschlossen. Der count_value
# ist eine Schaetzung (echte Zahl steht spaeter in der DB).
_fc_estimated_new = max(0, len(fact_checks or []) - len(existing_facts or []))
await _pipe_done(
"factcheck",
count_value=_fc_estimated_new,
count_secondary=len(fact_checks) if fact_checks else 0,
) )
# Faktenkontext fuer das Lagebild bauen.
try:
from agents.analyzer import build_fact_context_block as _build_fc_ctx
fact_context_block = _build_fc_ctx(
existing_facts or [], fact_checks or [], incident_type,
)
if fact_context_block:
logger.info(
"Faktenkontext fuer Lagebild: %d Zeichen, basierend auf %d alten + %d neuen Fakten",
len(fact_context_block), len(existing_facts or []), len(fact_checks or []),
)
except Exception as ctx_err:
logger.warning("build_fact_context_block fehlgeschlagen: %s", ctx_err, exc_info=True)
fact_context_block = ""
# Pipeline-Schritt 7: Lagebild verfassen (jetzt mit Faktenkontext)
await _pipe_start("summary")
logger.info(
"Starte Lagebild (sequenziell nach Faktencheck%s)",
" — OHNE Faktenkontext (Fallback)" if factcheck_failed_reason else "",
)
analysis_result = await _do_analysis(fact_context_block)
analysis, analysis_usage = analysis_result analysis, analysis_usage = analysis_result
fact_checks, fc_usage = factcheck_result
# Pipeline-Schritt 6: Lagebild verfassen (fertig, keine Zahl, nur Status)
await _pipe_done("summary", count_value=None, count_secondary=None) await _pipe_done("summary", count_value=None, count_secondary=None)
# --- Analyse-Ergebnisse verarbeiten --- # --- Analyse-Ergebnisse verarbeiten ---
@@ -1656,9 +1701,10 @@ class AgentOrchestrator:
await db.commit() await db.commit()
# Pipeline-Schritt 7: Fakten prüfen (fertig) # Pipeline-Schritt 7 (Fakten pruefen) wurde bereits frueher als done
_new_facts_count = max(0, len(fact_checks) - len(existing_facts)) # markiert (siehe weiter oben — direkt nach dem _do_factcheck-Aufruf,
await _pipe_done("factcheck", count_value=_new_facts_count, count_secondary=len(fact_checks) if fact_checks else 0) # bevor das Lagebild generiert wurde). Hier nur noch die DB-
# Persistierung der Fakten, ohne den Step erneut zu schliessen.
# Pipeline-Schritt 8: Qualitätscheck (Start, ohne Zahlen) # Pipeline-Schritt 8: Qualitätscheck (Start, ohne Zahlen)
await _pipe_start("qc") await _pipe_start("qc")