Zwei-Phasen-Faktencheck + parallele Analyse/Faktencheck-Ausführung

- Neuer Zwei-Phasen-Faktencheck: Haiku-Triage identifiziert betroffene Fakten,
  dann parallele Opus-Verifikation pro thematischer Gruppe (max 8 Fakten/Gruppe)
- Analyse und Faktencheck laufen jetzt parallel via asyncio.gather
- Snapshot-Erstellung vor die parallele Verarbeitung verschoben
- Fallback auf Standard-Faktencheck bei Triage-Fehler
- Erwartete Verbesserung: ~19 Min -> ~8 Min pro Refresh bei gleichbleibender Qualität

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dieser Commit ist enthalten in:
claude-dev
2026-03-09 20:16:45 +01:00
Ursprung 984a5a8184
Commit 1f3b3cb858
2 geänderte Dateien mit 833 neuen und 493 gelöschten Zeilen

Datei-Diff unterdrückt, da er zu groß ist. [Diff laden]

Datei anzeigen

@@ -9,7 +9,7 @@ from typing import Optional
from urllib.parse import urlparse, urlunparse
from agents.claude_client import UsageAccumulator
from agents.factchecker import find_matching_claim, deduplicate_new_facts
from agents.factchecker import find_matching_claim, deduplicate_new_facts, TWOPHASE_MIN_FACTS
from source_rules import (
_detect_category,
_extract_domain,
@@ -764,58 +764,108 @@ class AgentOrchestrator:
except Exception as e:
logger.warning(f"Quellen-Statistiken konnten nicht aktualisiert werden: {e}")
# Schritt 3: Analyse und Zusammenfassung
# Schritt 3+4: Analyse und Faktencheck PARALLEL
if new_count > 0 or not previous_summary:
analyzer = AnalyzerAgent()
is_first_summary = not previous_summary
# Inkrementelle Analyse wenn Lagebild bereits existiert und neue Artikel vorhanden
if previous_summary and new_count > 0:
logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild")
analysis, analysis_usage = await analyzer.analyze_incremental(
title, description, new_articles_for_analysis,
previous_summary, previous_sources_json, incident_type,
# Snapshot des alten Lagebilds sichern BEVOR parallele Verarbeitung startet
if previous_summary:
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
(incident_id,),
)
else:
# Erstanalyse: Alle Artikel laden
logger.info("Erstanalyse: Alle Artikel werden analysiert")
snap_articles = (await cursor.fetchone())["cnt"]
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
snap_fcs = (await cursor.fetchone())["cnt"]
await db.execute(
"""INSERT INTO incident_snapshots
(incident_id, summary, sources_json,
article_count, fact_check_count, refresh_log_id, created_at, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(incident_id, previous_summary, previous_sources_json,
snap_articles, snap_fcs, log_id, now, tenant_id),
)
await db.commit()
# Bestehende Fakten und alle Artikel vorladen (für parallele Tasks)
cursor = await db.execute(
"SELECT id, claim, status, sources_count FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
existing_facts = [dict(row) for row in await cursor.fetchall()]
# Alle Artikel vorladen für Erstanalyse/Erstcheck
all_articles_preloaded = None
if not previous_summary or new_count == 0:
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
(incident_id,),
)
all_articles = [dict(row) for row in await cursor.fetchall()]
analysis, analysis_usage = await analyzer.analyze(title, description, all_articles, incident_type)
all_articles_preloaded = [dict(row) for row in await cursor.fetchall()]
if self._ws_manager:
await self._ws_manager.broadcast_for_incident({
"type": "status_update",
"incident_id": incident_id,
"data": {"status": "analyzing", "detail": "Analyse und Faktencheck laufen parallel...", "started_at": now_utc},
}, visibility, created_by, tenant_id)
# --- Analyse-Task ---
async def _do_analysis():
analyzer = AnalyzerAgent()
if previous_summary and new_count > 0:
logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild")
return await analyzer.analyze_incremental(
title, description, new_articles_for_analysis,
previous_summary, previous_sources_json, incident_type,
)
else:
logger.info("Erstanalyse: Alle Artikel werden analysiert")
return await analyzer.analyze(title, description, all_articles_preloaded, incident_type)
# --- Faktencheck-Task ---
async def _do_factcheck():
factchecker = FactCheckerAgent()
if existing_facts and new_count > 0:
if len(existing_facts) >= TWOPHASE_MIN_FACTS:
logger.info(
f"Zwei-Phasen-Faktencheck: {new_count} neue Artikel, "
f"{len(existing_facts)} bestehende Fakten"
)
return await factchecker.check_incremental_twophase(
title, new_articles_for_analysis, existing_facts, incident_type,
)
else:
logger.info(
f"Inkrementeller Faktencheck: {new_count} neue Artikel, "
f"{len(existing_facts)} bestehende Fakten"
)
return await factchecker.check_incremental(
title, new_articles_for_analysis, existing_facts, incident_type,
)
else:
return await factchecker.check(title, all_articles_preloaded or [], incident_type)
# Beide Tasks PARALLEL starten
logger.info("Starte Analyse und Faktencheck parallel...")
analysis_result, factcheck_result = await asyncio.gather(
_do_analysis(),
_do_factcheck(),
)
analysis, analysis_usage = analysis_result
fact_checks, fc_usage = factcheck_result
# --- Analyse-Ergebnisse verarbeiten ---
if analysis_usage:
usage_acc.add(analysis_usage)
if analysis:
is_first_summary = not previous_summary
# Snapshot des alten Lagebilds sichern (nur wenn schon eins existiert)
if previous_summary:
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
(incident_id,),
)
snap_articles = (await cursor.fetchone())["cnt"]
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
snap_fcs = (await cursor.fetchone())["cnt"]
await db.execute(
"""INSERT INTO incident_snapshots
(incident_id, summary, sources_json,
article_count, fact_check_count, refresh_log_id, created_at, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(incident_id, previous_summary, previous_sources_json,
snap_articles, snap_fcs, log_id, now, tenant_id),
)
# sources_json aus der Analyse extrahieren und speichern
sources = analysis.get("sources", [])
sources_json = json.dumps(sources, ensure_ascii=False) if sources else None
new_summary = analysis.get("summary", "")
await db.execute(
@@ -855,50 +905,16 @@ class AgentOrchestrator:
await db.commit()
# Checkpoint 2: Cancel prüfen nach Analyse
# Cancel-Check nach paralleler Verarbeitung
self._check_cancelled(incident_id)
if self._ws_manager:
await self._ws_manager.broadcast_for_incident({
"type": "status_update",
"incident_id": incident_id,
"data": {"status": "factchecking", "detail": "Prüft Fakten gegen unabhängige Quellen...", "started_at": now_utc},
}, visibility, created_by, tenant_id)
# Schritt 4: Faktencheck
factchecker = FactCheckerAgent()
# Bestehende Fakten laden für inkrementellen Check
cursor = await db.execute(
"SELECT id, claim, status, sources_count FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
existing_facts = [dict(row) for row in await cursor.fetchall()]
if existing_facts and new_count > 0:
# Inkrementeller Faktencheck: nur neue Artikel + bestehende Fakten
logger.info(f"Inkrementeller Faktencheck: {new_count} neue Artikel, {len(existing_facts)} bestehende Fakten")
fact_checks, fc_usage = await factchecker.check_incremental(
title, new_articles_for_analysis, existing_facts, incident_type,
)
else:
# Erstcheck: alle Artikel
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
(incident_id,),
)
all_articles_for_fc = [dict(row) for row in await cursor.fetchall()]
fact_checks, fc_usage = await factchecker.check(title, all_articles_for_fc, incident_type)
# --- Faktencheck-Ergebnisse verarbeiten ---
# Pre-Dedup: Duplikate aus LLM-Antwort entfernen
fact_checks = deduplicate_new_facts(fact_checks)
if fc_usage:
usage_acc.add(fc_usage)
# Checkpoint 3: Cancel prüfen nach Faktencheck
self._check_cancelled(incident_id)
# Prüfen ob dies der erste Refresh ist (keine vorherigen Faktenchecks)
is_first_refresh = len(existing_facts) == 0