Inkrementelle Analyse + Token-Optimierung + Relevanz-Scoring

TOKEN-OPTIMIERUNG:
- Inkrementelle Analyse: Folge-Refreshes senden nur noch das bisherige
  Lagebild + neue Artikel an Claude (statt alle Artikel erneut).
  Spart ~60-70% Tokens bei Lagen mit vielen Artikeln.
- Inkrementeller Faktencheck: Bestehende Fakten als Zusammenfassung,
  nur neue Artikel werden vollstaendig geprueft.
- Modell-Steuerung: Feed-Selektion nutzt jetzt Haiku (CLAUDE_MODEL_FAST)
  statt Opus. Spart ~50-70% bei Feed-Auswahl.
- Set-basierte DB-Deduplizierung: Bestehende URLs/Headlines einmal
  in Sets geladen statt N*M einzelne DB-Queries pro Artikel.

INHALTLICHE VERBESSERUNGEN:
- Relevanz-Scoring: Artikel nach Keyword-Dichte (40%),
  Quellen-Reputation (30%), Inhaltstiefe (20%), RSS-Score (10%).
- Flexibles RSS-Matching: min. Haelfte der Keywords statt alle.
  RSS-Artikel bekommen einen relevance_score.
- Fuzzy Claim-Matching: SequenceMatcher (0.7) statt exakter
  String-Vergleich. Verhindert Duplikat-Akkumulation.
- Translation-Fix: Nur gueltige DB-IDs (isinstance int).
- Researcher: WebFetch fuer Top-Artikel, erweiterte Zusammenfassungen.

DATEIEN:
- config.py: CLAUDE_MODEL_FAST
- claude_client.py: model-Parameter
- researcher.py: Haiku Feed-Selektion, erweiterte Prompts
- analyzer.py: Inkrementelle Analyse + analyze_incremental()
- factchecker.py: Inkrementeller Check + Fuzzy-Matching
- orchestrator.py: Set-Dedup, Relevanz-Scoring, inkrementeller Flow
- rss_parser.py: Flexibles Keyword-Matching + relevance_score
Dieser Commit ist enthalten in:
claude-dev
2026-03-04 20:22:47 +01:00
Ursprung 54d02d2c5b
Commit 3d9a827bc8
7 geänderte Dateien mit 541 neuen und 317 gelöschten Zeilen

Datei anzeigen

@@ -9,6 +9,7 @@ from typing import Optional
from urllib.parse import urlparse, urlunparse
from agents.claude_client import UsageAccumulator
from agents.factchecker import find_matching_claim
from source_rules import (
_detect_category,
_extract_domain,
@@ -18,6 +19,17 @@ from source_rules import (
# Module-level logger for the orchestrator agent.
logger = logging.getLogger("osint.orchestrator")

# Reputation score per source category (feeds the 30% source-reputation
# component of the relevance scoring; unknown categories fall back to 0.4
# at the lookup site).
CATEGORY_REPUTATION = {
    "nachrichten_de": 0.9,    # German news outlets
    "nachrichten_int": 0.9,   # international news outlets
    "presseagenturen": 1.0,   # press agencies (wire services)
    "behoerden": 1.0,         # government / official authorities
    "fachmedien": 0.8,        # specialist / trade media
    "international": 0.7,
    "sonstige": 0.4,          # miscellaneous / uncategorized
}
def _normalize_url(url: str) -> str:
"""URL normalisieren für Duplikat-Erkennung."""
@@ -76,6 +88,50 @@ def _is_duplicate(article: dict, seen_urls: set, seen_headlines: set) -> bool:
return False
def _score_relevance(article: dict, search_words: list[str] = None) -> float:
"""Berechnet einen Relevanz-Score (0.0-1.0) für einen Artikel.
Gewichtung:
- 40% Keyword-Dichte (wie gut passt der Artikel zum Suchbegriff)
- 30% Quellen-Reputation (basierend auf Kategorie)
- 20% Inhaltstiefe (hat der Artikel substantiellen Inhalt)
- 10% RSS-Score (falls vorhanden, vom RSS-Parser)
"""
score = 0.0
# 1. Keyword-Dichte (40%)
rss_score = article.get("relevance_score", 0.0)
if rss_score > 0:
score += 0.4 * rss_score
elif search_words:
text = f"{article.get('headline', '')} {article.get('content_original', '')}".lower()
match_count = sum(1 for w in search_words if w in text)
score += 0.4 * (match_count / len(search_words)) if search_words else 0.0
# 2. Quellen-Reputation (30%)
source_url = article.get("source_url", "")
if source_url:
domain = _extract_domain(source_url)
category = _detect_category(domain)
score += 0.3 * CATEGORY_REPUTATION.get(category, 0.4)
else:
score += 0.3 * 0.4 # Unbekannte Quelle
# 3. Inhaltstiefe (20%)
content = article.get("content_original") or article.get("content_de") or ""
if len(content) > 500:
score += 0.2
elif len(content) > 200:
score += 0.1
elif len(content) > 50:
score += 0.05
# 4. RSS-Score Bonus (10%)
score += 0.1 * rss_score
return min(1.0, score)
async def _background_discover_sources(articles: list[dict]):
"""Background-Task: Registriert seriöse, unbekannte Quellen aus Recherche-Ergebnissen."""
from database import get_db
@@ -478,6 +534,8 @@ class AgentOrchestrator:
visibility = incident["visibility"] if "visibility" in incident.keys() else "public"
created_by = incident["created_by"] if "created_by" in incident.keys() else None
tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None
previous_summary = incident["summary"] or ""
previous_sources_json = incident["sources_json"] if "sources_json" in incident.keys() else None
# Bei Retry: vorherigen running-Eintrag als error markieren
if retry_count > 0:
@@ -569,6 +627,12 @@ class AgentOrchestrator:
if dupes_removed > 0:
logger.info(f"Deduplizierung: {dupes_removed} Duplikate entfernt, {len(unique_results)} verbleibend")
# Relevanz-Scoring und Sortierung
for article in unique_results:
if "relevance_score" not in article or article["relevance_score"] == 0:
article["relevance_score"] = _score_relevance(article)
unique_results.sort(key=lambda a: a.get("relevance_score", 0), reverse=True)
source_count = len(set(a.get("source", "") for a in unique_results))
if self._ws_manager:
await self._ws_manager.broadcast_for_incident({
@@ -581,43 +645,45 @@ class AgentOrchestrator:
},
}, visibility, created_by, tenant_id)
# In DB speichern (neue Artikel) — auch gegen bestehende DB-Einträge prüfen
# --- Set-basierte DB-Deduplizierung (statt N×M Queries) ---
cursor = await db.execute(
"SELECT id, source_url, headline FROM articles WHERE incident_id = ?",
(incident_id,),
)
existing_db_articles = await cursor.fetchall()
existing_urls = set()
existing_headlines = set()
for row in existing_db_articles:
if row["source_url"]:
existing_urls.add(_normalize_url(row["source_url"]))
if row["headline"] and len(row["headline"]) > 20:
norm_h = _normalize_headline(row["headline"])
if norm_h:
existing_headlines.add(norm_h)
logger.info(f"DB-Dedup: {len(existing_urls)} URLs, {len(existing_headlines)} Headlines im Bestand")
# Neue Artikel speichern und für Analyse tracken
new_count = 0
new_articles_for_analysis = []
for article in unique_results:
# Prüfen ob URL (normalisiert) schon existiert
# URL-Duplikat gegen DB
if article.get("source_url"):
norm_url = _normalize_url(article["source_url"])
cursor = await db.execute(
"SELECT id, source_url FROM articles WHERE incident_id = ?",
(incident_id,),
)
existing_articles = await cursor.fetchall()
already_exists = False
for existing in existing_articles:
if existing["source_url"] and _normalize_url(existing["source_url"]) == norm_url:
already_exists = True
break
if already_exists:
if norm_url in existing_urls:
continue
existing_urls.add(norm_url)
# Headline-Duplikat gegen DB prüfen
# Headline-Duplikat gegen DB
headline = article.get("headline", "")
if headline and len(headline) > 20:
norm_h = _normalize_headline(headline)
cursor = await db.execute(
"SELECT id, headline FROM articles WHERE incident_id = ?",
(incident_id,),
)
existing_articles = await cursor.fetchall()
headline_exists = False
for existing in existing_articles:
if _normalize_headline(existing["headline"]) == norm_h:
headline_exists = True
break
if headline_exists:
if norm_h and norm_h in existing_headlines:
continue
if norm_h:
existing_headlines.add(norm_h)
await db.execute(
cursor = await db.execute(
"""INSERT INTO articles (incident_id, headline, headline_de, source,
source_url, content_original, content_de, language, published_at, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
@@ -635,6 +701,10 @@ class AgentOrchestrator:
),
)
new_count += 1
# Artikel mit DB-ID für die Analyse tracken
article_with_id = dict(article)
article_with_id["id"] = cursor.lastrowid
new_articles_for_analysis.append(article_with_id)
await db.commit()
@@ -647,23 +717,34 @@ class AgentOrchestrator:
logger.warning(f"Quellen-Statistiken konnten nicht aktualisiert werden: {e}")
# Schritt 3: Analyse und Zusammenfassung
if new_count > 0 or not incident["summary"]:
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
(incident_id,),
)
all_articles = [dict(row) for row in await cursor.fetchall()]
if new_count > 0 or not previous_summary:
analyzer = AnalyzerAgent()
analysis, analysis_usage = await analyzer.analyze(title, description, all_articles, incident_type)
# Inkrementelle Analyse wenn Lagebild bereits existiert und neue Artikel vorhanden
if previous_summary and new_count > 0:
logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild")
analysis, analysis_usage = await analyzer.analyze_incremental(
title, description, new_articles_for_analysis,
previous_summary, previous_sources_json, incident_type,
)
else:
# Erstanalyse: Alle Artikel laden
logger.info("Erstanalyse: Alle Artikel werden analysiert")
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
(incident_id,),
)
all_articles = [dict(row) for row in await cursor.fetchall()]
analysis, analysis_usage = await analyzer.analyze(title, description, all_articles, incident_type)
if analysis_usage:
usage_acc.add(analysis_usage)
if analysis:
is_first_summary = not incident["summary"]
is_first_summary = not previous_summary
# Snapshot des alten Lagebilds sichern (nur wenn schon eins existiert)
if incident["summary"]:
if previous_summary:
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
(incident_id,),
@@ -679,7 +760,7 @@ class AgentOrchestrator:
(incident_id, summary, sources_json,
article_count, fact_check_count, refresh_log_id, created_at, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(incident_id, incident["summary"], incident["sources_json"],
(incident_id, previous_summary, previous_sources_json,
snap_articles, snap_fcs, log_id, now, tenant_id),
)
@@ -715,13 +796,13 @@ class AgentOrchestrator:
snap_articles, snap_fcs, log_id, now, tenant_id),
)
# Übersetzungen aktualisieren
# Übersetzungen aktualisieren (nur für gültige DB-IDs)
for translation in analysis.get("translations", []):
article_id = translation.get("article_id")
if article_id:
if isinstance(article_id, int):
await db.execute(
"UPDATE articles SET headline_de = ?, content_de = ? WHERE id = ?",
(translation.get("headline_de"), translation.get("content_de"), article_id),
"UPDATE articles SET headline_de = ?, content_de = ? WHERE id = ? AND incident_id = ?",
(translation.get("headline_de"), translation.get("content_de"), article_id, incident_id),
)
await db.commit()
@@ -738,7 +819,29 @@ class AgentOrchestrator:
# Schritt 4: Faktencheck
factchecker = FactCheckerAgent()
fact_checks, fc_usage = await factchecker.check(title, all_articles, incident_type)
# Bestehende Fakten laden für inkrementellen Check
cursor = await db.execute(
"SELECT id, claim, status, sources_count FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
existing_facts = [dict(row) for row in await cursor.fetchall()]
if existing_facts and new_count > 0:
# Inkrementeller Faktencheck: nur neue Artikel + bestehende Fakten
logger.info(f"Inkrementeller Faktencheck: {new_count} neue Artikel, {len(existing_facts)} bestehende Fakten")
fact_checks, fc_usage = await factchecker.check_incremental(
title, new_articles_for_analysis, existing_facts, incident_type,
)
else:
# Erstcheck: alle Artikel
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
(incident_id,),
)
all_articles_for_fc = [dict(row) for row in await cursor.fetchall()]
fact_checks, fc_usage = await factchecker.check(title, all_articles_for_fc, incident_type)
if fc_usage:
usage_acc.add(fc_usage)
@@ -746,54 +849,52 @@ class AgentOrchestrator:
self._check_cancelled(incident_id)
# Prüfen ob dies der erste Refresh ist (keine vorherigen Faktenchecks)
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
row = await cursor.fetchone()
is_first_refresh = row["cnt"] == 0
is_first_refresh = len(existing_facts) == 0
# Notification-Summary sammeln
confirmed_count = 0
contradicted_count = 0
status_changes = []
# Mutable Kopie für Fuzzy-Matching
remaining_existing = list(existing_facts)
for fc in fact_checks:
# Prüfen ob Claim schon existiert (mit altem Status)
cursor = await db.execute(
"SELECT id, status FROM fact_checks WHERE incident_id = ? AND claim = ?",
(incident_id, fc.get("claim", "")),
)
existing = await cursor.fetchone()
old_status = existing["status"] if existing else None
new_claim = fc.get("claim", "")
new_status = fc.get("status", "developing")
if existing:
# Fuzzy-Matching gegen bestehende Claims
matched = find_matching_claim(new_claim, remaining_existing)
if matched:
old_status = matched.get("status")
await db.execute(
"UPDATE fact_checks SET status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ? WHERE id = ?",
(new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), now, existing["id"]),
"UPDATE fact_checks SET claim = ?, status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ? WHERE id = ?",
(new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), now, matched["id"]),
)
# Aus der Liste entfernen damit nicht doppelt gematcht wird
remaining_existing = [ef for ef in remaining_existing if ef["id"] != matched["id"]]
# Status-Änderung tracken
if not is_first_refresh and old_status and old_status != new_status:
status_changes.append({
"claim": new_claim,
"old_status": old_status,
"new_status": new_status,
})
else:
await db.execute(
"""INSERT INTO fact_checks (incident_id, claim, status, sources_count, evidence, is_notification, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(incident_id, fc.get("claim", ""), new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id),
(incident_id, new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id),
)
# Status-Statistik sammeln
if new_status == "confirmed" or new_status == "established":
if new_status in ("confirmed", "established"):
confirmed_count += 1
elif new_status == "contradicted" or new_status == "disputed":
elif new_status in ("contradicted", "disputed"):
contradicted_count += 1
# Echte Status-Änderungen tracken (nicht beim ersten Refresh)
if not is_first_refresh and old_status and old_status != new_status:
status_changes.append({
"claim": fc.get("claim", ""),
"old_status": old_status,
"new_status": new_status,
})
await db.commit()
# Gebündelte Notification senden (nicht beim ersten Refresh)