Dateien
AegisSight-Monitor-Verwaltung/src/shared/services/source_suggester.py
Claude 5e08d06784 feat(quellen-health): Strategie-Eskalation, Loesung-suchen bei Warnings, Trend-Delta
Drei zusammenhaengende Verbesserungen am Quellen-Health-Bereich:

1. shared/services/source_suggester.py:
   - sync mit Monitor commit 49c5572.
   - Neue Funktion generate_strategy_escalation_suggestions: erzeugt
     deactivate-Vorschlaege fuer Quellen mit fetch_strategy=googlebot|
     paywall, deren Reachability-Check trotzdem error meldet.

2. source-health.js: Loesung-suchen-Button erweitert.
   Bisher nur bei status=error AND check_type=reachability. Jetzt auch
   bei status=warning AND check_type=feed_validity (z.B. "Feed
   erreichbar aber leer"). Backend-Endpoint /api/sources/health/
   search-fix wird in beiden Faellen aufgerufen, Claude sucht eine
   bessere URL fuer die Quelle.

3. source-health.js: Trend-Delta im Counter.
   Liest healthHistoryCache[1] (vorletzter Run) und vergleicht mit
   aktuellen errors/warnings/ok. Zeigt z.B. "3 Fehler (+2)" rot oder
   "143 Warnungen (-15)" gruen. Bei steigenden ok-Counts ist Plus
   gruen, bei steigenden Fehlern ist Plus rot. Wenn der vorletzte
   Run nicht verfuegbar (Initial-Lauf): kein Delta.

Cache-Buster source-health.js auf 20260509l gebumpt.
2026-05-09 15:26:24 +00:00

459 Zeilen
17 KiB
Python

"""KI-gestützte Quellen-Vorschläge via Haiku + deterministische Karteileichen-Heuristik."""
import json
import logging
import re
import aiosqlite
from agents.claude_client import call_claude
from config import CLAUDE_MODEL_FAST
logger = logging.getLogger("osint.source_suggester")
# Schwelle für "stumm seit": eine Quelle, die seit mehr als so vielen Tagen
# keinen Artikel mehr geliefert hat, gilt als Karteileichen-Kandidat.
STALE_DEACTIVATE_THRESHOLD_DAYS = 60
async def generate_stale_deactivation_suggestions(
db: aiosqlite.Connection,
days_threshold: int = STALE_DEACTIVATE_THRESHOLD_DAYS,
) -> int:
"""Erzeugt deactivate_source-Vorschläge für Karteileichen-Quellen.
Karteileiche = aktive Quelle, die entweder noch nie einen Artikel geliefert hat
(article_count = 0) oder seit mehr als days_threshold Tagen stumm ist
(last_seen_at älter als die Schwelle). Reine SQL-Heuristik, kein KI-Aufruf.
Doppel-Vermeidung: existiert bereits ein pending deactivate-Vorschlag für
dieselbe source_id, wird kein neuer erzeugt.
Returns: Anzahl neu erstellter Vorschläge.
"""
cursor = await db.execute(
f"""
SELECT id, name, url, domain, article_count, last_seen_at
FROM sources
WHERE status = 'active'
AND (
COALESCE(article_count, 0) = 0
OR (last_seen_at IS NOT NULL
AND last_seen_at < datetime('now', '-{int(days_threshold)} days'))
)
"""
)
candidates = [dict(row) for row in await cursor.fetchall()]
if not candidates:
return 0
cursor = await db.execute(
"SELECT DISTINCT source_id FROM source_suggestions "
"WHERE status = 'pending' AND suggestion_type = 'deactivate_source' "
"AND source_id IS NOT NULL"
)
already_pending = {row["source_id"] for row in await cursor.fetchall()}
created = 0
for c in candidates:
sid = c["id"]
if sid in already_pending:
continue
if (c["article_count"] or 0) == 0:
reason = "Hat seit Anlage noch nie einen Artikel geliefert."
else:
reason = (
f"Letzter Artikel vor mehr als {days_threshold} Tagen "
f"(last_seen_at={c['last_seen_at']})."
)
title = f"{c['name']} (ID {sid}) - Karteileiche, deaktivieren?"
description = (
f"Quelle: {c['name']} | URL: {c['url']} | Domain: {c['domain'] or '-'}\n"
f"Begründung: {reason}\n"
f"article_count={c['article_count'] or 0}, "
f"last_seen_at={c['last_seen_at'] or 'NULL'}\n"
"Hinweis: Quelle wurde automatisch als inaktiv erkannt. "
"Bitte vor Annahme prüfen, ob sie wirklich nicht mehr gebraucht wird."
)
suggested_data = json.dumps(
{"action": "deactivate", "source_id": sid}, ensure_ascii=False
)
await db.execute(
"INSERT INTO source_suggestions "
"(suggestion_type, title, description, source_id, suggested_data, "
" priority, status) VALUES "
"('deactivate_source', ?, ?, ?, ?, 'medium', 'pending')",
(title, description, sid, suggested_data),
)
created += 1
if created > 0:
await db.commit()
logger.info(
"Karteileichen-Heuristik: %d neue deactivate-Vorschläge erstellt "
"(%d Kandidaten, %d bereits pending)",
created, len(candidates), len(already_pending),
)
else:
logger.info(
"Karteileichen-Heuristik: keine neuen Vorschläge "
"(%d Kandidaten, alle bereits pending)",
len(candidates),
)
return created
async def generate_strategy_escalation_suggestions(db: aiosqlite.Connection) -> int:
"""Erzeugt deactivate_source-Vorschläge für Quellen, bei denen die fetch_strategy
bereits eskaliert wurde (googlebot oder paywall) und der Reachability-Check
trotzdem error meldet.
Beispiel: Rheinische Post hat fetch_strategy=googlebot, kriegt aber HTTP 403.
-> Strategie greift nicht, Quelle ist faktisch nicht abrufbar. Vorschlag: deaktivieren.
Doppel-Vermeidung wie in der Karteileichen-Heuristik: nur wenn noch kein pending
deactivate-Vorschlag für die source_id existiert.
Returns: Anzahl neu erstellter Vorschläge.
"""
cursor = await db.execute(
"""
SELECT s.id, s.name, s.url, s.domain, s.fetch_strategy, h.message
FROM sources s
JOIN source_health_checks h ON h.source_id = s.id
WHERE s.status = 'active'
AND s.fetch_strategy IN ('googlebot', 'paywall')
AND h.check_type = 'reachability'
AND h.status = 'error'
"""
)
candidates = [dict(row) for row in await cursor.fetchall()]
if not candidates:
return 0
cursor = await db.execute(
"SELECT DISTINCT source_id FROM source_suggestions "
"WHERE status = 'pending' AND suggestion_type = 'deactivate_source' "
"AND source_id IS NOT NULL"
)
already_pending = {row["source_id"] for row in await cursor.fetchall()}
created = 0
for c in candidates:
sid = c["id"]
if sid in already_pending:
continue
title = f"{c['name']} (ID {sid}) - Strategie greift nicht"
description = (
f"Quelle: {c['name']} | URL: {c['url']} | Domain: {c['domain'] or '-'}\n"
f"fetch_strategy='{c['fetch_strategy']}' wurde bereits zur Eskalation gesetzt, "
f"liefert beim Health-Check aber weiter einen Fehler:\n"
f" {c['message']}\n"
"Vorschlag: deaktivieren oder fetch_strategy='skip' setzen, damit die Quelle "
"den Health-Check nicht weiter verfälscht.\n"
"Hinweis: Quelle wurde automatisch erkannt. Bitte vor Annahme prüfen."
)
suggested_data = json.dumps(
{"action": "deactivate", "source_id": sid,
"reason": "fetch_strategy_failed", "current_strategy": c["fetch_strategy"]},
ensure_ascii=False,
)
await db.execute(
"INSERT INTO source_suggestions "
"(suggestion_type, title, description, source_id, suggested_data, "
" priority, status) VALUES "
"('deactivate_source', ?, ?, ?, ?, 'high', 'pending')",
(title, description, sid, suggested_data),
)
created += 1
if created > 0:
await db.commit()
logger.info(
"Strategie-Eskalations-Heuristik: %d neue deactivate-Vorschläge "
"(%d Kandidaten, %d bereits pending)",
created, len(candidates), len(already_pending),
)
return created
async def generate_suggestions(db: aiosqlite.Connection) -> int:
"""Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse.
Drei Stufen:
1. Deterministisch: Karteileichen-Heuristik (article_count=0 oder >60d stumm)
erzeugt sofort deactivate_source-Vorschläge ohne KI-Aufruf.
2. Deterministisch: Strategie-Eskalations-Heuristik (fetch_strategy=googlebot
oder paywall, aber Reachability weiter error) erzeugt deactivate_source-
Vorschläge mit Priorität 'high'.
3. KI-basiert: Haiku schaut sich Quellensammlung + Health-Probleme an
und schlägt weitere Verbesserungen vor (add_source, deactivate_source,
fix_url, ...).
Rückgabe ist die Gesamtzahl neu erzeugter Vorschläge aller Stufen.
"""
stale_count = await generate_stale_deactivation_suggestions(db)
strategy_count = await generate_strategy_escalation_suggestions(db)
logger.info("Starte Quellen-Vorschläge via Haiku...")
# 1. Aktuelle Quellen laden
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, category, status, "
"article_count, last_seen_at "
"FROM sources WHERE tenant_id IS NULL ORDER BY category, name"
)
sources = [dict(row) for row in await cursor.fetchall()]
# 2. Health-Check-Probleme laden
cursor = await db.execute("""
SELECT h.source_id, s.name, s.domain, s.url,
h.check_type, h.status, h.message
FROM source_health_checks h
JOIN sources s ON s.id = h.source_id
WHERE h.status IN ('error', 'warning')
""")
issues = [dict(row) for row in await cursor.fetchall()]
# 3. Alte pending-Vorschläge entfernen (älter als 30 Tage)
await db.execute(
"DELETE FROM source_suggestions "
"WHERE status = 'pending' AND created_at < datetime('now', '-30 days')"
)
# 4. Quellen-Zusammenfassung für Haiku
categories = {}
for s in sources:
cat = s["category"]
if cat not in categories:
categories[cat] = []
categories[cat].append(s)
source_summary = ""
for cat, cat_sources in sorted(categories.items()):
active = [
s for s in cat_sources
if s["status"] == "active" and s["source_type"] != "excluded"
]
source_summary += f"\n{cat} ({len(active)} aktiv): "
source_summary += ", ".join(s["name"] for s in active[:10])
if len(active) > 10:
source_summary += f" ... (+{len(active) - 10} weitere)"
issues_summary = ""
if issues:
issues_summary = "\n\nProbleme gefunden:\n"
for issue in issues[:20]:
issues_summary += (
f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): "
f"{issue['check_type']} = {issue['status']} - {issue['message']}\n"
)
prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden.
Aktuelle Quellensammlung:{source_summary}{issues_summary}
Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor.
Beachte:
1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste
2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor
3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen
4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind
5. Maximal 5 Vorschläge
Antworte NUR mit einem JSON-Array. Jedes Element:
{{
"type": "add_source|deactivate_source|fix_url|remove_source",
"title": "Kurzer Titel",
"description": "Begründung",
"priority": "low|medium|high",
"source_id": null,
"data": {{
"name": "Anzeigename",
"url": "https://...",
"domain": "example.de",
"category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige"
}}
}}
Nur das JSON-Array, kein anderer Text."""
try:
response, usage = await call_claude(
prompt, tools=None, model=CLAUDE_MODEL_FAST,
)
json_match = re.search(r'\[.*\]', response, re.DOTALL)
if not json_match:
logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)")
return 0
suggestions = json.loads(json_match.group(0))
count = 0
for suggestion in suggestions[:5]:
stype = suggestion.get("type", "add_source")
title = suggestion.get("title", "")
desc = suggestion.get("description", "")
priority = suggestion.get("priority", "medium")
source_id = suggestion.get("source_id")
data = json.dumps(
suggestion.get("data", {}), ensure_ascii=False,
)
# source_id validieren (muss existieren oder None sein)
if source_id is not None:
cursor = await db.execute(
"SELECT id FROM sources WHERE id = ?", (source_id,),
)
if not await cursor.fetchone():
source_id = None
# Duplikat-Check: gleicher Typ + gleiche source_id oder gleiche Domain pending?
if source_id is not None:
cursor = await db.execute(
"SELECT id FROM source_suggestions "
"WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'",
(stype, source_id),
)
else:
# Bei add_source ohne source_id: Domain aus suggested_data prüfen
check_domain = suggestion.get('data', {}).get('domain', '')
if check_domain:
cursor = await db.execute(
"SELECT id FROM source_suggestions "
"WHERE suggestion_type = ? AND suggested_data LIKE ? AND status = 'pending'",
(stype, f'%{check_domain}%'),
)
else:
cursor = await db.execute(
"SELECT id FROM source_suggestions "
"WHERE title = ? AND status = 'pending'",
(title,),
)
if await cursor.fetchone():
continue
await db.execute(
"INSERT INTO source_suggestions "
"(suggestion_type, title, description, source_id, "
"suggested_data, priority, status) "
"VALUES (?, ?, ?, ?, ?, ?, 'pending')",
(stype, title, desc, source_id, data, priority),
)
count += 1
await db.commit()
logger.info(
f"Quellen-Vorschläge: {count} neue Vorschläge generiert via Haiku "
f"(+{stale_count} Karteileichen, +{strategy_count} Strategie-Eskalation) "
f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / "
f"${usage.cost_usd:.4f})"
)
return count + stale_count + strategy_count
except Exception as e:
logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True)
return stale_count + strategy_count
async def apply_suggestion(
db: aiosqlite.Connection, suggestion_id: int, accept: bool,
) -> dict:
"""Wendet einen Vorschlag an oder lehnt ihn ab."""
cursor = await db.execute(
"SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,),
)
suggestion = await cursor.fetchone()
if not suggestion:
raise ValueError("Vorschlag nicht gefunden")
suggestion = dict(suggestion)
if suggestion["status"] != "pending":
raise ValueError(f"Vorschlag bereits {suggestion['status']}")
new_status = "accepted" if accept else "rejected"
result = {"status": new_status, "action": None}
if accept:
stype = suggestion["suggestion_type"]
data = (
json.loads(suggestion["suggested_data"])
if suggestion["suggested_data"]
else {}
)
if stype == "add_source":
name = data.get("name", "Unbenannt")
url = data.get("url")
domain = data.get("domain", "")
category = data.get("category", "sonstige")
source_type = "rss_feed" if url and any(
x in (url or "").lower()
for x in ("rss", "feed", "xml", "atom")
) else "web_source"
if url:
cursor = await db.execute(
"SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
(url,),
)
if await cursor.fetchone():
result["action"] = "übersprungen (URL bereits vorhanden)"
new_status = "rejected"
else:
await db.execute(
"INSERT INTO sources "
"(name, url, domain, source_type, category, status, "
"added_by, tenant_id) "
"VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
(name, url, domain, source_type, category),
)
result["action"] = f"Quelle '{name}' angelegt"
else:
result["action"] = "übersprungen (keine URL)"
new_status = "rejected"
elif stype == "deactivate_source":
source_id = suggestion["source_id"]
if source_id:
await db.execute(
"UPDATE sources SET status = 'inactive' WHERE id = ?",
(source_id,),
)
result["action"] = "Quelle deaktiviert"
else:
result["action"] = "übersprungen (keine source_id)"
elif stype == "remove_source":
source_id = suggestion["source_id"]
if source_id:
await db.execute(
"DELETE FROM sources WHERE id = ?", (source_id,),
)
result["action"] = "Quelle gelöscht"
else:
result["action"] = "übersprungen (keine source_id)"
elif stype == "fix_url":
source_id = suggestion["source_id"]
new_url = data.get("url")
if source_id and new_url:
await db.execute(
"UPDATE sources SET url = ? WHERE id = ?",
(new_url, source_id),
)
result["action"] = f"URL aktualisiert auf {new_url}"
else:
result["action"] = "übersprungen (keine source_id oder URL)"
await db.execute(
"UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP "
"WHERE id = ?",
(new_status, suggestion_id),
)
await db.commit()
result["status"] = new_status
return result