Fix: Duplikat-Vorschläge + Stale-Check nur für RSS-Feeds

- Duplikat-Check basiert auf source_id+type statt exaktem Titel
- add_source ohne source_id prüft per Domain-Match
- Stale-Check überspringt web_sources (nur RSS-Feeds prüfen)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dieser Commit ist enthalten in:
claude-dev
2026-03-08 19:05:45 +01:00
Ursprung 5986d03209
Commit 13143b9447
2 geänderte Dateien mit 290 neuen und 274 gelöschten Zeilen

Datei anzeigen

@@ -1,4 +1,4 @@
"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" """Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate."""
import asyncio import asyncio
import logging import logging
import json import json
@@ -12,7 +12,7 @@ logger = logging.getLogger("osint.source_health")
async def run_health_checks(db: aiosqlite.Connection) -> dict: async def run_health_checks(db: aiosqlite.Connection) -> dict:
"""Führt alle Health-Checks für aktive Grundquellen durch.""" """Führt alle Health-Checks für aktive Grundquellen durch."""
logger.info("Starte Quellen-Health-Check...") logger.info("Starte Quellen-Health-Check...")
# Alle aktiven Grundquellen laden # Alle aktiven Grundquellen laden
@@ -22,14 +22,14 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict:
) )
sources = [dict(row) for row in await cursor.fetchall()] sources = [dict(row) for row in await cursor.fetchall()]
# Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben) # Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben)
await db.execute("DELETE FROM source_health_checks") await db.execute("DELETE FROM source_health_checks")
await db.commit() await db.commit()
checks_done = 0 checks_done = 0
issues_found = 0 issues_found = 0
# 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL)
sources_with_url = [s for s in sources if s["url"]] sources_with_url = [s for s in sources if s["url"]]
async with httpx.AsyncClient( async with httpx.AsyncClient(
@@ -46,7 +46,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict:
if isinstance(result, Exception): if isinstance(result, Exception):
await _save_check( await _save_check(
db, source["id"], "reachability", "error", db, source["id"], "reachability", "error",
f"Prüfung fehlgeschlagen: {result}", f"Prüfung fehlgeschlagen: {result}",
) )
issues_found += 1 issues_found += 1
else: else:
@@ -61,7 +61,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict:
# 2. Veraltete Quellen (kein Artikel seit >30 Tagen) # 2. Veraltete Quellen (kein Artikel seit >30 Tagen)
for source in sources: for source in sources:
if source["source_type"] == "excluded": if source["source_type"] in ("excluded", "web_source"):
continue continue
stale_check = _check_stale(source) stale_check = _check_stale(source)
if stale_check: if stale_check:
@@ -83,7 +83,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict:
await db.commit() await db.commit()
logger.info( logger.info(
f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
f"{issues_found} Probleme gefunden" f"{issues_found} Probleme gefunden"
) )
return {"checked": checks_done, "issues": issues_found} return {"checked": checks_done, "issues": issues_found}
@@ -92,7 +92,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict:
async def _check_source_reachability( async def _check_source_reachability(
client: httpx.AsyncClient, source: dict, client: httpx.AsyncClient, source: dict,
) -> list[dict]: ) -> list[dict]:
"""Prüft Erreichbarkeit und Feed-Validität einer Quelle.""" """Prüft Erreichbarkeit und Feed-Validität einer Quelle."""
checks = [] checks = []
url = source["url"] url = source["url"]
@@ -125,14 +125,14 @@ async def _check_source_reachability(
"message": "Erreichbar", "message": "Erreichbar",
}) })
# Feed-Validität nur für RSS-Feeds # Feed-Validität nur für RSS-Feeds
if source["source_type"] == "rss_feed": if source["source_type"] == "rss_feed":
text = resp.text[:20000] text = resp.text[:20000]
if "<rss" not in text and "<feed" not in text and "<channel" not in text: if "<rss" not in text and "<feed" not in text and "<channel" not in text:
checks.append({ checks.append({
"type": "feed_validity", "type": "feed_validity",
"status": "error", "status": "error",
"message": "Kein gültiger RSS/Atom-Feed", "message": "Kein gültiger RSS/Atom-Feed",
}) })
else: else:
feed = await asyncio.to_thread(feedparser.parse, text) feed = await asyncio.to_thread(feedparser.parse, text)
@@ -155,7 +155,7 @@ async def _check_source_reachability(
checks.append({ checks.append({
"type": "feed_validity", "type": "feed_validity",
"status": "ok", "status": "ok",
"message": f"Feed gültig ({len(feed.entries)} Einträge)", "message": f"Feed gültig ({len(feed.entries)} Einträge)",
}) })
except httpx.TimeoutException: except httpx.TimeoutException:
@@ -181,7 +181,7 @@ async def _check_source_reachability(
def _check_stale(source: dict) -> dict | None: def _check_stale(source: dict) -> dict | None:
"""Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen)."""
if source["source_type"] == "excluded": if source["source_type"] == "excluded":
return None return None
@@ -249,7 +249,7 @@ async def _save_check(
async def get_health_summary(db: aiosqlite.Connection) -> dict: async def get_health_summary(db: aiosqlite.Connection) -> dict:
"""Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück."""
cursor = await db.execute(""" cursor = await db.execute("""
SELECT SELECT
h.id, h.source_id, s.name, s.domain, s.url, s.source_type, h.id, h.source_id, s.name, s.domain, s.url, s.source_type,

Datei anzeigen

@@ -1,261 +1,277 @@
"""KI-gestützte Quellen-Vorschläge via Haiku.""" """KI-gestützte Quellen-Vorschläge via Haiku."""
import json import json
import logging import logging
import re import re
import aiosqlite import aiosqlite
from agents.claude_client import call_claude from agents.claude_client import call_claude
from config import CLAUDE_MODEL_FAST from config import CLAUDE_MODEL_FAST
logger = logging.getLogger("osint.source_suggester") logger = logging.getLogger("osint.source_suggester")
async def generate_suggestions(db: aiosqlite.Connection) -> int: async def generate_suggestions(db: aiosqlite.Connection) -> int:
"""Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse.""" """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse."""
logger.info("Starte Quellen-Vorschläge via Haiku...") logger.info("Starte Quellen-Vorschläge via Haiku...")
# 1. Aktuelle Quellen laden # 1. Aktuelle Quellen laden
cursor = await db.execute( cursor = await db.execute(
"SELECT id, name, url, domain, source_type, category, status, " "SELECT id, name, url, domain, source_type, category, status, "
"article_count, last_seen_at " "article_count, last_seen_at "
"FROM sources WHERE tenant_id IS NULL ORDER BY category, name" "FROM sources WHERE tenant_id IS NULL ORDER BY category, name"
) )
sources = [dict(row) for row in await cursor.fetchall()] sources = [dict(row) for row in await cursor.fetchall()]
# 2. Health-Check-Probleme laden # 2. Health-Check-Probleme laden
cursor = await db.execute(""" cursor = await db.execute("""
SELECT h.source_id, s.name, s.domain, s.url, SELECT h.source_id, s.name, s.domain, s.url,
h.check_type, h.status, h.message h.check_type, h.status, h.message
FROM source_health_checks h FROM source_health_checks h
JOIN sources s ON s.id = h.source_id JOIN sources s ON s.id = h.source_id
WHERE h.status IN ('error', 'warning') WHERE h.status IN ('error', 'warning')
""") """)
issues = [dict(row) for row in await cursor.fetchall()] issues = [dict(row) for row in await cursor.fetchall()]
# 3. Alte pending-Vorschläge entfernen (älter als 30 Tage) # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage)
await db.execute( await db.execute(
"DELETE FROM source_suggestions " "DELETE FROM source_suggestions "
"WHERE status = 'pending' AND created_at < datetime('now', '-30 days')" "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')"
) )
# 4. Quellen-Zusammenfassung für Haiku # 4. Quellen-Zusammenfassung für Haiku
categories = {} categories = {}
for s in sources: for s in sources:
cat = s["category"] cat = s["category"]
if cat not in categories: if cat not in categories:
categories[cat] = [] categories[cat] = []
categories[cat].append(s) categories[cat].append(s)
source_summary = "" source_summary = ""
for cat, cat_sources in sorted(categories.items()): for cat, cat_sources in sorted(categories.items()):
active = [ active = [
s for s in cat_sources s for s in cat_sources
if s["status"] == "active" and s["source_type"] != "excluded" if s["status"] == "active" and s["source_type"] != "excluded"
] ]
source_summary += f"\n{cat} ({len(active)} aktiv): " source_summary += f"\n{cat} ({len(active)} aktiv): "
source_summary += ", ".join(s["name"] for s in active[:10]) source_summary += ", ".join(s["name"] for s in active[:10])
if len(active) > 10: if len(active) > 10:
source_summary += f" ... (+{len(active) - 10} weitere)" source_summary += f" ... (+{len(active) - 10} weitere)"
issues_summary = "" issues_summary = ""
if issues: if issues:
issues_summary = "\n\nProbleme gefunden:\n" issues_summary = "\n\nProbleme gefunden:\n"
for issue in issues[:20]: for issue in issues[:20]:
issues_summary += ( issues_summary += (
f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): " f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): "
f"{issue['check_type']} = {issue['status']} - {issue['message']}\n" f"{issue['check_type']} = {issue['status']} - {issue['message']}\n"
) )
prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden. prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden.
Aktuelle Quellensammlung:{source_summary}{issues_summary} Aktuelle Quellensammlung:{source_summary}{issues_summary}
Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor. Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor.
Beachte: Beachte:
1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste 1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste
2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor 2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor
3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen 3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen
4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind 4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind
5. Maximal 5 Vorschläge 5. Maximal 5 Vorschläge
Antworte NUR mit einem JSON-Array. Jedes Element: Antworte NUR mit einem JSON-Array. Jedes Element:
{{ {{
"type": "add_source|deactivate_source|fix_url|remove_source", "type": "add_source|deactivate_source|fix_url|remove_source",
"title": "Kurzer Titel", "title": "Kurzer Titel",
"description": "Begründung", "description": "Begründung",
"priority": "low|medium|high", "priority": "low|medium|high",
"source_id": null, "source_id": null,
"data": {{ "data": {{
"name": "Anzeigename", "name": "Anzeigename",
"url": "https://...", "url": "https://...",
"domain": "example.de", "domain": "example.de",
"category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige" "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige"
}} }}
}} }}
Nur das JSON-Array, kein anderer Text.""" Nur das JSON-Array, kein anderer Text."""
try: try:
response, usage = await call_claude( response, usage = await call_claude(
prompt, tools=None, model=CLAUDE_MODEL_FAST, prompt, tools=None, model=CLAUDE_MODEL_FAST,
) )
json_match = re.search(r'\[.*\]', response, re.DOTALL) json_match = re.search(r'\[.*\]', response, re.DOTALL)
if not json_match: if not json_match:
logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)") logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)")
return 0 return 0
suggestions = json.loads(json_match.group(0)) suggestions = json.loads(json_match.group(0))
count = 0 count = 0
for suggestion in suggestions[:5]: for suggestion in suggestions[:5]:
stype = suggestion.get("type", "add_source") stype = suggestion.get("type", "add_source")
title = suggestion.get("title", "") title = suggestion.get("title", "")
desc = suggestion.get("description", "") desc = suggestion.get("description", "")
priority = suggestion.get("priority", "medium") priority = suggestion.get("priority", "medium")
source_id = suggestion.get("source_id") source_id = suggestion.get("source_id")
data = json.dumps( data = json.dumps(
suggestion.get("data", {}), ensure_ascii=False, suggestion.get("data", {}), ensure_ascii=False,
) )
# source_id validieren (muss existieren oder None sein) # source_id validieren (muss existieren oder None sein)
if source_id is not None: if source_id is not None:
cursor = await db.execute( cursor = await db.execute(
"SELECT id FROM sources WHERE id = ?", (source_id,), "SELECT id FROM sources WHERE id = ?", (source_id,),
) )
if not await cursor.fetchone(): if not await cursor.fetchone():
source_id = None source_id = None
# Duplikat-Check # Duplikat-Check: gleicher Typ + gleiche source_id oder gleiche Domain pending?
cursor = await db.execute( if source_id is not None:
"SELECT id FROM source_suggestions " cursor = await db.execute(
"WHERE title = ? AND status = 'pending'", "SELECT id FROM source_suggestions "
(title,), "WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'",
) (stype, source_id),
if await cursor.fetchone(): )
continue else:
# Bei add_source ohne source_id: Domain aus suggested_data prüfen
await db.execute( check_domain = suggestion.get('data', {}).get('domain', '')
"INSERT INTO source_suggestions " if check_domain:
"(suggestion_type, title, description, source_id, " cursor = await db.execute(
"suggested_data, priority, status) " "SELECT id FROM source_suggestions "
"VALUES (?, ?, ?, ?, ?, ?, 'pending')", "WHERE suggestion_type = ? AND suggested_data LIKE ? AND status = 'pending'",
(stype, title, desc, source_id, data, priority), (stype, f'%{check_domain}%'),
) )
count += 1 else:
cursor = await db.execute(
await db.commit() "SELECT id FROM source_suggestions "
logger.info( "WHERE title = ? AND status = 'pending'",
f"Quellen-Vorschläge: {count} neue Vorschläge generiert " (title,),
f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / " )
f"${usage.cost_usd:.4f})" if await cursor.fetchone():
) continue
return count
await db.execute(
except Exception as e: "INSERT INTO source_suggestions "
logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True) "(suggestion_type, title, description, source_id, "
return 0 "suggested_data, priority, status) "
"VALUES (?, ?, ?, ?, ?, ?, 'pending')",
(stype, title, desc, source_id, data, priority),
async def apply_suggestion( )
db: aiosqlite.Connection, suggestion_id: int, accept: bool, count += 1
) -> dict:
"""Wendet einen Vorschlag an oder lehnt ihn ab.""" await db.commit()
cursor = await db.execute( logger.info(
"SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,), f"Quellen-Vorschläge: {count} neue Vorschläge generiert "
) f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / "
suggestion = await cursor.fetchone() f"${usage.cost_usd:.4f})"
if not suggestion: )
raise ValueError("Vorschlag nicht gefunden") return count
suggestion = dict(suggestion) except Exception as e:
logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True)
if suggestion["status"] != "pending": return 0
raise ValueError(f"Vorschlag bereits {suggestion['status']}")
new_status = "accepted" if accept else "rejected" async def apply_suggestion(
result = {"status": new_status, "action": None} db: aiosqlite.Connection, suggestion_id: int, accept: bool,
) -> dict:
if accept: """Wendet einen Vorschlag an oder lehnt ihn ab."""
stype = suggestion["suggestion_type"] cursor = await db.execute(
data = ( "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,),
json.loads(suggestion["suggested_data"]) )
if suggestion["suggested_data"] suggestion = await cursor.fetchone()
else {} if not suggestion:
) raise ValueError("Vorschlag nicht gefunden")
if stype == "add_source": suggestion = dict(suggestion)
name = data.get("name", "Unbenannt")
url = data.get("url") if suggestion["status"] != "pending":
domain = data.get("domain", "") raise ValueError(f"Vorschlag bereits {suggestion['status']}")
category = data.get("category", "sonstige")
source_type = "rss_feed" if url and any( new_status = "accepted" if accept else "rejected"
x in (url or "").lower() result = {"status": new_status, "action": None}
for x in ("rss", "feed", "xml", "atom")
) else "web_source" if accept:
stype = suggestion["suggestion_type"]
if url: data = (
cursor = await db.execute( json.loads(suggestion["suggested_data"])
"SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", if suggestion["suggested_data"]
(url,), else {}
) )
if await cursor.fetchone():
result["action"] = "übersprungen (URL bereits vorhanden)" if stype == "add_source":
new_status = "rejected" name = data.get("name", "Unbenannt")
else: url = data.get("url")
await db.execute( domain = data.get("domain", "")
"INSERT INTO sources " category = data.get("category", "sonstige")
"(name, url, domain, source_type, category, status, " source_type = "rss_feed" if url and any(
"added_by, tenant_id) " x in (url or "").lower()
"VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", for x in ("rss", "feed", "xml", "atom")
(name, url, domain, source_type, category), ) else "web_source"
)
result["action"] = f"Quelle '{name}' angelegt" if url:
else: cursor = await db.execute(
result["action"] = "übersprungen (keine URL)" "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
new_status = "rejected" (url,),
)
elif stype == "deactivate_source": if await cursor.fetchone():
source_id = suggestion["source_id"] result["action"] = "übersprungen (URL bereits vorhanden)"
if source_id: new_status = "rejected"
await db.execute( else:
"UPDATE sources SET status = 'inactive' WHERE id = ?", await db.execute(
(source_id,), "INSERT INTO sources "
) "(name, url, domain, source_type, category, status, "
result["action"] = "Quelle deaktiviert" "added_by, tenant_id) "
else: "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
result["action"] = "übersprungen (keine source_id)" (name, url, domain, source_type, category),
)
elif stype == "remove_source": result["action"] = f"Quelle '{name}' angelegt"
source_id = suggestion["source_id"] else:
if source_id: result["action"] = "übersprungen (keine URL)"
await db.execute( new_status = "rejected"
"DELETE FROM sources WHERE id = ?", (source_id,),
) elif stype == "deactivate_source":
result["action"] = "Quelle gelöscht" source_id = suggestion["source_id"]
else: if source_id:
result["action"] = "übersprungen (keine source_id)" await db.execute(
"UPDATE sources SET status = 'inactive' WHERE id = ?",
elif stype == "fix_url": (source_id,),
source_id = suggestion["source_id"] )
new_url = data.get("url") result["action"] = "Quelle deaktiviert"
if source_id and new_url: else:
await db.execute( result["action"] = "übersprungen (keine source_id)"
"UPDATE sources SET url = ? WHERE id = ?",
(new_url, source_id), elif stype == "remove_source":
) source_id = suggestion["source_id"]
result["action"] = f"URL aktualisiert auf {new_url}" if source_id:
else: await db.execute(
result["action"] = "übersprungen (keine source_id oder URL)" "DELETE FROM sources WHERE id = ?", (source_id,),
)
await db.execute( result["action"] = "Quelle gelöscht"
"UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP " else:
"WHERE id = ?", result["action"] = "übersprungen (keine source_id)"
(new_status, suggestion_id),
) elif stype == "fix_url":
await db.commit() source_id = suggestion["source_id"]
new_url = data.get("url")
result["status"] = new_status if source_id and new_url:
return result await db.execute(
"UPDATE sources SET url = ? WHERE id = ?",
(new_url, source_id),
)
result["action"] = f"URL aktualisiert auf {new_url}"
else:
result["action"] = "übersprungen (keine source_id oder URL)"
await db.execute(
"UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP "
"WHERE id = ?",
(new_status, suggestion_id),
)
await db.commit()
result["status"] = new_status
return result