Ursache: Claude liefert teilweise Quellennummern als String statt Integer. Der Frontend-Vergleich (===) schlug dann fehl: "574" !== 574. Fixes: - 95 String-Nummern in Irankonflikt sources_json zu Integer konvertiert - 5 Duplikate entfernt - Frontend: Number() statt parseInt/=== fuer robusten Vergleich - Orchestrator: Automatische Konvertierung von String-Nr zu Integer vor DB-Speicherung - Cache-Buster aktualisiert Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1247 Zeilen
58 KiB
Python
1247 Zeilen
58 KiB
Python
"""Agenten-Orchestrierung: Queue und Steuerung der Claude-Agenten."""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
from config import TIMEZONE
|
|
from typing import Optional
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
from agents.claude_client import UsageAccumulator
|
|
from agents.factchecker import find_matching_claim, deduplicate_new_facts, TWOPHASE_MIN_FACTS
|
|
from source_rules import (
|
|
_detect_category,
|
|
_extract_domain,
|
|
discover_source,
|
|
domain_to_display_name,
|
|
)
|
|
|
|
# Module-level logger for the orchestrator.
logger = logging.getLogger("osint.orchestrator")


# Reputation score per source category, used by the relevance scoring.
# _score_relevance looks categories up with a fallback of 0.4, i.e. a category
# missing from this table is weighted the same as "sonstige".
CATEGORY_REPUTATION = {
    "nachrichten_de": 0.9,
    "nachrichten_int": 0.9,
    "presseagenturen": 1.0,
    "behoerden": 1.0,
    "fachmedien": 0.8,
    "international": 0.7,
    "sonstige": 0.4,
}
|
|
|
|
|
|
def _normalize_url(url: str) -> str:
|
|
"""URL normalisieren für Duplikat-Erkennung."""
|
|
if not url:
|
|
return ""
|
|
url = url.strip()
|
|
try:
|
|
parsed = urlparse(url)
|
|
# Scheme normalisieren
|
|
scheme = parsed.scheme.lower() or "https"
|
|
# Host normalisieren (www entfernen, lowercase)
|
|
netloc = parsed.netloc.lower()
|
|
if netloc.startswith("www."):
|
|
netloc = netloc[4:]
|
|
# Pfad normalisieren (trailing slash entfernen)
|
|
path = parsed.path.rstrip("/")
|
|
# Query-Parameter und Fragment entfernen (Tracking-Params etc.)
|
|
return urlunparse((scheme, netloc, path, "", "", ""))
|
|
except Exception:
|
|
return url.lower().strip().rstrip("/")
|
|
|
|
|
|
def _normalize_headline(headline: str) -> str:
|
|
"""Überschrift normalisieren für Ähnlichkeitsvergleich."""
|
|
if not headline:
|
|
return ""
|
|
h = headline.lower().strip()
|
|
# Umlaute normalisieren
|
|
h = h.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss")
|
|
# Sonderzeichen entfernen
|
|
h = re.sub(r"[^\w\s]", "", h)
|
|
h = re.sub(r"\s+", " ", h).strip()
|
|
return h
|
|
|
|
|
|
def _is_duplicate(article: dict, seen_urls: set, seen_headlines: set) -> bool:
|
|
"""Prüft ob ein Artikel ein Duplikat ist (URL oder Headline)."""
|
|
url = article.get("source_url", "")
|
|
headline = article.get("headline", "")
|
|
|
|
# URL-Duplikat
|
|
if url:
|
|
norm_url = _normalize_url(url)
|
|
if norm_url in seen_urls:
|
|
return True
|
|
seen_urls.add(norm_url)
|
|
|
|
# Headline-Duplikat (nur wenn Überschrift lang genug)
|
|
if headline and len(headline) > 20:
|
|
norm_headline = _normalize_headline(headline)
|
|
if norm_headline and norm_headline in seen_headlines:
|
|
return True
|
|
if norm_headline:
|
|
seen_headlines.add(norm_headline)
|
|
|
|
return False
|
|
|
|
|
|
def _score_relevance(article: dict, search_words: list[str] = None) -> float:
|
|
"""Berechnet einen Relevanz-Score (0.0-1.0) für einen Artikel.
|
|
|
|
Gewichtung:
|
|
- 40% Keyword-Dichte (wie gut passt der Artikel zum Suchbegriff)
|
|
- 30% Quellen-Reputation (basierend auf Kategorie)
|
|
- 20% Inhaltstiefe (hat der Artikel substantiellen Inhalt)
|
|
- 10% RSS-Score (falls vorhanden, vom RSS-Parser)
|
|
"""
|
|
score = 0.0
|
|
|
|
# 1. Keyword-Dichte (40%)
|
|
rss_score = article.get("relevance_score", 0.0)
|
|
if rss_score > 0:
|
|
score += 0.4 * rss_score
|
|
elif search_words:
|
|
text = f"{article.get('headline', '')} {article.get('content_original', '')}".lower()
|
|
match_count = sum(1 for w in search_words if w in text)
|
|
score += 0.4 * (match_count / len(search_words)) if search_words else 0.0
|
|
|
|
# 2. Quellen-Reputation (30%)
|
|
source_url = article.get("source_url", "")
|
|
if source_url:
|
|
domain = _extract_domain(source_url)
|
|
category = _detect_category(domain)
|
|
score += 0.3 * CATEGORY_REPUTATION.get(category, 0.4)
|
|
else:
|
|
score += 0.3 * 0.4 # Unbekannte Quelle
|
|
|
|
# 3. Inhaltstiefe (20%)
|
|
content = article.get("content_original") or article.get("content_de") or ""
|
|
if len(content) > 500:
|
|
score += 0.2
|
|
elif len(content) > 200:
|
|
score += 0.1
|
|
elif len(content) > 50:
|
|
score += 0.05
|
|
|
|
# 4. RSS-Score Bonus (10%)
|
|
score += 0.1 * rss_score
|
|
|
|
return min(1.0, score)
|
|
|
|
|
|
async def _background_discover_sources(articles: list[dict]):
    """Background task: register reputable, not yet known sources found during research.

    For each distinct domain among the article URLs whose category is not
    "sonstige" and that has no row in ``sources`` yet, ``discover_source`` is
    called and a new ``sources`` row is inserted. Inserts are committed once at
    the end. A failure for a single domain is logged at debug level and does
    not stop the remaining domains; any other error is logged as a warning.
    The DB connection is always closed.
    """
    from database import get_db

    db = await get_db()
    try:
        # 1. Collect unique domains, keeping the first URL seen for each.
        known: set[str] = set()
        candidates: list[tuple[str, str, str]] = []
        for item in articles:
            link = item.get("source_url")
            host = _extract_domain(link) if link else None
            if not host or host in known:
                continue
            known.add(host)

            # 2. Only categorized domains qualify; "sonstige" is skipped.
            kind = _detect_category(host)
            if kind != "sonstige":
                candidates.append((host, link, kind))

        if not candidates:
            return

        # 3. Check against the DB: which domains already exist?
        registered = 0
        for host, link, kind in candidates:
            lookup = await db.execute(
                "SELECT id FROM sources WHERE LOWER(domain) = ?",
                (host.lower(),),
            )
            already_known = await lookup.fetchone()
            if already_known:
                continue  # domain already registered

            # 4. RSS feed detection
            try:
                discovery = await discover_source(link)
                display_name = domain_to_display_name(host)
                source_type = discovery["source_type"]  # "rss_feed" or "web_source"
                # Fall back to the bare domain when no feed URL was found.
                stored_url = discovery.get("rss_url") or f"https://{host}"

                await db.execute(
                    """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by)
                    VALUES (?, ?, ?, ?, ?, 'active', 'Auto-entdeckt via Recherche', 'system')""",
                    (display_name, stored_url, host, source_type, kind),
                )
                registered += 1
                logger.info(f"Neue Quelle registriert: {display_name} ({host}) als {source_type}")
            except Exception as exc:
                logger.debug(f"Discovery fehlgeschlagen für {host}: {exc}")

        if registered > 0:
            await db.commit()
            logger.info(f"Background-Discovery: {registered} neue Quellen registriert")
    except Exception as exc:
        logger.warning(f"Background-Discovery Fehler: {exc}")
    finally:
        await db.close()
|
|
|
|
|
|
async def _create_notifications_for_incident(
|
|
db, incident_id: int, visibility: str, created_by: int, tenant_id: int, notifications: list[dict]
|
|
):
|
|
"""Erzeugt DB-Notifications fuer alle betroffenen Nutzer der Organisation.
|
|
|
|
- Oeffentliche Lagen -> alle Nutzer der Org
|
|
- Private Lagen -> nur Ersteller
|
|
"""
|
|
if not notifications:
|
|
return
|
|
|
|
now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
if visibility == "public" and tenant_id:
|
|
cursor = await db.execute(
|
|
"SELECT id FROM users WHERE organization_id = ? AND is_active = 1 AND last_login_at IS NOT NULL",
|
|
(tenant_id,),
|
|
)
|
|
user_ids = [row["id"] for row in await cursor.fetchall()]
|
|
else:
|
|
user_ids = [created_by]
|
|
|
|
for user_id in user_ids:
|
|
for notif in notifications:
|
|
await db.execute(
|
|
"""INSERT INTO notifications (user_id, incident_id, type, title, text, icon, tenant_id, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(
|
|
user_id,
|
|
incident_id,
|
|
notif.get("type", "refresh_summary"),
|
|
notif["title"],
|
|
notif["text"],
|
|
notif.get("icon", "info"),
|
|
tenant_id,
|
|
now,
|
|
),
|
|
)
|
|
|
|
await db.commit()
|
|
logger.info(f"Notifications erstellt: {len(notifications)} x {len(user_ids)} Nutzer fuer Lage {incident_id}")
|
|
|
|
|
|
async def _send_email_notifications_for_incident(
|
|
db, incident_id: int, incident_title: str, visibility: str,
|
|
created_by: int, tenant_id: int, notifications: list[dict]
|
|
):
|
|
"""Sendet E-Mail-Benachrichtigungen basierend auf individuellen Nutzer-Abos.
|
|
|
|
Jeder Nutzer hat eigene E-Mail-Praeferenzen pro Lage (incident_subscriptions).
|
|
Nur Nutzer die aktiv sind und sich mindestens einmal eingeloggt haben
|
|
(last_login_at IS NOT NULL) werden beruecksichtigt.
|
|
"""
|
|
if not notifications:
|
|
return
|
|
|
|
from email_utils.sender import send_email
|
|
from email_utils.templates import incident_notification_email
|
|
from config import MAGIC_LINK_BASE_URL
|
|
|
|
# Alle Nutzer mit aktiven Abos fuer diese Lage laden
|
|
cursor = await db.execute(
|
|
"""SELECT s.notify_email_summary, s.notify_email_new_articles,
|
|
s.notify_email_status_change, u.id, u.email, u.username
|
|
FROM incident_subscriptions s
|
|
JOIN users u ON u.id = s.user_id
|
|
WHERE s.incident_id = ?
|
|
AND u.is_active = 1
|
|
AND u.last_login_at IS NOT NULL
|
|
AND (s.notify_email_summary = 1
|
|
OR s.notify_email_new_articles = 1
|
|
OR s.notify_email_status_change = 1)""",
|
|
(incident_id,),
|
|
)
|
|
subscribers = await cursor.fetchall()
|
|
if not subscribers:
|
|
return
|
|
|
|
dashboard_url = f"{MAGIC_LINK_BASE_URL}/dashboard"
|
|
|
|
for sub in subscribers:
|
|
prefs = dict(sub)
|
|
|
|
# Relevante Notifications basierend auf Nutzer-Praeferenzen filtern
|
|
filtered_notifications = []
|
|
for n in notifications:
|
|
ntype = n.get("type", "refresh_summary")
|
|
|
|
if ntype == "refresh_summary" and prefs.get("notify_email_summary"):
|
|
filtered_notifications.append(n)
|
|
elif ntype == "new_articles" and prefs.get("notify_email_new_articles"):
|
|
filtered_notifications.append(n)
|
|
elif ntype == "status_change" and prefs.get("notify_email_status_change"):
|
|
filtered_notifications.append(n)
|
|
|
|
if not filtered_notifications:
|
|
continue
|
|
|
|
subject, html = incident_notification_email(
|
|
username=prefs["email"].split("@")[0],
|
|
incident_title=incident_title,
|
|
notifications=filtered_notifications,
|
|
dashboard_url=dashboard_url,
|
|
)
|
|
try:
|
|
await send_email(prefs["email"], subject, html)
|
|
logger.info(f"E-Mail-Benachrichtigung gesendet an {prefs['email']} fuer Lage {incident_id} ({len(filtered_notifications)} Items)")
|
|
except Exception as e:
|
|
logger.error(f"E-Mail-Benachrichtigung fehlgeschlagen fuer {prefs['email']}: {e}")
|
|
|
|
|
|
class AgentOrchestrator:
|
|
"""Verwaltet die Claude-Agenten-Queue und koordiniert Recherche-Zyklen."""
|
|
|
|
def __init__(self):
|
|
self._queue: asyncio.Queue = asyncio.Queue()
|
|
self._running = False
|
|
self._current_task: Optional[int] = None
|
|
self._ws_manager = None
|
|
self._queued_ids: set[int] = set()
|
|
self._cancel_requested: set[int] = set()
|
|
|
|
def set_ws_manager(self, ws_manager):
    """Attach the WebSocket manager that is used to broadcast real-time status updates."""
    self._ws_manager = ws_manager
|
|
|
|
async def start(self):
    """Start the queue worker as a background task.

    The task is stored on ``self._worker_task``. The event loop keeps only a
    weak reference to tasks, so a worker without any other reference could be
    garbage-collected while it is still running.

    If a previously started worker is still alive (``start`` called twice, or
    called again before a stopped worker noticed the flag), no second worker
    is spawned; two workers would process refreshes concurrently and overwrite
    each other's ``_current_task``.
    """
    self._running = True
    existing = getattr(self, "_worker_task", None)
    if existing is None or existing.done():
        self._worker_task = asyncio.create_task(self._worker())
    logger.info("Agenten-Orchestrator gestartet")
|
|
|
|
async def stop(self):
    """Ask the queue worker to stop.

    Only clears the ``_running`` flag that the worker loop checks; it does not
    wait for the worker to finish.
    """
    self._running = False
    logger.info("Agenten-Orchestrator gestoppt")
|
|
|
|
async def enqueue_refresh(self, incident_id: int, trigger_type: str = "manual", user_id: Optional[int] = None) -> bool:
    """Put a refresh job for an incident into the queue.

    Args:
        incident_id: Incident to refresh.
        trigger_type: Passed through to the worker with the job.
        user_id: Optional user id, passed through to the worker with the job.

    Returns:
        ``True`` if the job was queued, ``False`` if the incident is already
        queued or currently being refreshed.

    Raises:
        Whatever the visibility lookup or the WebSocket broadcast raises. If
        the lookup fails, nothing is queued.
    """
    if incident_id in self._queued_ids or self._current_task == incident_id:
        logger.info(f"Refresh fuer Lage {incident_id} uebersprungen: bereits aktiv/in Queue")
        return False

    # Reserve the slot BEFORE the first await. Otherwise a second call for the
    # same incident could pass the check above while this one is suspended in
    # the visibility lookup, and the incident would be queued twice.
    self._queued_ids.add(incident_id)
    try:
        visibility, created_by, tenant_id = await self._get_incident_visibility(incident_id)
        await self._queue.put((incident_id, trigger_type, user_id))
    except BaseException:
        # Nothing was queued: release the reservation so later calls are not blocked.
        self._queued_ids.discard(incident_id)
        raise

    queue_size = self._queue.qsize()
    logger.info(f"Refresh fuer Lage {incident_id} eingereiht (Queue: {queue_size}, Trigger: {trigger_type})")

    if self._ws_manager:
        await self._ws_manager.broadcast_for_incident({
            "type": "status_update",
            "incident_id": incident_id,
            "data": {"status": "queued", "queue_position": queue_size},
        }, visibility, created_by, tenant_id)

    return True
|
|
|
|
async def cancel_refresh(self, incident_id: int) -> bool:
    """Request cancellation of the refresh that is currently running.

    Only the incident the worker is processing right now can be cancelled.
    The request is recorded in ``_cancel_requested`` and a "cancelling" status
    is broadcast if a WebSocket manager is set.

    Returns:
        ``True`` if the cancel request was recorded, ``False`` if
        ``incident_id`` is not the running refresh.
    """
    if self._current_task != incident_id:
        return False

    self._cancel_requested.add(incident_id)
    logger.info(f"Cancel angefordert fuer Lage {incident_id}")

    if not self._ws_manager:
        return True

    # Fall back to a public broadcast when the incident cannot be looked up.
    try:
        audience = await self._get_incident_visibility(incident_id)
    except Exception:
        audience = ("public", None, None)
    vis, cb, tid = audience

    message = {
        "type": "status_update",
        "incident_id": incident_id,
        "data": {"status": "cancelling", "detail": "Wird abgebrochen..."},
    }
    await self._ws_manager.broadcast_for_incident(message, vis, cb, tid)
    return True
|
|
|
|
def _check_cancelled(self, incident_id: int):
|
|
"""Prüft ob Abbruch angefordert wurde und wirft CancelledError."""
|
|
if incident_id in self._cancel_requested:
|
|
self._cancel_requested.discard(incident_id)
|
|
raise asyncio.CancelledError("Vom Nutzer abgebrochen")
|
|
|
|
async def _worker(self):
    """Process refresh jobs from the queue one at a time.

    Runs while ``self._running`` is set, polling the queue with a 5 s timeout
    so the flag is re-checked regularly. Each job is attempted up to three
    times:

    - ``asyncio.CancelledError``: the refresh log is marked cancelled, a
      "refresh_cancelled" message is broadcast, no retry.
    - Transient errors (timeouts, connection/OS errors): the refresh log is
      marked failed and the job is retried after 2 min, then 5 min.
    - Any other exception: marked failed, no retry.

    If the job ended with an error, a "refresh_error" message is broadcast.
    After every job ``_current_task`` is reset and any cancel request left
    over for the incident is dropped.
    """
    # Identical for every job, so defined once instead of per loop iteration.
    RETRY_DELAYS = [0, 120, 300]  # seconds: immediately, 2 min, 5 min
    TRANSIENT_ERRORS = (asyncio.TimeoutError, TimeoutError, ConnectionError, OSError)

    while self._running:
        try:
            item = await asyncio.wait_for(self._queue.get(), timeout=5.0)
        except asyncio.TimeoutError:
            continue

        # Queue items are (incident_id, trigger_type, user_id); 2-tuples
        # without user_id are accepted as well.
        if len(item) == 3:
            incident_id, trigger_type, user_id = item
        else:
            incident_id, trigger_type = item
            user_id = None
        self._queued_ids.discard(incident_id)
        self._current_task = incident_id
        logger.info(f"Starte Refresh für Lage {incident_id} (Trigger: {trigger_type})")

        last_error = None

        try:
            for attempt in range(3):
                try:
                    await self._run_refresh(incident_id, trigger_type=trigger_type, retry_count=attempt, user_id=user_id)
                    last_error = None
                    break  # success
                except asyncio.CancelledError:
                    # NOTE(review): this also catches a cancellation of the worker
                    # task itself and treats it like a user cancel - confirm intended.
                    logger.info(f"Refresh fuer Lage {incident_id} abgebrochen")
                    await self._mark_refresh_cancelled(incident_id)
                    try:
                        _vis, _cb, _tid = await self._get_incident_visibility(incident_id)
                    except Exception:
                        _vis, _cb, _tid = "public", None, None
                    if self._ws_manager:
                        await self._ws_manager.broadcast_for_incident({
                            "type": "refresh_cancelled",
                            "incident_id": incident_id,
                            "data": {"status": "cancelled"},
                        }, _vis, _cb, _tid)
                    last_error = None
                    break
                except TRANSIENT_ERRORS as e:
                    last_error = e
                    logger.warning(f"Transienter Fehler bei Lage {incident_id} (Versuch {attempt + 1}/3): {e}")
                    if attempt < 2:
                        await self._mark_refresh_failed(incident_id, str(e))
                        delay = RETRY_DELAYS[attempt + 1]
                        logger.info(f"Retry in {delay}s für Lage {incident_id}")
                        # Send retry status via WebSocket
                        if self._ws_manager:
                            try:
                                _vis, _cb, _tid = await self._get_incident_visibility(incident_id)
                            except Exception:
                                _vis, _cb, _tid = "public", None, None
                            await self._ws_manager.broadcast_for_incident({
                                "type": "status_update",
                                "incident_id": incident_id,
                                "data": {"status": "retrying", "attempt": attempt + 1, "delay": delay},
                            }, _vis, _cb, _tid)
                        await asyncio.sleep(delay)
                    else:
                        await self._mark_refresh_failed(incident_id, f"Endgültig fehlgeschlagen nach 3 Versuchen: {e}")
                except Exception as e:
                    last_error = e
                    logger.error(f"Permanenter Fehler bei Refresh für Lage {incident_id}: {e}")
                    await self._mark_refresh_failed(incident_id, str(e))
                    break  # permanent error, no retry

            if last_error and self._ws_manager:
                try:
                    _vis, _cb, _tid = await self._get_incident_visibility(incident_id)
                except Exception:
                    _vis, _cb, _tid = "public", None, None
                await self._ws_manager.broadcast_for_incident({
                    "type": "refresh_error",
                    "incident_id": incident_id,
                    "data": {"error": str(last_error)},
                }, _vis, _cb, _tid)
        finally:
            self._current_task = None
            # A cancel requested after the refresh passed its last checkpoint is
            # never consumed by _check_cancelled. Drop it here so it cannot
            # abort the NEXT refresh of the same incident.
            self._cancel_requested.discard(incident_id)
            self._queue.task_done()
|
|
|
|
async def _mark_refresh_cancelled(self, incident_id: int):
    """Mark the incident's running refresh-log entry as cancelled.

    Best effort: a DB error is logged as a warning and not re-raised. The
    connection is always closed.
    """
    from database import get_db

    db = await get_db()
    try:
        finished_at = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
        update_sql = """UPDATE refresh_log SET status = 'cancelled', error_message = 'Vom Nutzer abgebrochen',
               completed_at = ? WHERE incident_id = ? AND status = 'running'"""
        await db.execute(update_sql, (finished_at, incident_id))
        await db.commit()
    except Exception as exc:
        logger.warning(f"Konnte Refresh-Log nicht als abgebrochen markieren: {exc}")
    finally:
        await db.close()
|
|
|
|
async def _mark_refresh_failed(self, incident_id: int, error: str):
    """Mark the incident's running refresh-log entry as failed.

    The error text is truncated to 500 characters before it is stored. Best
    effort: a DB error is logged as a warning and not re-raised. The
    connection is always closed.
    """
    from database import get_db

    db = await get_db()
    try:
        message = error[:500]
        finished_at = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
        update_sql = """UPDATE refresh_log SET status = 'error', error_message = ?,
               completed_at = ? WHERE incident_id = ? AND status = 'running'"""
        await db.execute(update_sql, (message, finished_at, incident_id))
        await db.commit()
    except Exception as exc:
        logger.warning(f"Konnte Refresh-Log nicht als fehlgeschlagen markieren: {exc}")
    finally:
        await db.close()
|
|
|
|
async def _get_incident_visibility(self, incident_id: int) -> tuple[str, Optional[int], Optional[int]]:
    """Load visibility, created_by and tenant_id of an incident.

    Returns:
        ``(visibility, created_by, tenant_id)``. If the incident does not
        exist, ``("public", None, None)``; an empty/NULL visibility is
        reported as ``"public"``.
    """
    from database import get_db

    db = await get_db()
    try:
        cursor = await db.execute(
            "SELECT visibility, created_by, tenant_id FROM incidents WHERE id = ?", (incident_id,)
        )
        row = await cursor.fetchone()
        if not row:
            return "public", None, None
        return row["visibility"] or "public", row["created_by"], row["tenant_id"]
    finally:
        await db.close()
|
|
|
|
async def _run_refresh(self, incident_id: int, trigger_type: str = "manual", retry_count: int = 0, user_id: int = None):
|
|
"""Führt einen kompletten Refresh-Zyklus durch."""
|
|
import aiosqlite
|
|
from database import get_db
|
|
from agents.researcher import ResearcherAgent
|
|
from agents.analyzer import AnalyzerAgent
|
|
from agents.factchecker import FactCheckerAgent
|
|
from feeds.rss_parser import RSSParser
|
|
|
|
db = await get_db()
|
|
try:
|
|
# Lage laden
|
|
cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
|
|
incident = await cursor.fetchone()
|
|
if not incident:
|
|
logger.warning(f"Lage {incident_id} nicht gefunden")
|
|
return
|
|
|
|
title = incident["title"]
|
|
description = incident["description"] or ""
|
|
incident_type = incident["type"] or "adhoc"
|
|
international = bool(incident["international_sources"]) if "international_sources" in incident.keys() else True
|
|
include_telegram = bool(incident["include_telegram"]) if "include_telegram" in incident.keys() else False
|
|
visibility = incident["visibility"] if "visibility" in incident.keys() else "public"
|
|
created_by = incident["created_by"] if "created_by" in incident.keys() else None
|
|
tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None
|
|
previous_summary = incident["summary"] or ""
|
|
previous_sources_json = incident["sources_json"] if "sources_json" in incident.keys() else None
|
|
|
|
# Bei Retry: vorherigen running-Eintrag als error markieren
|
|
if retry_count > 0:
|
|
await db.execute(
|
|
"""UPDATE refresh_log SET status = 'error', error_message = 'Retry gestartet',
|
|
completed_at = ? WHERE incident_id = ? AND status = 'running'""",
|
|
(datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), incident_id),
|
|
)
|
|
await db.commit()
|
|
|
|
# Refresh-Log starten
|
|
now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
|
|
now_utc = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
cursor = await db.execute(
|
|
"INSERT INTO refresh_log (incident_id, started_at, status, trigger_type, retry_count, tenant_id) VALUES (?, ?, 'running', ?, ?, ?)",
|
|
(incident_id, now, trigger_type, retry_count, tenant_id),
|
|
)
|
|
await db.commit()
|
|
log_id = cursor.lastrowid
|
|
usage_acc = UsageAccumulator()
|
|
|
|
research_status = "deep_researching" if incident_type == "research" else "researching"
|
|
research_detail = "Hintergrundrecherche im Web läuft..." if incident_type == "research" else "RSS-Feeds und Web werden durchsucht..."
|
|
if self._ws_manager:
|
|
await self._ws_manager.broadcast_for_incident({
|
|
"type": "status_update",
|
|
"incident_id": incident_id,
|
|
"data": {"status": research_status, "detail": research_detail, "started_at": now_utc},
|
|
}, visibility, created_by, tenant_id)
|
|
|
|
# Bestehende Artikel vorladen (für Dedup UND Kontext)
|
|
cursor = await db.execute(
|
|
"SELECT id, source_url, headline, source FROM articles WHERE incident_id = ?",
|
|
(incident_id,),
|
|
)
|
|
existing_db_articles_full = await cursor.fetchall()
|
|
|
|
# Schritt 1+2: RSS-Feeds und Claude-Recherche parallel ausführen
|
|
async def _rss_pipeline():
|
|
"""RSS-Feed-Suche (Feed-Selektion + dynamische Keywords + Parsing)."""
|
|
if incident_type != "adhoc":
|
|
logger.info("Recherche-Modus: RSS-Feeds übersprungen")
|
|
return [], None
|
|
|
|
rss_researcher = ResearcherAgent()
|
|
rss_parser = RSSParser()
|
|
|
|
from source_rules import get_feeds_with_metadata
|
|
all_feeds = await get_feeds_with_metadata(tenant_id=tenant_id)
|
|
|
|
# Dynamische Keywords aus den letzten Headlines extrahieren
|
|
cursor_hl = await db.execute(
|
|
"""SELECT COALESCE(headline_de, headline) as hl
|
|
FROM articles WHERE incident_id = ?
|
|
AND COALESCE(headline_de, headline) IS NOT NULL
|
|
ORDER BY collected_at DESC LIMIT 30""",
|
|
(incident_id,),
|
|
)
|
|
recent_headlines = [row["hl"] for row in await cursor_hl.fetchall() if row["hl"]]
|
|
dynamic_keywords, kw_usage = await rss_researcher.extract_dynamic_keywords(title, recent_headlines)
|
|
if kw_usage:
|
|
usage_acc.add(kw_usage)
|
|
|
|
feed_usage = None
|
|
keywords = dynamic_keywords # Dynamische Keywords bevorzugen
|
|
if len(all_feeds) > 20:
|
|
selected_feeds, feed_sel_keywords, feed_usage = await rss_researcher.select_relevant_feeds(
|
|
title, description, international, all_feeds
|
|
)
|
|
logger.info(f"Feed-Selektion: {len(selected_feeds)} von {len(all_feeds)} Feeds ausgewählt")
|
|
# Feed-Selektion-Keywords nur als Fallback wenn dynamische fehlen
|
|
if not keywords:
|
|
keywords = feed_sel_keywords
|
|
articles = await rss_parser.search_feeds_selective(title, selected_feeds, keywords=keywords)
|
|
else:
|
|
articles = await rss_parser.search_feeds(title, international=international, tenant_id=tenant_id, keywords=keywords, user_id=user_id)
|
|
|
|
logger.info(f"RSS: {len(articles)} relevante Artikel gefunden (international={international})")
|
|
return articles, feed_usage
|
|
|
|
async def _web_search_pipeline():
|
|
"""Claude WebSearch-Recherche."""
|
|
researcher = ResearcherAgent()
|
|
# Bestehende Artikel als Kontext mitgeben (Research + Adhoc)
|
|
existing_for_context = None
|
|
if existing_db_articles_full:
|
|
existing_for_context = [
|
|
{"source": row["source"] if "source" in row.keys() else "",
|
|
"headline": row["headline"],
|
|
"source_url": row["source_url"]}
|
|
for row in existing_db_articles_full
|
|
]
|
|
results, usage = await researcher.search(
|
|
title, description, incident_type,
|
|
international=international, user_id=user_id,
|
|
existing_articles=existing_for_context,
|
|
)
|
|
logger.info(f"Claude-Recherche: {len(results)} Ergebnisse")
|
|
return results, usage
|
|
|
|
async def _telegram_pipeline():
|
|
"""Telegram-Kanal-Suche mit KI-basierter Kanal-Selektion."""
|
|
from feeds.telegram_parser import TelegramParser
|
|
tg_parser = TelegramParser()
|
|
|
|
# Alle Telegram-Kanaele laden
|
|
all_channels = await tg_parser._get_telegram_channels(tenant_id=tenant_id)
|
|
if not all_channels:
|
|
logger.info("Keine Telegram-Kanaele konfiguriert")
|
|
return [], None
|
|
|
|
# KI waehlt relevante Kanaele aus
|
|
tg_researcher = ResearcherAgent()
|
|
selected_channels, tg_sel_usage = await tg_researcher.select_relevant_telegram_channels(
|
|
title, description, all_channels
|
|
)
|
|
if tg_sel_usage:
|
|
usage_acc.add(tg_sel_usage)
|
|
|
|
selected_ids = [ch["id"] for ch in selected_channels]
|
|
logger.info(f"Telegram-Selektion: {len(selected_ids)} von {len(all_channels)} Kanaelen")
|
|
|
|
# Dynamische Keywords fuer Telegram (eigener Aufruf, da parallel zu RSS)
|
|
cursor_tg_hl = await db.execute(
|
|
"""SELECT COALESCE(headline_de, headline) as hl
|
|
FROM articles WHERE incident_id = ?
|
|
AND COALESCE(headline_de, headline) IS NOT NULL
|
|
ORDER BY collected_at DESC LIMIT 30""",
|
|
(incident_id,),
|
|
)
|
|
tg_headlines = [row["hl"] for row in await cursor_tg_hl.fetchall() if row["hl"]]
|
|
tg_keywords, tg_kw_usage = await tg_researcher.extract_dynamic_keywords(title, tg_headlines)
|
|
if tg_kw_usage:
|
|
usage_acc.add(tg_kw_usage)
|
|
logger.info(f"Telegram-Keywords: {tg_keywords}")
|
|
|
|
articles = await tg_parser.search_channels(title, tenant_id=tenant_id, keywords=tg_keywords, channel_ids=selected_ids)
|
|
logger.info(f"Telegram-Pipeline: {len(articles)} Nachrichten")
|
|
return articles, None
|
|
|
|
# Pipelines parallel starten (RSS + WebSearch + optional Telegram)
|
|
pipelines = [_rss_pipeline(), _web_search_pipeline()]
|
|
if include_telegram:
|
|
pipelines.append(_telegram_pipeline())
|
|
|
|
pipeline_results = await asyncio.gather(*pipelines)
|
|
|
|
(rss_articles, rss_feed_usage) = pipeline_results[0]
|
|
(search_results, search_usage) = pipeline_results[1]
|
|
telegram_articles = pipeline_results[2][0] if include_telegram else []
|
|
|
|
if rss_feed_usage:
|
|
usage_acc.add(rss_feed_usage)
|
|
if search_usage:
|
|
usage_acc.add(search_usage)
|
|
|
|
# Checkpoint 1: Cancel prüfen nach RSS/WebSearch
|
|
self._check_cancelled(incident_id)
|
|
|
|
# Alle Ergebnisse zusammenführen
|
|
all_results = rss_articles + search_results + telegram_articles
|
|
|
|
# Duplikate entfernen (normalisierte URL + Headline-Ähnlichkeit)
|
|
seen_urls = set()
|
|
seen_headlines = set()
|
|
unique_results = []
|
|
for article in all_results:
|
|
if not _is_duplicate(article, seen_urls, seen_headlines):
|
|
unique_results.append(article)
|
|
|
|
dupes_removed = len(all_results) - len(unique_results)
|
|
if dupes_removed > 0:
|
|
logger.info(f"Deduplizierung: {dupes_removed} Duplikate entfernt, {len(unique_results)} verbleibend")
|
|
|
|
# Relevanz-Scoring und Sortierung
|
|
for article in unique_results:
|
|
if "relevance_score" not in article or article["relevance_score"] == 0:
|
|
article["relevance_score"] = _score_relevance(article)
|
|
unique_results.sort(key=lambda a: a.get("relevance_score", 0), reverse=True)
|
|
|
|
source_count = len(set(a.get("source", "") for a in unique_results))
|
|
if self._ws_manager:
|
|
await self._ws_manager.broadcast_for_incident({
|
|
"type": "status_update",
|
|
"incident_id": incident_id,
|
|
"data": {
|
|
"status": "analyzing",
|
|
"detail": f"Analysiert {len(unique_results)} Meldungen aus {source_count} Quellen...",
|
|
"started_at": now,
|
|
},
|
|
}, visibility, created_by, tenant_id)
|
|
|
|
# --- Set-basierte DB-Deduplizierung (statt N×M Queries) ---
|
|
# existing_db_articles_full wurde bereits oben geladen
|
|
existing_urls = set()
|
|
existing_headlines = set()
|
|
for row in existing_db_articles_full:
|
|
if row["source_url"]:
|
|
existing_urls.add(_normalize_url(row["source_url"]))
|
|
if row["headline"] and len(row["headline"]) > 20:
|
|
norm_h = _normalize_headline(row["headline"])
|
|
if norm_h:
|
|
existing_headlines.add(norm_h)
|
|
|
|
logger.info(f"DB-Dedup: {len(existing_urls)} URLs, {len(existing_headlines)} Headlines im Bestand")
|
|
|
|
# Neue Artikel speichern und für Analyse tracken
|
|
new_count = 0
|
|
new_articles_for_analysis = []
|
|
for article in unique_results:
|
|
# URL-Duplikat gegen DB
|
|
if article.get("source_url"):
|
|
norm_url = _normalize_url(article["source_url"])
|
|
if norm_url in existing_urls:
|
|
continue
|
|
existing_urls.add(norm_url)
|
|
|
|
# Headline-Duplikat gegen DB
|
|
headline = article.get("headline", "")
|
|
if headline and len(headline) > 20:
|
|
norm_h = _normalize_headline(headline)
|
|
if norm_h and norm_h in existing_headlines:
|
|
continue
|
|
if norm_h:
|
|
existing_headlines.add(norm_h)
|
|
|
|
cursor = await db.execute(
|
|
"""INSERT INTO articles (incident_id, headline, headline_de, source,
|
|
source_url, content_original, content_de, language, published_at, tenant_id)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(
|
|
incident_id,
|
|
article.get("headline", ""),
|
|
article.get("headline_de"),
|
|
article.get("source", "Unbekannt"),
|
|
article.get("source_url"),
|
|
article.get("content_original"),
|
|
article.get("content_de"),
|
|
article.get("language", "de"),
|
|
article.get("published_at"),
|
|
tenant_id,
|
|
),
|
|
)
|
|
new_count += 1
|
|
# Artikel mit DB-ID für die Analyse tracken
|
|
article_with_id = dict(article)
|
|
article_with_id["id"] = cursor.lastrowid
|
|
new_articles_for_analysis.append(article_with_id)
|
|
|
|
await db.commit()
|
|
|
|
# Geoparsing: Orte aus neuen Artikeln extrahieren und speichern
|
|
if new_articles_for_analysis:
|
|
try:
|
|
from agents.geoparsing import geoparse_articles
|
|
incident_context = f"{title} - {description}"
|
|
logger.info(f"Geoparsing fuer {len(new_articles_for_analysis)} neue Artikel...")
|
|
geo_results, category_labels = await geoparse_articles(new_articles_for_analysis, incident_context)
|
|
geo_count = 0
|
|
for art_id, locations in geo_results.items():
|
|
for loc in locations:
|
|
await db.execute(
|
|
"""INSERT INTO article_locations
|
|
(article_id, incident_id, location_name, location_name_normalized,
|
|
country_code, latitude, longitude, confidence, source_text, tenant_id, category)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
|
|
loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
|
|
loc.get("source_text", ""), tenant_id, loc.get("category", "mentioned")),
|
|
)
|
|
geo_count += 1
|
|
if geo_count > 0:
|
|
await db.commit()
|
|
logger.info(f"Geoparsing: {geo_count} Orte aus {len(geo_results)} Artikeln gespeichert")
|
|
# Category-Labels in Incident speichern (nur wenn neu generiert)
|
|
if category_labels:
|
|
import json as _json
|
|
await db.execute(
|
|
"UPDATE incidents SET category_labels = ? WHERE id = ? AND category_labels IS NULL",
|
|
(_json.dumps(category_labels, ensure_ascii=False), incident_id),
|
|
)
|
|
await db.commit()
|
|
logger.info(f"Category-Labels gespeichert fuer Incident {incident_id}: {category_labels}")
|
|
except Exception as e:
|
|
logger.warning(f"Geoparsing fehlgeschlagen (Pipeline laeuft weiter): {e}")
|
|
|
|
# Quellen-Statistiken aktualisieren
|
|
if new_count > 0:
|
|
try:
|
|
from database import refresh_source_counts
|
|
await refresh_source_counts(db)
|
|
except Exception as e:
|
|
logger.warning(f"Quellen-Statistiken konnten nicht aktualisiert werden: {e}")
|
|
|
|
# Schritt 3+4: Analyse und Faktencheck PARALLEL
|
|
if new_count > 0 or not previous_summary:
|
|
is_first_summary = not previous_summary
|
|
|
|
# Snapshot des alten Lagebilds sichern BEVOR parallele Verarbeitung startet
|
|
if previous_summary:
|
|
cursor = await db.execute(
|
|
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
|
|
(incident_id,),
|
|
)
|
|
snap_articles = (await cursor.fetchone())["cnt"]
|
|
cursor = await db.execute(
|
|
"SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?",
|
|
(incident_id,),
|
|
)
|
|
snap_fcs = (await cursor.fetchone())["cnt"]
|
|
await db.execute(
|
|
"""INSERT INTO incident_snapshots
|
|
(incident_id, summary, sources_json,
|
|
article_count, fact_check_count, refresh_log_id, created_at, tenant_id)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(incident_id, previous_summary, previous_sources_json,
|
|
snap_articles, snap_fcs, log_id, now, tenant_id),
|
|
)
|
|
await db.commit()
|
|
|
|
# Bestehende Fakten und alle Artikel vorladen (für parallele Tasks)
|
|
cursor = await db.execute(
|
|
"SELECT id, claim, status, sources_count, evidence FROM fact_checks WHERE incident_id = ?",
|
|
(incident_id,),
|
|
)
|
|
existing_facts = [dict(row) for row in await cursor.fetchall()]
|
|
|
|
# Alle Artikel vorladen für Erstanalyse/Erstcheck
|
|
all_articles_preloaded = None
|
|
if not previous_summary or new_count == 0:
|
|
cursor = await db.execute(
|
|
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
|
|
(incident_id,),
|
|
)
|
|
all_articles_preloaded = [dict(row) for row in await cursor.fetchall()]
|
|
|
|
if self._ws_manager:
|
|
await self._ws_manager.broadcast_for_incident({
|
|
"type": "status_update",
|
|
"incident_id": incident_id,
|
|
"data": {"status": "analyzing", "detail": "Analyse und Faktencheck laufen parallel...", "started_at": now_utc},
|
|
}, visibility, created_by, tenant_id)
|
|
|
|
# Quelleneinordnung (Bias) an Artikel anhaengen
|
|
try:
|
|
cursor = await db.execute(
|
|
"SELECT name, domain, bias FROM sources WHERE bias IS NOT NULL"
|
|
)
|
|
_bias_rows = await cursor.fetchall()
|
|
_bias_by_domain = {}
|
|
_bias_by_name = {}
|
|
for br in _bias_rows:
|
|
brd = dict(br)
|
|
if brd.get("domain"):
|
|
_bias_by_domain[brd["domain"].lower()] = brd["bias"]
|
|
if brd.get("name"):
|
|
_bias_by_name[brd["name"].lower()] = brd["bias"]
|
|
|
|
def _enrich_bias(articles_list):
    """Attach a ``source_bias`` value to every article dict that lacks one.

    The article dicts are modified in place. The bias is looked up first by
    the lower-cased ``source`` name in ``_bias_by_name``. If that yields
    nothing, the host of ``source_url`` is compared against the keys of
    ``_bias_by_domain``.

    Args:
        articles_list: List of article dicts; ``None`` or an empty list is
            a no-op.
    """
    if not articles_list:
        return

    def _host_of(raw_url):
        # Host of raw_url without port and leading "www."; "" if none can be parsed.
        if not raw_url:
            return ""
        # Scheme-less values ("example.com/x") only yield a host once they are
        # prefixed with "//", so try the raw value first and the prefixed one second.
        for candidate in (raw_url, "//" + raw_url):
            try:
                host = urlparse(candidate).hostname
            except ValueError:
                continue
            if host:
                return host[4:] if host.startswith("www.") else host
        return ""

    for art in articles_list:
        if art.get("source_bias"):
            continue
        src = (art.get("source") or "").lower()
        url = (art.get("source_url") or "").lower()

        # Match by name
        bias = _bias_by_name.get(src)
        if not bias:
            # Match by domain. A plain substring test ("ard.de" in
            # "standard.de", "t.co" in "reddit.com") assigns the bias of an
            # unrelated source, so the host is compared on label boundaries.
            host = _host_of(url)
            for dom, b in _bias_by_domain.items():
                if not dom:
                    continue
                if host and "/" not in dom:
                    bare = dom[4:] if dom.startswith("www.") else dom
                    hit = host == bare or host.endswith("." + bare)
                else:
                    # Path-qualified entries and URLs without a parsable host
                    # keep the previous substring behaviour.
                    hit = dom in url
                if hit:
                    bias = b
                    break
        if bias:
            art["source_bias"] = bias
|
|
|
|
_enrich_bias(new_articles_for_analysis)
|
|
_enrich_bias(all_articles_preloaded)
|
|
except Exception as e:
|
|
logger.warning("Bias-Anreicherung fehlgeschlagen (Pipeline laeuft weiter): %s", e)
|
|
|
|
# --- Analyse-Task ---
|
|
async def _do_analysis():
    """Run the situation analysis and return the analyzer's result.

    If a previous summary exists and new articles were stored, only the new
    articles are merged into it via ``analyze_incremental``. Otherwise the
    full analysis runs over the preloaded article list.
    """
    agent = AnalyzerAgent()
    incremental = bool(previous_summary) and new_count > 0

    if not incremental:
        logger.info("Erstanalyse: Alle Artikel werden analysiert")
        return await agent.analyze(title, description, all_articles_preloaded, incident_type)

    logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild")
    return await agent.analyze_incremental(
        title,
        description,
        new_articles_for_analysis,
        previous_summary,
        previous_sources_json,
        incident_type,
    )
|
|
|
|
# --- Faktencheck-Task ---
|
|
async def _do_factcheck():
    """Run the fact check and return the fact checker's result.

    With stored facts and new articles the check is incremental; the
    two-phase variant is used once at least ``TWOPHASE_MIN_FACTS`` facts
    exist. In every other case a full check is run.
    """
    factchecker = FactCheckerAgent()
    if existing_facts and new_count > 0:
        if len(existing_facts) >= TWOPHASE_MIN_FACTS:
            logger.info(
                f"Zwei-Phasen-Faktencheck: {new_count} neue Artikel, "
                f"{len(existing_facts)} bestehende Fakten"
            )
            return await factchecker.check_incremental_twophase(
                title, new_articles_for_analysis, existing_facts, incident_type,
            )
        logger.info(
            f"Inkrementeller Faktencheck: {new_count} neue Artikel, "
            f"{len(existing_facts)} bestehende Fakten"
        )
        return await factchecker.check_incremental(
            title, new_articles_for_analysis, existing_facts, incident_type,
        )

    # Full check. all_articles_preloaded is only loaded when there is no
    # previous summary or no new articles, so it is None when a summary
    # exists but no facts were stored yet. Fall back to the new articles
    # instead of checking an empty list.
    articles_to_check = all_articles_preloaded or new_articles_for_analysis or []
    return await factchecker.check(title, articles_to_check, incident_type)
|
|
|
|
# Beide Tasks PARALLEL starten
|
|
logger.info("Starte Analyse und Faktencheck parallel...")
|
|
analysis_result, factcheck_result = await asyncio.gather(
|
|
_do_analysis(),
|
|
_do_factcheck(),
|
|
)
|
|
|
|
analysis, analysis_usage = analysis_result
|
|
fact_checks, fc_usage = factcheck_result
|
|
|
|
# --- Analyse-Ergebnisse verarbeiten ---
|
|
if analysis_usage:
|
|
usage_acc.add(analysis_usage)
|
|
|
|
if analysis:
|
|
sources = analysis.get("sources", [])
|
|
new_summary = analysis.get("summary", "") or previous_summary
|
|
|
|
# Validierung: Fehlende Quellennummern im Summary erkennen und reparieren
|
|
if sources and new_summary:
|
|
import re as _re
|
|
referenced_nrs = set(int(m) for m in _re.findall(r'\[(\d+)\]', new_summary))
|
|
defined_nrs = set()
|
|
for s in sources:
|
|
try:
|
|
defined_nrs.add(int(s.get("nr", 0)))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
missing_nrs = sorted(referenced_nrs - defined_nrs)
|
|
if missing_nrs:
|
|
logger.warning(
|
|
"Incident %d: %d Quellennummern im Summary ohne Eintrag in sources: %s",
|
|
incident_id, len(missing_nrs), missing_nrs[:20]
|
|
)
|
|
# Platzhalter einfuegen damit die Nummern nicht unverlinkt bleiben
|
|
for nr in missing_nrs:
|
|
sources.append({"nr": nr, "name": "Quelle", "url": ""})
|
|
logger.info("Platzhalter fuer fehlende Quelle [%d] eingefuegt", nr)
|
|
sources.sort(key=lambda s: int(s.get("nr", 0)) if isinstance(s.get("nr"), int) or (isinstance(s.get("nr"), str) and str(s.get("nr", "")).isdigit()) else 9999)
|
|
|
|
# Sicherstellen dass alle nr-Werte Integer sind (Claude liefert manchmal Strings)
|
|
if sources:
|
|
for s in sources:
|
|
nr = s.get("nr")
|
|
if isinstance(nr, str):
|
|
try:
|
|
s["nr"] = int(nr)
|
|
except ValueError:
|
|
pass
|
|
sources_json = json.dumps(sources, ensure_ascii=False) if sources else previous_sources_json
|
|
|
|
await db.execute(
|
|
"UPDATE incidents SET summary = ?, sources_json = ?, updated_at = ? WHERE id = ?",
|
|
(new_summary, sources_json, now, incident_id),
|
|
)
|
|
|
|
# Beim ersten Refresh: Snapshot des neuen Lagebilds erstellen
|
|
if is_first_summary and new_summary:
|
|
cursor = await db.execute(
|
|
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
|
|
(incident_id,),
|
|
)
|
|
snap_articles = (await cursor.fetchone())["cnt"]
|
|
cursor = await db.execute(
|
|
"SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?",
|
|
(incident_id,),
|
|
)
|
|
snap_fcs = (await cursor.fetchone())["cnt"]
|
|
await db.execute(
|
|
"""INSERT INTO incident_snapshots
|
|
(incident_id, summary, sources_json,
|
|
article_count, fact_check_count, refresh_log_id, created_at, tenant_id)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(incident_id, new_summary, sources_json,
|
|
snap_articles, snap_fcs, log_id, now, tenant_id),
|
|
)
|
|
|
|
# Übersetzungen aktualisieren (nur für gültige DB-IDs)
|
|
for translation in analysis.get("translations", []):
|
|
article_id = translation.get("article_id")
|
|
if isinstance(article_id, int):
|
|
await db.execute(
|
|
"UPDATE articles SET headline_de = ?, content_de = ? WHERE id = ? AND incident_id = ?",
|
|
(translation.get("headline_de"), translation.get("content_de"), article_id, incident_id),
|
|
)
|
|
|
|
await db.commit()
|
|
|
|
# Cancel-Check nach paralleler Verarbeitung
|
|
self._check_cancelled(incident_id)
|
|
|
|
# --- Faktencheck-Ergebnisse verarbeiten ---
|
|
# Pre-Dedup: Duplikate aus LLM-Antwort entfernen
|
|
fact_checks = deduplicate_new_facts(fact_checks)
|
|
|
|
if fc_usage:
|
|
usage_acc.add(fc_usage)
|
|
|
|
# Prüfen ob dies der erste Refresh ist (keine vorherigen Faktenchecks)
|
|
is_first_refresh = len(existing_facts) == 0
|
|
|
|
# Notification-Summary sammeln
|
|
confirmed_count = 0
|
|
contradicted_count = 0
|
|
status_changes = []
|
|
|
|
# Mutable Kopie für Fuzzy-Matching
|
|
remaining_existing = list(existing_facts)
|
|
|
|
for fc in fact_checks:
|
|
new_claim = fc.get("claim", "")
|
|
new_status = fc.get("status", "developing")
|
|
|
|
# Fuzzy-Matching gegen bestehende Claims
|
|
matched = find_matching_claim(new_claim, remaining_existing)
|
|
|
|
if matched:
|
|
old_status = matched.get("status")
|
|
# status_history aktualisieren bei Statusaenderung
|
|
history_update = ""
|
|
if old_status and old_status != new_status:
|
|
import json as _json
|
|
cursor_hist = await db.execute(
|
|
"SELECT status_history FROM fact_checks WHERE id = ?",
|
|
(matched["id"],),
|
|
)
|
|
hist_row = await cursor_hist.fetchone()
|
|
try:
|
|
history = _json.loads(hist_row[0] or "[]") if hist_row else []
|
|
except (ValueError, TypeError):
|
|
history = []
|
|
history.append({"status": new_status, "at": now})
|
|
history_update = _json.dumps(history)
|
|
# Evidence: Alte URLs beibehalten wenn neue keine hat
|
|
new_evidence = fc.get("evidence") or ""
|
|
import re as _re
|
|
if not _re.search(r"https?://", new_evidence) and matched.get("evidence"):
|
|
old_evidence = matched["evidence"] or ""
|
|
if _re.search(r"https?://", old_evidence):
|
|
bracket_match = _re.search(r"\[(?:Quellen|Weitere Quellen|Ursprungsquellen):.*?\]", old_evidence)
|
|
if bracket_match:
|
|
new_evidence = new_evidence.rstrip(". ") + ". " + bracket_match.group()
|
|
else:
|
|
new_evidence = old_evidence
|
|
|
|
await db.execute(
|
|
"UPDATE fact_checks SET claim = ?, status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ?"
|
|
+ (", status_history = ?" if history_update else "")
|
|
+ " WHERE id = ?",
|
|
(new_claim, new_status, fc.get("sources_count", 0), new_evidence, fc.get("is_notification", 0), now)
|
|
+ ((history_update,) if history_update else ())
|
|
+ (matched["id"],),
|
|
)
|
|
# Aus der Liste entfernen damit nicht doppelt gematcht wird
|
|
remaining_existing = [ef for ef in remaining_existing if ef["id"] != matched["id"]]
|
|
|
|
# Status-Änderung tracken
|
|
if not is_first_refresh and old_status and old_status != new_status:
|
|
status_changes.append({
|
|
"claim": new_claim,
|
|
"old_status": old_status,
|
|
"new_status": new_status,
|
|
})
|
|
else:
|
|
import json as _json
|
|
initial_history = _json.dumps([{"status": new_status, "at": now}])
|
|
await db.execute(
|
|
"""INSERT INTO fact_checks (incident_id, claim, status, sources_count, evidence, is_notification, tenant_id, status_history)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(incident_id, new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id, initial_history),
|
|
)
|
|
|
|
# Status-Statistik sammeln
|
|
if new_status in ("confirmed", "established"):
|
|
confirmed_count += 1
|
|
elif new_status in ("contradicted", "disputed"):
|
|
contradicted_count += 1
|
|
|
|
await db.commit()
|
|
|
|
# Post-Refresh Quality Check: Duplikate und Karten-Kategorien pruefen
|
|
try:
|
|
from services.post_refresh_qc import run_post_refresh_qc
|
|
qc_result = await run_post_refresh_qc(db, incident_id)
|
|
if qc_result.get("facts_removed", 0) > 0 or qc_result.get("locations_fixed", 0) > 0:
|
|
logger.info(
|
|
f"QC: {qc_result['facts_removed']} Duplikate, "
|
|
f"{qc_result['locations_fixed']} Location-Fixes"
|
|
)
|
|
except Exception as qc_err:
|
|
logger.warning(f"Post-Refresh QC fehlgeschlagen: {qc_err}")
|
|
# Gebündelte Notification senden (nicht beim ersten Refresh)
|
|
if not is_first_refresh:
|
|
if self._ws_manager:
|
|
await self._ws_manager.broadcast_for_incident({
|
|
"type": "refresh_summary",
|
|
"incident_id": incident_id,
|
|
"data": {
|
|
"new_articles": new_count,
|
|
"confirmed_count": confirmed_count,
|
|
"contradicted_count": contradicted_count,
|
|
"status_changes": status_changes,
|
|
"is_first_refresh": False,
|
|
"incident_title": title,
|
|
},
|
|
}, visibility, created_by, tenant_id)
|
|
|
|
# DB-Notifications erzeugen
|
|
parts = []
|
|
if new_count > 0:
|
|
parts.append(f"{new_count} neue Meldung{'en' if new_count != 1 else ''}")
|
|
if confirmed_count > 0:
|
|
parts.append(f"{confirmed_count} bestätigt")
|
|
if contradicted_count > 0:
|
|
parts.append(f"{contradicted_count} widersprochen")
|
|
summary_text = ", ".join(parts) if parts else "Keine neuen Entwicklungen"
|
|
|
|
db_notifications = [{
|
|
"type": "refresh_summary",
|
|
"title": title,
|
|
"text": f"Recherche: {summary_text}",
|
|
"icon": "warning" if contradicted_count > 0 else "success",
|
|
}]
|
|
if new_count > 0:
|
|
db_notifications.append({
|
|
"type": "new_articles",
|
|
"title": title,
|
|
"text": f"{new_count} neue Meldung{'en' if new_count != 1 else ''} gefunden",
|
|
"icon": "info",
|
|
})
|
|
for sc in status_changes:
|
|
db_notifications.append({
|
|
"type": "status_change",
|
|
"title": title,
|
|
"text": f"{sc['claim']}: {sc['old_status']} \u2192 {sc['new_status']}",
|
|
"icon": "error" if sc["new_status"] in ("contradicted", "disputed") else "success",
|
|
})
|
|
|
|
if created_by:
|
|
await _create_notifications_for_incident(
|
|
db, incident_id, visibility, created_by, tenant_id, db_notifications
|
|
)
|
|
# E-Mail-Benachrichtigungen versenden
|
|
await _send_email_notifications_for_incident(
|
|
db, incident_id, title, visibility, created_by, tenant_id, db_notifications
|
|
)
|
|
|
|
# Refresh-Log abschließen (mit Token-Statistiken)
|
|
await db.execute(
|
|
"""UPDATE refresh_log SET
|
|
completed_at = ?, articles_found = ?, status = 'completed',
|
|
input_tokens = ?, output_tokens = ?,
|
|
cache_creation_tokens = ?, cache_read_tokens = ?,
|
|
total_cost_usd = ?, api_calls = ?
|
|
WHERE id = ?""",
|
|
(datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), new_count,
|
|
usage_acc.input_tokens, usage_acc.output_tokens,
|
|
usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
|
|
round(usage_acc.total_cost_usd, 7), usage_acc.call_count, log_id),
|
|
)
|
|
await db.commit()
|
|
logger.info(
|
|
f"Token: {usage_acc.input_tokens} in / {usage_acc.output_tokens} out / "
|
|
f"${usage_acc.total_cost_usd:.4f} ({usage_acc.call_count} Calls)"
|
|
)
|
|
|
|
# Quellen-Discovery im Background starten
|
|
if unique_results:
|
|
asyncio.create_task(_background_discover_sources(unique_results))
|
|
|
|
if self._ws_manager:
|
|
await self._ws_manager.broadcast_for_incident({
|
|
"type": "refresh_complete",
|
|
"incident_id": incident_id,
|
|
"data": {"new_articles": new_count, "status": "idle"},
|
|
}, visibility, created_by, tenant_id)
|
|
|
|
logger.info(f"Refresh für Lage {incident_id} abgeschlossen: {new_count} neue Artikel")
|
|
|
|
finally:
|
|
await db.close()
|
|
|
|
|
|
# Singleton instance: the single AgentOrchestrator object created at module level.
|
|
orchestrator = AgentOrchestrator()
|