def main(db_path: str) -> int:
    """Create the source_health_checks and source_suggestions schema.

    The migration is idempotent: every statement uses ``IF NOT EXISTS``, so
    running it repeatedly against the same database is safe.

    Args:
        db_path: Path to an existing SQLite database file.

    Returns:
        0 on success, 1 if ``db_path`` is not an existing regular file.

    Raises:
        sqlite3.Error: If a PRAGMA or DDL statement fails. The connection is
            closed before the error propagates.
    """
    # isfile() instead of exists(): a directory path would pass exists() and
    # then fail inside sqlite3.connect with an opaque "unable to open" error.
    if not os.path.isfile(db_path):
        print(f"FEHLER: DB nicht gefunden: {db_path}", file=sys.stderr)
        return 1

    conn = sqlite3.connect(db_path, timeout=60)
    try:
        conn.execute("PRAGMA busy_timeout = 60000")
        conn.execute("PRAGMA journal_mode = WAL")

        print(f"Migration auf {db_path}")

        conn.executescript("""
            CREATE TABLE IF NOT EXISTS source_health_checks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
                check_type TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT,
                details TEXT,
                checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_source_health_checks_source ON source_health_checks(source_id);
            CREATE INDEX IF NOT EXISTS idx_source_health_checks_status ON source_health_checks(status);

            CREATE TABLE IF NOT EXISTS source_suggestions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                suggestion_type TEXT NOT NULL,
                title TEXT NOT NULL,
                description TEXT,
                source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
                suggested_data TEXT,
                priority TEXT DEFAULT 'medium',
                status TEXT DEFAULT 'pending',
                reviewed_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_source_suggestions_status ON source_suggestions(status);
            CREATE INDEX IF NOT EXISTS idx_source_suggestions_source ON source_suggestions(source_id);
        """)
        print(" + source_health_checks + Indizes (idempotent)")
        print(" + source_suggestions + Indizes (idempotent)")

        conn.commit()
    finally:
        # Always release the connection, also when a statement above raised.
        conn.close()

    print("Migration abgeschlossen.")
    return 0
os.path.join('/home/claude-dev/AegisSight-Monitor/src')) - -from source_rules import ( +from shared.source_rules import ( discover_source, discover_all_feeds, evaluate_feeds_with_claude, @@ -30,6 +23,8 @@ logger = logging.getLogger("verwaltung.sources") router = APIRouter(prefix="/api/sources", tags=["sources"]) +SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes"} + class GlobalSourceCreate(BaseModel): name: str = Field(min_length=1, max_length=200) @@ -334,7 +329,7 @@ async def add_discovered_sources( existing_urls.add(feed["url"]) added += 1 - # Web-Source für die Domain anlegen wenn noch nicht vorhanden + # Web-Source für die Domain anlegen wenn noch nicht vorhanden if feeds and feeds[0].get("domain"): domain = feeds[0]["domain"] cursor = await db.execute( @@ -362,7 +357,7 @@ async def add_discovered_sources( -# --- Health-Check & Vorschläge --- +# --- Health-Check & Vorschläge --- @router.get("/health") async def get_health( @@ -370,7 +365,7 @@ async def get_health( db: aiosqlite.Connection = Depends(db_dependency), ): """Health-Check-Ergebnisse abrufen.""" - # Prüfen ob Tabelle existiert + # Prüfen ob Tabelle existiert cursor = await db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'" ) @@ -412,7 +407,7 @@ async def get_suggestions( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): - """Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete).""" + """Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete).""" cursor = await db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'" ) @@ -476,7 +471,7 @@ async def update_suggestion( "SELECT id FROM sources WHERE url = ? 
AND tenant_id IS NULL", (url,) ) if await cursor.fetchone(): - result_action = "übersprungen (URL bereits vorhanden)" + result_action = "übersprungen (URL bereits vorhanden)" new_status = "rejected" else: await db.execute( @@ -486,7 +481,7 @@ async def update_suggestion( ) result_action = f"Quelle '{name}' angelegt" else: - result_action = "übersprungen (keine URL)" + result_action = "übersprungen (keine URL)" new_status = "rejected" elif stype == "deactivate_source": @@ -499,7 +494,7 @@ async def update_suggestion( source_id = suggestion["source_id"] if source_id: await db.execute("DELETE FROM sources WHERE id = ?", (source_id,)) - result_action = "Quelle gelöscht" + result_action = "Quelle gelöscht" elif stype == "fix_url": source_id = suggestion["source_id"] @@ -509,7 +504,7 @@ async def update_suggestion( result_action = f"URL aktualisiert" # Auto-Reject: Wenn fix_url oder add_source akzeptiert wird, - # zugehörige deactivate_source-Vorschläge automatisch ablehnen + # zugehörige deactivate_source-Vorschläge automatisch ablehnen if stype in ("fix_url", "add_source") and suggestion.get("source_id"): await db.execute( "UPDATE source_suggestions SET status = 'rejected', reviewed_at = CURRENT_TIMESTAMP " @@ -539,36 +534,9 @@ async def run_health_check_now( db: aiosqlite.Connection = Depends(db_dependency), ): """Health-Check manuell starten.""" - # Tabellen sicherstellen - await db.executescript(""" - CREATE TABLE IF NOT EXISTS source_health_checks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, - check_type TEXT NOT NULL, - status TEXT NOT NULL, - message TEXT, - details TEXT, - checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - CREATE TABLE IF NOT EXISTS source_suggestions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - suggestion_type TEXT NOT NULL, - title TEXT NOT NULL, - description TEXT, - source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, - suggested_data TEXT, - priority TEXT DEFAULT 
'medium', - status TEXT DEFAULT 'pending', - reviewed_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - """) - await db.commit() - # source_health und source_suggester importieren - sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") - from services.source_health import run_health_checks - from services.source_suggester import generate_suggestions + from shared.services.source_health import run_health_checks + from shared.services.source_suggester import generate_suggestions result = await run_health_checks(db) suggestion_count = await generate_suggestions(db) @@ -593,26 +561,6 @@ async def run_health_check_stream( import asyncio from urllib.parse import urlparse - # Tabellen sicherstellen - await db.executescript(""" - CREATE TABLE IF NOT EXISTS source_health_checks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, - check_type TEXT NOT NULL, status TEXT NOT NULL, - message TEXT, details TEXT, - checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - CREATE TABLE IF NOT EXISTS source_suggestions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - suggestion_type TEXT NOT NULL, title TEXT NOT NULL, - description TEXT, source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, - suggested_data TEXT, priority TEXT DEFAULT 'medium', - status TEXT DEFAULT 'pending', reviewed_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - """) - await db.commit() - # Quellen laden cursor = await db.execute( "SELECT id, name, url, domain, source_type, article_count, last_seen_at " @@ -733,8 +681,7 @@ async def run_health_check_stream( # Phase 2: Vorschlaege yield f"data: {_json.dumps({'phase': 'suggestions', 'checked': checked, 'total': total})}\n\n" - sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") - from services.source_suggester import generate_suggestions + from shared.services.source_suggester import generate_suggestions suggestion_count = await 
generate_suggestions(db) # Fertig @@ -749,7 +696,7 @@ async def search_fix_for_source( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): - """Sonnet mit WebSearch nach Lösung für eine kaputte Quelle suchen lassen.""" + """Sonnet mit WebSearch nach Lösung für eine kaputte Quelle suchen lassen.""" import json as _json cursor = await db.execute( @@ -762,7 +709,7 @@ async def search_fix_for_source( source = dict(source) - # Health-Check-Probleme für diese Quelle laden + # Health-Check-Probleme für diese Quelle laden cursor = await db.execute( "SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?", (source_id,), @@ -781,14 +728,14 @@ Kategorie: {source['category']} Probleme: {issues_text} -Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle. -- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren -- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs) -- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar? +Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle. +- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren +- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs) +- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar? 
Regeln: -- Maximal 3 Lösungen vorschlagen (die besten) -- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss) +- Maximal 3 Lösungen vorschlagen (die besten) +- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss) Antworte NUR mit einem JSON-Objekt: {{ @@ -798,16 +745,15 @@ Antworte NUR mit einem JSON-Objekt: "type": "replace_url|add_feed|deactivate", "name": "Anzeigename", "url": "https://...", - "description": "Kurze Begründung" + "description": "Kurze Begründung" }} ], - "summary": "Zusammenfassung in 1-2 Sätzen" + "summary": "Zusammenfassung in 1-2 Sätzen" }} Nur das JSON, kein anderer Text.""" - sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") - from agents.claude_client import call_claude + from shared.agents.claude_client import call_claude try: response, usage = await call_claude(prompt, tools="WebSearch,WebFetch") @@ -819,21 +765,7 @@ Nur das JSON, kein anderer Text.""" else: result = {"fixable": False, "solutions": [], "summary": response[:500]} - # Lösungen als Vorschläge speichern - await db.executescript(""" - CREATE TABLE IF NOT EXISTS source_suggestions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - suggestion_type TEXT NOT NULL, - title TEXT NOT NULL, - description TEXT, - source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, - suggested_data TEXT, - priority TEXT DEFAULT 'medium', - status TEXT DEFAULT 'pending', - reviewed_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - """) + # Lösungen als Vorschläge speichern for sol in result.get("solutions", []): sol_type = sol.get("type", "add_feed") diff --git a/src/shared/__init__.py b/src/shared/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/shared/agents/__init__.py b/src/shared/agents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/shared/agents/claude_client.py b/src/shared/agents/claude_client.py new file mode 100644 index 0000000..eb99629 --- 
/dev/null +++ b/src/shared/agents/claude_client.py @@ -0,0 +1,209 @@ +"""Shared Claude CLI Client mit Usage-Tracking.""" +import asyncio +import contextvars +import json +import logging +from dataclasses import dataclass +from config import CLAUDE_PATH, CLAUDE_TIMEOUT, CLAUDE_MODEL_FAST, CLAUDE_MODEL_STANDARD + +# ContextVar fuer Cancel-Event: Wird vom Orchestrator gesetzt, +# call_claude prueft automatisch darauf -- kein Durchreichen noetig. +_cancel_event_var: contextvars.ContextVar[asyncio.Event | None] = contextvars.ContextVar("_cancel_event_var", default=None) + +logger = logging.getLogger("osint.claude_client") + + +class ClaudeCliError(RuntimeError): + """Strukturierter Fehler aus dem Claude CLI mit Kategorie. + + error_type: + - "rate_limit": Anthropic Rate-Limit oder Overload (transient, retry-tauglich) + - "auth_error": Account-Problem (Organisation hat keinen Claude-Zugang, + Token abgelaufen/ungueltig) - kein Retry sinnvoll, Admin-Aktion noetig + - "timeout": Claude CLI Timeout (transient) + - "cli_error": Sonstiger CLI-Fehler (unspezifisch, Default) + """ + + def __init__(self, error_type: str, message: str): + self.error_type = error_type + self.message = message + super().__init__(f"Claude CLI [{error_type}]: {message}") + + +def _classify_cli_error(combined_output: str) -> str: + """Ordnet einer Fehler-Ausgabe eine error_type-Kategorie zu.""" + txt = combined_output.lower() + rate_limit_keywords = ["hit your limit", "rate limit", "resets", "rate_limit", "overloaded"] + auth_error_keywords = ["does not have access", "login again", "contact your administrator"] + if any(kw in txt for kw in rate_limit_keywords): + return "rate_limit" + if any(kw in txt for kw in auth_error_keywords): + return "auth_error" + return "cli_error" + + +@dataclass +class ClaudeUsage: + """Token-Verbrauch eines einzelnen Claude CLI Aufrufs.""" + input_tokens: int = 0 + output_tokens: int = 0 + cache_creation_tokens: int = 0 + cache_read_tokens: int = 0 + cost_usd: float = 0.0 
async def call_claude(prompt: str, tools: str | None = "WebSearch,WebFetch", model: str | None = None, raw_text: bool = False, timeout: float | None = None) -> tuple[str, ClaudeUsage]:
    """Run the Claude CLI once and return ``(result_text, usage)``.

    The prompt is passed via stdin to avoid OS ARG_MAX limits. If a cancel
    event is set in ``_cancel_event_var``, it is watched while the CLI runs.

    Args:
        prompt: The prompt for Claude.
        tools: Comma-separated allowed tools. A falsy value disables tools and
            limits the run to a single turn (``--max-turns 1``).
        model: Model name; ``None`` falls back to ``CLAUDE_MODEL_STANDARD``.
        raw_text: If false, a system prompt demanding JSON-only output is
            appended.
        timeout: Override in seconds; ``None`` falls back to the global
            ``CLAUDE_TIMEOUT``.

    Returns:
        The result text (em/en dashes replaced by hyphens) and the token usage
        parsed from the CLI's JSON envelope. If the output is not valid JSON,
        the raw output and an empty ``ClaudeUsage`` are returned.

    Raises:
        TimeoutError: The CLI did not finish within the effective timeout.
        asyncio.CancelledError: The cancel event fired while the CLI ran.
        ClaudeCliError: Non-zero exit code, or ``is_error`` set in the JSON
            envelope.
    """
    effective_model = model or CLAUDE_MODEL_STANDARD
    effective_timeout = timeout if timeout is not None else CLAUDE_TIMEOUT
    cmd = [CLAUDE_PATH, "-p", "-", "--output-format", "json", "--model", effective_model]
    if tools:
        cmd.extend(["--allowedTools", tools])
    else:
        cmd.extend(["--max-turns", "1", "--allowedTools", ""])
    if not raw_text:
        cmd.extend(["--append-system-prompt",
                    "CRITICAL: You are a JSON-only output agent. "
                    "Output EXCLUSIVELY a single valid JSON object. "
                    "No explanatory text, no markdown fences, no continuation of previous responses. "
                    "Start your response with { and end with }."])

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
        env={
            "PATH": "/usr/local/bin:/usr/bin:/bin",
            "HOME": "/home/claude-dev",
            "LANG": "C.UTF-8",
            "LC_ALL": "C.UTF-8",
        },
    )
    payload = prompt.encode("utf-8")
    timeout_message = f"Claude CLI Timeout nach {effective_timeout}s"
    try:
        cancel_event = _cancel_event_var.get(None)
        if cancel_event:
            # Cancel-aware: watch cancel_event while the process runs.
            communicate_task = asyncio.create_task(process.communicate(input=payload))
            cancel_wait_task = asyncio.create_task(cancel_event.wait())
            try:
                done, _pending = await asyncio.wait(
                    [communicate_task, cancel_wait_task],
                    timeout=effective_timeout,
                    return_when=asyncio.FIRST_COMPLETED,
                )
            finally:
                # Also runs when this coroutine itself is cancelled, so the
                # helper tasks never outlive the call.
                for task in (communicate_task, cancel_wait_task):
                    if not task.done():
                        task.cancel()

            if communicate_task in done:
                stdout, stderr = communicate_task.result()
            elif cancel_wait_task in done:
                raise asyncio.CancelledError("Cancel angefordert")
            else:
                raise TimeoutError(timeout_message)
        else:
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(input=payload), timeout=effective_timeout
                )
            except asyncio.TimeoutError:
                raise TimeoutError(timeout_message)
    finally:
        # Single cleanup point for timeout, cancel request and outer
        # cancellation: kill the CLI if it is still running and reap it so no
        # zombie process is left behind. A finished process is not touched,
        # which avoids a second kill() on an already reaped process.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()

    if process.returncode != 0:
        error_msg = stderr.decode("utf-8", errors="replace").strip()
        stdout_msg = stdout.decode("utf-8", errors="replace").strip()

        # Rate-limit/auth errors sometimes arrive as JSON on stdout, not stderr.
        combined_output = f"{error_msg} {stdout_msg}"
        error_type = _classify_cli_error(combined_output)

        if error_type == "rate_limit":
            logger.warning(f"Claude CLI Rate-Limit (Exit {process.returncode}): {stdout_msg or error_msg}")
        elif error_type == "auth_error":
            logger.error(f"Claude CLI Auth-Fehler (Exit {process.returncode}): {stdout_msg or error_msg}")
        else:
            logger.error(f"Claude CLI Fehler (Exit {process.returncode}): {error_msg}")
            if stdout_msg:
                logger.error(f"Claude CLI stdout bei Fehler: {stdout_msg[:500]}")

        raise ClaudeCliError(error_type, stdout_msg or error_msg)

    raw = stdout.decode("utf-8", errors="replace").strip()
    usage = ClaudeUsage()
    result_text = raw

    try:
        data = json.loads(raw)
        # The CLI can exit with code 0 and still set is_error=true
        # (e.g. "Your organization does not have access to Claude").
        if data.get("is_error"):
            error_text = str(data.get("result", ""))
            error_type = _classify_cli_error(error_text)
            if error_type == "rate_limit":
                logger.warning(f"Claude CLI Rate-Limit (is_error): {error_text}")
            elif error_type == "auth_error":
                logger.error(f"Claude CLI Auth-Fehler (is_error): {error_text}")
            else:
                logger.error(f"Claude CLI Fehler (is_error): {error_text}")
            raise ClaudeCliError(error_type, error_text)

        result_text = data.get("result", raw)
        u = data.get("usage", {})
        usage = ClaudeUsage(
            input_tokens=u.get("input_tokens", 0),
            output_tokens=u.get("output_tokens", 0),
            cache_creation_tokens=u.get("cache_creation_input_tokens", 0),
            cache_read_tokens=u.get("cache_read_input_tokens", 0),
            cost_usd=data.get("total_cost_usd", 0.0),
            duration_ms=data.get("duration_ms", 0),
        )
        model_info = f" [{model}]" if model else ""
        logger.info(
            f"Claude{model_info}: {usage.input_tokens} in / {usage.output_tokens} out / "
            f"cache {usage.cache_creation_tokens}+{usage.cache_read_tokens} / "
            f"${usage.cost_usd:.4f} / {usage.duration_ms}ms"
        )
    except json.JSONDecodeError:
        logger.warning("Claude CLI Antwort kein gültiges JSON, nutze raw output")

    result_text = _sanitize_mdash(result_text)
    return result_text, usage
async def run_health_checks(db: aiosqlite.Connection) -> dict:
    """Run all health checks for the active global sources and store the results.

    Existing rows in ``source_health_checks`` are deleted (and committed)
    first, then three passes write fresh rows: reachability/feed validity for
    sources that have a URL, staleness, and duplicate URLs.

    Args:
        db: Open aiosqlite connection; rows must be convertible via ``dict(row)``.

    Returns:
        ``{"checked": <sources probed over HTTP>, "issues": <non-ok results>}``.
    """
    logger.info("Starte Quellen-Health-Check...")

    # Load every active global source (tenant_id IS NULL).
    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
        "FROM sources WHERE status = 'active' AND tenant_id IS NULL"
    )
    rows = await cursor.fetchall()
    sources = [dict(row) for row in rows]

    # Drop the previous results; they are rewritten below.
    await db.execute("DELETE FROM source_health_checks")
    await db.commit()

    batch_size = 5
    checked = 0
    problems = 0

    # Pass 1: reachability + feed validity (only sources with a URL).
    with_url = [src for src in sources if src["url"]]
    client_options = {
        "timeout": 15.0,
        "follow_redirects": True,
        "headers": {"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
    }
    async with httpx.AsyncClient(**client_options) as client:
        for start in range(0, len(with_url), batch_size):
            batch = with_url[start:start + batch_size]
            outcomes = await asyncio.gather(
                *[_check_source_reachability(client, src) for src in batch],
                return_exceptions=True,
            )

            for src, outcome in zip(batch, outcomes):
                checked += 1
                if isinstance(outcome, Exception):
                    # The probe itself failed; record that as one error row.
                    await _save_check(
                        db, src["id"], "reachability", "error",
                        f"Prüfung fehlgeschlagen: {outcome}",
                    )
                    problems += 1
                    continue
                for check in outcome:
                    await _save_check(
                        db, src["id"], check["type"], check["status"],
                        check["message"], check.get("details"),
                    )
                    if check["status"] != "ok":
                        problems += 1

    # Pass 2: stale sources; excluded and web_source entries are not evaluated.
    for src in sources:
        if src["source_type"] in ("excluded", "web_source"):
            continue
        stale = _check_stale(src)
        if not stale:
            continue
        await _save_check(
            db, src["id"], stale["type"], stale["status"], stale["message"],
        )
        if stale["status"] != "ok":
            problems += 1

    # Pass 3: duplicate URLs; each hit counts as one warning.
    for dup in _find_duplicates(sources):
        await _save_check(
            db, dup["source_id"], "duplicate", "warning",
            dup["message"], json.dumps(dup.get("details", {})),
        )
        problems += 1

    await db.commit()
    logger.info(
        f"Health-Check abgeschlossen: {checked} Quellen geprüft, "
        f"{problems} Probleme gefunden"
    )
    return {"checked": checked, "issues": problems}
source.get("last_seen_at") + + if article_count == 0: + return { + "type": "stale", + "status": "warning", + "message": "Noch nie Artikel geliefert", + } + + if last_seen: + try: + from datetime import datetime + last_dt = datetime.fromisoformat(last_seen) + now = datetime.now() + age_days = (now - last_dt).days + if age_days > 30: + return { + "type": "stale", + "status": "warning", + "message": f"Letzter Artikel vor {age_days} Tagen", + } + except (ValueError, TypeError): + pass + + return None + + +def _find_duplicates(sources: list[dict]) -> list[dict]: + """Findet doppelte Quellen (gleiche URL).""" + duplicates = [] + url_map = {} + + for s in sources: + if not s["url"]: + continue + url_norm = s["url"].lower().rstrip("/") + if url_norm in url_map: + existing = url_map[url_norm] + duplicates.append({ + "source_id": s["id"], + "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})", + "details": {"duplicate_of": existing["id"], "type": "url"}, + }) + else: + url_map[url_norm] = s + + return duplicates + + +async def _save_check( + db: aiosqlite.Connection, source_id: int, check_type: str, + status: str, message: str, details: str = None, +): + """Speichert ein Health-Check-Ergebnis.""" + await db.execute( + "INSERT INTO source_health_checks " + "(source_id, check_type, status, message, details) " + "VALUES (?, ?, ?, ?, ?)", + (source_id, check_type, status, message, details), + ) + + +async def get_health_summary(db: aiosqlite.Connection) -> dict: + """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" + cursor = await db.execute(""" + SELECT + h.id, h.source_id, s.name, s.domain, s.url, s.source_type, + h.check_type, h.status, h.message, h.details, h.checked_at + FROM source_health_checks h + JOIN sources s ON s.id = h.source_id + ORDER BY + CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, + s.name + """) + checks = [dict(row) for row in await cursor.fetchall()] + + error_count = sum(1 for c in checks 
if c["status"] == "error") + warning_count = sum(1 for c in checks if c["status"] == "warning") + ok_count = sum(1 for c in checks if c["status"] == "ok") + + cursor = await db.execute( + "SELECT MAX(checked_at) as last_check FROM source_health_checks" + ) + row = await cursor.fetchone() + last_check = row["last_check"] if row else None + + return { + "last_check": last_check, + "total_checks": len(checks), + "errors": error_count, + "warnings": warning_count, + "ok": ok_count, + "checks": checks, + } diff --git a/src/shared/services/source_suggester.py b/src/shared/services/source_suggester.py new file mode 100644 index 0000000..dc51ad7 --- /dev/null +++ b/src/shared/services/source_suggester.py @@ -0,0 +1,277 @@ +"""KI-gestützte Quellen-Vorschläge via Haiku.""" +import json +import logging +import re + +import aiosqlite + +from shared.agents.claude_client import call_claude +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.source_suggester") + + +async def generate_suggestions(db: aiosqlite.Connection) -> int: + """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse.""" + logger.info("Starte Quellen-Vorschläge via Haiku...") + + # 1. Aktuelle Quellen laden + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, category, status, " + "article_count, last_seen_at " + "FROM sources WHERE tenant_id IS NULL ORDER BY category, name" + ) + sources = [dict(row) for row in await cursor.fetchall()] + + # 2. Health-Check-Probleme laden + cursor = await db.execute(""" + SELECT h.source_id, s.name, s.domain, s.url, + h.check_type, h.status, h.message + FROM source_health_checks h + JOIN sources s ON s.id = h.source_id + WHERE h.status IN ('error', 'warning') + """) + issues = [dict(row) for row in await cursor.fetchall()] + + # 3. 
Alte pending-Vorschläge entfernen (älter als 30 Tage) + await db.execute( + "DELETE FROM source_suggestions " + "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')" + ) + + # 4. Quellen-Zusammenfassung für Haiku + categories = {} + for s in sources: + cat = s["category"] + if cat not in categories: + categories[cat] = [] + categories[cat].append(s) + + source_summary = "" + for cat, cat_sources in sorted(categories.items()): + active = [ + s for s in cat_sources + if s["status"] == "active" and s["source_type"] != "excluded" + ] + source_summary += f"\n{cat} ({len(active)} aktiv): " + source_summary += ", ".join(s["name"] for s in active[:10]) + if len(active) > 10: + source_summary += f" ... (+{len(active) - 10} weitere)" + + issues_summary = "" + if issues: + issues_summary = "\n\nProbleme gefunden:\n" + for issue in issues[:20]: + issues_summary += ( + f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): " + f"{issue['check_type']} = {issue['status']} - {issue['message']}\n" + ) + + prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden. + +Aktuelle Quellensammlung:{source_summary}{issues_summary} + +Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor. + +Beachte: +1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste +2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor +3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen +4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind +5. Maximal 5 Vorschläge + +Antworte NUR mit einem JSON-Array. 
Jedes Element: +{{ + "type": "add_source|deactivate_source|fix_url|remove_source", + "title": "Kurzer Titel", + "description": "Begründung", + "priority": "low|medium|high", + "source_id": null, + "data": {{ + "name": "Anzeigename", + "url": "https://...", + "domain": "example.de", + "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige" + }} +}} + +Nur das JSON-Array, kein anderer Text.""" + + try: + response, usage = await call_claude( + prompt, tools=None, model=CLAUDE_MODEL_FAST, + ) + + json_match = re.search(r'\[.*\]', response, re.DOTALL) + if not json_match: + logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)") + return 0 + + suggestions = json.loads(json_match.group(0)) + + count = 0 + for suggestion in suggestions[:5]: + stype = suggestion.get("type", "add_source") + title = suggestion.get("title", "") + desc = suggestion.get("description", "") + priority = suggestion.get("priority", "medium") + source_id = suggestion.get("source_id") + data = json.dumps( + suggestion.get("data", {}), ensure_ascii=False, + ) + + # source_id validieren (muss existieren oder None sein) + if source_id is not None: + cursor = await db.execute( + "SELECT id FROM sources WHERE id = ?", (source_id,), + ) + if not await cursor.fetchone(): + source_id = None + + # Duplikat-Check: gleicher Typ + gleiche source_id oder gleiche Domain pending? + if source_id is not None: + cursor = await db.execute( + "SELECT id FROM source_suggestions " + "WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'", + (stype, source_id), + ) + else: + # Bei add_source ohne source_id: Domain aus suggested_data prüfen + check_domain = suggestion.get('data', {}).get('domain', '') + if check_domain: + cursor = await db.execute( + "SELECT id FROM source_suggestions " + "WHERE suggestion_type = ? AND suggested_data LIKE ? 
async def apply_suggestion(
    db: aiosqlite.Connection, suggestion_id: int, accept: bool,
) -> dict:
    """Apply or reject a pending source suggestion.

    Loads the suggestion, carries out the action for its type when ``accept``
    is true (``add_source``, ``deactivate_source``, ``remove_source`` or
    ``fix_url``), stores the final status with a review timestamp and commits.

    A suggestion that was accepted but could not be carried out (no URL, URL
    already present, no source_id) is stored as ``rejected`` for every
    suggestion type, so the review history never contains an "accepted"
    entry that changed nothing.

    Args:
        db: Open database connection. Rows must be convertible with ``dict()``
            (presumably a mapping-style row factory is configured; verify
            against the connection setup).
        suggestion_id: Primary key of the row in ``source_suggestions``.
        accept: True to apply the suggestion, False to reject it.

    Returns:
        dict with ``status`` ("accepted" or "rejected") and ``action`` (a
        human-readable description of what happened, or None).

    Raises:
        ValueError: If the suggestion does not exist, is no longer pending,
            or its stored ``suggested_data`` is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError``).
    """
    cursor = await db.execute(
        "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,),
    )
    row = await cursor.fetchone()
    if not row:
        raise ValueError("Vorschlag nicht gefunden")

    suggestion = dict(row)
    if suggestion["status"] != "pending":
        raise ValueError(f"Vorschlag bereits {suggestion['status']}")

    new_status = "accepted" if accept else "rejected"
    action = None

    if accept:
        stype = suggestion["suggestion_type"]
        raw_data = suggestion["suggested_data"]
        data = json.loads(raw_data) if raw_data else {}
        if not isinstance(data, dict):
            # The payload is expected to be a JSON object; any other JSON
            # value carries no usable fields, so treat it as empty.
            data = {}
        source_id = suggestion["source_id"]

        if stype == "add_source":
            name = data.get("name", "Unbenannt")
            url = data.get("url")
            domain = data.get("domain", "")
            category = data.get("category", "sonstige")
            # Heuristic: a URL mentioning a feed keyword is stored as RSS feed.
            source_type = "rss_feed" if url and any(
                marker in (url or "").lower()
                for marker in ("rss", "feed", "xml", "atom")
            ) else "web_source"

            if not url:
                action = "übersprungen (keine URL)"
                new_status = "rejected"
            else:
                # Only global sources (tenant_id IS NULL) count as duplicates.
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
                    (url,),
                )
                if await cursor.fetchone():
                    action = "übersprungen (URL bereits vorhanden)"
                    new_status = "rejected"
                else:
                    await db.execute(
                        "INSERT INTO sources "
                        "(name, url, domain, source_type, category, status, "
                        "added_by, tenant_id) "
                        "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
                        (name, url, domain, source_type, category),
                    )
                    action = f"Quelle '{name}' angelegt"

        elif stype == "deactivate_source":
            if source_id:
                await db.execute(
                    "UPDATE sources SET status = 'inactive' WHERE id = ?",
                    (source_id,),
                )
                action = "Quelle deaktiviert"
            else:
                action = "übersprungen (keine source_id)"
                new_status = "rejected"

        elif stype == "remove_source":
            if source_id:
                await db.execute(
                    "DELETE FROM sources WHERE id = ?", (source_id,),
                )
                action = "Quelle gelöscht"
            else:
                action = "übersprungen (keine source_id)"
                new_status = "rejected"

        elif stype == "fix_url":
            new_url = data.get("url")
            if source_id and new_url:
                await db.execute(
                    "UPDATE sources SET url = ? WHERE id = ?",
                    (new_url, source_id),
                )
                action = f"URL aktualisiert auf {new_url}"
            else:
                action = "übersprungen (keine source_id oder URL)"
                new_status = "rejected"

    await db.execute(
        "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP "
        "WHERE id = ?",
        (new_status, suggestion_id),
    )
    await db.commit()

    return {"status": new_status, "action": action}
"focus.de": "qualitaetszeitung", + # Behörden + "bmi.bund.de": "behoerde", + "europol.europa.eu": "behoerde", + "bka.de": "behoerde", + "bsi.bund.de": "behoerde", + "verfassungsschutz.de": "behoerde", + "bpb.de": "behoerde", + # Fachmedien + "netzpolitik.org": "fachmedien", + "handelsblatt.com": "fachmedien", + "heise.de": "fachmedien", + "golem.de": "fachmedien", + "t3n.de": "fachmedien", + "wiwo.de": "fachmedien", + # Think Tanks + "swp-berlin.org": "think-tank", + "iiss.org": "think-tank", + "brookings.edu": "think-tank", + "rand.org": "think-tank", + "dgap.org": "think-tank", + "chathamhouse.org": "think-tank", + # International + "bbc.co.uk": "international", + "bbc.com": "international", + "aljazeera.com": "international", + "france24.com": "international", + "cnn.com": "international", + "theguardian.com": "international", + "nytimes.com": "international", + "washingtonpost.com": "international", + "lemonde.fr": "international", + "elpais.com": "international", + # Regional + "berliner-zeitung.de": "regional", + "hamburger-abendblatt.de": "regional", + "stuttgarter-zeitung.de": "regional", + "ksta.de": "regional", + "rp-online.de": "regional", + "merkur.de": "regional", + # Telegram + "t.me": "telegram", +} + +# Bekannte Feed-Pfade zum Durchprobieren +_FEED_PATHS = ["/feed", "/rss", "/rss.xml", "/atom.xml", "/feed.xml", "/index.xml", "/feed/rss", "/feeds/posts/default"] + +# Erweiterte nachrichtenspezifische Feed-Pfade für Multi-Discovery +_NEWS_FEED_PATHS = [ + "/world/rss", "/world/rss.xml", "/world/feed", + "/politics/rss", "/politics/rss.xml", "/politics/feed", + "/business/rss", "/business/rss.xml", "/business/feed", + "/technology/rss", "/technology/rss.xml", "/technology/feed", + "/environment/rss", "/environment/rss.xml", "/environment/feed", + "/science/rss", "/science/rss.xml", "/science/feed", + "/europe/rss", "/europe/rss.xml", "/europe/feed", + "/security/rss", "/security/rss.xml", "/security/feed", + "/international/rss", 
"/international/rss.xml", "/international/feed", + "/economy/rss", "/economy/rss.xml", "/economy/feed", + "/defence/rss", "/defence/rss.xml", "/defence/feed", + "/middle-east/rss", "/middle-east/rss.xml", + "/asia/rss", "/asia/rss.xml", + "/africa/rss", "/africa/rss.xml", + "/americas/rss", "/americas/rss.xml", + "/uk-news/rss", "/us-news/rss", + "/commentisfree/rss", "/opinion/rss", + "/law/rss", "/media/rss", + "/global-development/rss", + "/news/feed", "/news/rss", "/news/rss.xml", + "/politik/rss", "/politik/rss.xml", + "/wirtschaft/rss", "/wirtschaft/rss.xml", + "/panorama/rss", "/panorama/rss.xml", + "/wissen/rss", "/wissen/rss.xml", + "/ausland/rss", "/ausland/rss.xml", + "/inland/rss", "/inland/rss.xml", + "/netzwelt/rss", "/netzwelt/rss.xml", + "/kultur/rss", "/kultur/rss.xml", +] + +# Bekannte Feed-Subdomains für Portale die Feeds auf separater Domain hosten +_DOMAIN_FEED_URLS = { + "bbc.com": [ + "https://feeds.bbci.co.uk/news/rss.xml", + "https://feeds.bbci.co.uk/news/world/rss.xml", + "https://feeds.bbci.co.uk/news/business/rss.xml", + "https://feeds.bbci.co.uk/news/politics/rss.xml", + "https://feeds.bbci.co.uk/news/technology/rss.xml", + "https://feeds.bbci.co.uk/news/science_and_environment/rss.xml", + "https://feeds.bbci.co.uk/news/health/rss.xml", + "https://feeds.bbci.co.uk/news/education/rss.xml", + "https://feeds.bbci.co.uk/news/world/middle_east/rss.xml", + "https://feeds.bbci.co.uk/news/world/europe/rss.xml", + "https://feeds.bbci.co.uk/news/world/africa/rss.xml", + "https://feeds.bbci.co.uk/news/world/asia/rss.xml", + "https://feeds.bbci.co.uk/news/world/us_and_canada/rss.xml", + "https://feeds.bbci.co.uk/news/world/latin_america/rss.xml", + "https://feeds.bbci.co.uk/news/entertainment_and_arts/rss.xml", + ], + "bbc.co.uk": "bbc.com", # Alias + "reuters.com": [ + "https://www.reutersagency.com/feed/", + ], + "aljazeera.com": [ + "https://www.aljazeera.com/xml/rss/all.xml", + ], +} + + +def _get_extra_feed_urls(domain: str) -> list[str]: + 
"""Gibt bekannte Feed-URLs für Domains mit separater Feed-Subdomain zurück.""" + entry = _DOMAIN_FEED_URLS.get(domain) + if isinstance(entry, str): + # Alias — auf andere Domain verweisen + entry = _DOMAIN_FEED_URLS.get(entry) + if isinstance(entry, list): + return entry + return [] + + +def _normalize_url(url: str) -> str: + """URL normalisieren (https:// ergänzen falls fehlend).""" + url = url.strip() + if not url.startswith(("http://", "https://")): + url = "https://" + url + return url + + +# Subdomain → kanonische Domain Zuordnung +_DOMAIN_ALIASES = { + "feeds.bbci.co.uk": "bbc.com", + "rss.sueddeutsche.de": "sueddeutsche.de", + "on.orf.at": "orf.at", + "rss.orf.at": "orf.at", + "rss.dw.com": "dw.com", + "newsfeed.zeit.de": "zeit.de", + "reutersagency.com": "reuters.com", + "edition.cnn.com": "cnn.com", + "rsshub.app": "apnews.com", +} + + +def _extract_domain(url: str) -> str: + """Domain aus URL extrahieren (ohne www., mit Alias-Normalisierung).""" + parsed = urlparse(url) + domain = parsed.hostname or "" + if domain.startswith("www."): + domain = domain[4:] + return _DOMAIN_ALIASES.get(domain, domain) + + +def _detect_category(domain: str) -> str: + """Kategorie anhand der Domain erkennen.""" + if domain in DOMAIN_CATEGORY_MAP: + return DOMAIN_CATEGORY_MAP[domain] + # Subdomain-Match: z.B. 
feeds.reuters.com -> reuters.com + parts = domain.split(".") + if len(parts) > 2: + parent = ".".join(parts[-2:]) + if parent in DOMAIN_CATEGORY_MAP: + return DOMAIN_CATEGORY_MAP[parent] + return "sonstige" + + +# Bekannte Domain → Anzeigename Zuordnungen +DOMAIN_DISPLAY_NAMES = { + "tagesschau.de": "tagesschau", + "zdf.de": "ZDF heute", + "spiegel.de": "Spiegel", + "zeit.de": "Zeit", + "newsfeed.zeit.de": "Zeit", + "faz.net": "FAZ", + "sueddeutsche.de": "Süddeutsche Zeitung", + "rss.sueddeutsche.de": "Süddeutsche Zeitung", + "nzz.ch": "NZZ", + "dw.com": "Deutsche Welle", + "rss.dw.com": "Deutsche Welle", + "reuters.com": "Reuters", + "reutersagency.com": "Reuters", + "rsshub.app": "RSSHub", + "apnews.com": "AP News", + "bbc.com": "BBC", + "bbc.co.uk": "BBC", + "feeds.bbci.co.uk": "BBC", + "aljazeera.com": "Al Jazeera", + "france24.com": "France24", + "theguardian.com": "The Guardian", + "nytimes.com": "New York Times", + "washingtonpost.com": "Washington Post", + "cnn.com": "CNN", + "bmi.bund.de": "BMI", + "europol.europa.eu": "Europol", + "handelsblatt.com": "Handelsblatt", + "wiwo.de": "WirtschaftsWoche", + "heise.de": "Heise Online", + "golem.de": "Golem", + "netzpolitik.org": "netzpolitik.org", + "t3n.de": "t3n", + "welt.de": "Welt", + "tagesspiegel.de": "Tagesspiegel", + "stern.de": "Stern", + "focus.de": "Focus", + "n-tv.de": "n-tv", + "bild.de": "BILD", + "tarnkappe.info": "Tarnkappe", + "bleepingcomputer.com": "BleepingComputer", + "techcrunch.com": "TechCrunch", + "theverge.com": "The Verge", + "wired.com": "WIRED", + "tomshardware.com": "Tom's Hardware", + "finanzen.net": "Finanzen.net", + "404media.co": "404 Media", + "medium.com": "Medium", + "swp-berlin.org": "SWP Berlin", + "dgap.org": "DGAP", + "brookings.edu": "Brookings", + "rand.org": "RAND", + "lemonde.fr": "Le Monde", + "elpais.com": "El País", + "orf.at": "ORF", + "srf.ch": "SRF", + "br.de": "BR", + "ndr.de": "NDR", + "wdr.de": "WDR", + "mdr.de": "MDR", + "swr.de": "SWR", + "hr.de": "hr", + 
"rbb24.de": "rbb24", + "fr.de": "Frankfurter Rundschau", + "rp-online.de": "Rheinische Post", + "ksta.de": "Kölner Stadt-Anzeiger", + "berliner-zeitung.de": "Berliner Zeitung", + "stuttgarter-zeitung.de": "Stuttgarter Zeitung", + "hamburger-abendblatt.de": "Hamburger Abendblatt", + "merkur.de": "Münchner Merkur", + "bsi.bund.de": "BSI", + "bpb.de": "bpb", + "bka.de": "BKA", + "verfassungsschutz.de": "Verfassungsschutz", + "bashinho.de": "Bashinho", +} + + +def domain_to_display_name(domain: str) -> str: + """Wandelt eine Domain in einen lesbaren Anzeigenamen um. + + Prüft erst die bekannte Zuordnung, dann leitet einen sinnvollen + Namen aus der Domain ab (erster Teil, kapitalisiert). + """ + if domain in DOMAIN_DISPLAY_NAMES: + return DOMAIN_DISPLAY_NAMES[domain] + # Subdomain-Match: feeds.reuters.com -> reuters.com + parts = domain.split(".") + if len(parts) > 2: + parent = ".".join(parts[-2:]) + if parent in DOMAIN_DISPLAY_NAMES: + return DOMAIN_DISPLAY_NAMES[parent] + # Fallback: Domain-Kern extrahieren und kapitalisieren + # z.B. "example-news.de" → "Example News" + core = parts[-2] if len(parts) >= 2 else parts[0] + return core.replace("-", " ").title() + + +def _compute_content_hash(entries: list) -> str: + """Berechnet einen Fingerprint aus den ersten 5 Entry-Titeln eines Feeds.""" + titles = [e.get("title", "") for e in entries[:5]] + combined = "|".join(titles).strip() + if not combined: + return "" + return hashlib.sha256(combined.encode("utf-8")).hexdigest()[:16] + + +async def _validate_feed(client: httpx.AsyncClient, url: str) -> dict | None: + """Prüft ob eine URL ein gültiger RSS/Atom-Feed ist. Gibt Feed-Info zurück oder None.""" + try: + resp = await client.get(url) + if resp.status_code != 200: + return None + content_type = resp.headers.get("content-type", "") + text = resp.text[:10000] # Nur Anfang prüfen + # Muss XML-artig sein + if " dict: + """Erkennt RSS-Feed, Name, Domain und Kategorie einer URL automatisch. 
+ + Returns: + dict mit: name, domain, rss_url, category, source_type + """ + url = _normalize_url(url) + domain = _extract_domain(url) + category = _detect_category(domain) + + result = { + "name": domain_to_display_name(domain), + "domain": domain, + "rss_url": None, + "category": category, + "source_type": "web_source", + } + + async with httpx.AsyncClient( + timeout=12.0, + follow_redirects=True, + headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"}, + ) as client: + # 1. Seite abrufen und nach RSS-Links suchen + page_title = None + try: + resp = await client.get(url) + if resp.status_code == 200: + html = resp.text[:50000] + # extrahieren + title_match = re.search(r"<title[^>]*>([^<]+)", html, re.IGNORECASE) + if title_match: + page_title = title_match.group(1).strip() + + # RSS/Atom Link-Tags suchen + feed_links = re.findall( + r']+type=["\']application/(rss|atom)\+xml["\'][^>]*>', + html, + re.IGNORECASE, + ) + # Auch umgekehrte Attribut-Reihenfolge + feed_links += re.findall( + r']+href=["\']([^"\']+)["\'][^>]+type=["\']application/(rss|atom)\+xml["\'][^>]*>', + html, + re.IGNORECASE, + ) + # href aus den gefundenen Tags extrahieren + feed_urls = [] + for tag in re.finditer( + r']+type=["\']application/(?:rss|atom)\+xml["\'][^>]*>', + html, + re.IGNORECASE, + ): + href_match = re.search(r'href=["\']([^"\']+)["\']', tag.group(0)) + if href_match: + href = href_match.group(1) + # Relative URLs auflösen + if href.startswith("/"): + parsed = urlparse(url) + href = f"{parsed.scheme}://{parsed.netloc}{href}" + elif not href.startswith("http"): + href = url.rstrip("/") + "/" + href + feed_urls.append(href) + + # Gefundene Feed-URLs validieren + for feed_url in feed_urls: + feed_info = await _validate_feed(client, feed_url) + if feed_info: + result["rss_url"] = feed_info["url"] + result["source_type"] = "rss_feed" + if feed_info["title"]: + result["name"] = feed_info["title"] + elif page_title: + result["name"] = page_title + return result + 
except Exception as e: + logger.debug(f"Fehler beim Abrufen von {url}: {e}") + + # 2. Bekannte Feed-Pfade durchprobieren + parsed = urlparse(url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + for path in _FEED_PATHS: + feed_url = base_url + path + feed_info = await _validate_feed(client, feed_url) + if feed_info: + result["rss_url"] = feed_info["url"] + result["source_type"] = "rss_feed" + if feed_info["title"]: + result["name"] = feed_info["title"] + elif page_title: + result["name"] = page_title + return result + + # Kein Feed gefunden — Name aus Seitentitel + if page_title: + result["name"] = page_title + + return result + + +async def discover_all_feeds(url: str) -> dict: + """Findet ALLE RSS/Atom-Feeds einer Domain. + + Returns: + dict mit: domain, category, page_title, feeds: [{"url", "title"}, ...] + """ + url = _normalize_url(url) + domain = _extract_domain(url) + category = _detect_category(domain) + + result = { + "domain": domain, + "category": category, + "page_title": None, + "feeds": [], + } + + seen_urls = set() + seen_content_hashes = set() + + async with httpx.AsyncClient( + timeout=15.0, + follow_redirects=True, + headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"}, + ) as client: + # 1. 
HTML-Seite abrufen und ALLE RSS-Link-Tags sammeln + candidate_urls = [] + try: + resp = await client.get(url) + if resp.status_code == 200: + html = resp.text[:100000] + title_match = re.search(r"]*>([^<]+)", html, re.IGNORECASE) + if title_match: + result["page_title"] = title_match.group(1).strip() + + parsed = urlparse(url) + base = f"{parsed.scheme}://{parsed.netloc}" + + for tag in re.finditer( + r']+type=["\']application/(?:rss|atom)\+xml["\'][^>]*>', + html, + re.IGNORECASE, + ): + href_match = re.search(r'href=["\']([^"\']+)["\']', tag.group(0)) + if href_match: + href = href_match.group(1) + if href.startswith("/"): + href = base + href + elif not href.startswith("http"): + href = url.rstrip("/") + "/" + href + candidate_urls.append(href) + except Exception as e: + logger.debug(f"Fehler beim Abrufen von {url}: {e}") + + # 2. Bekannte Feed-Pfade hinzufügen (Standard + Nachrichten-spezifisch) + parsed = urlparse(url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + for path in _FEED_PATHS + _NEWS_FEED_PATHS: + candidate_urls.append(base_url + path) + + # 2b. Bekannte Feed-URLs für Domains mit separater Feed-Subdomain (z.B. BBC) + extra_urls = _get_extra_feed_urls(domain) + candidate_urls.extend(extra_urls) + + # 3. Alle Kandidaten parallel validieren (in Batches von 10) + async def _validate_and_collect(feed_url: str): + try: + return await _validate_feed(client, feed_url) + except Exception: + return None + + for i in range(0, len(candidate_urls), 10): + batch = candidate_urls[i:i + 10] + results = await asyncio.gather(*[_validate_and_collect(u) for u in batch]) + for feed_info in results: + if not feed_info: + continue + if feed_info["url"] in seen_urls: + continue + # Content-Hash Duplikat-Erkennung (gleicher Inhalt = WordPress-Redirect etc.) 
async def evaluate_feeds_with_claude(domain: str, feeds: list[dict]) -> list[dict]:
    """Ask Claude to rate the OSINT relevance of a domain's feeds.

    Runs the Claude CLI (``CLAUDE_PATH``) as a subprocess with a prompt that
    lists the feeds, and expects a JSON array of
    ``{"index", "relevant", "name"}`` objects in the output. If the call times
    out, exits non-zero, returns no JSON array or fails in any other way, all
    feeds are kept via ``_fallback_all_feeds``.

    Args:
        domain: Domain name the feeds belong to.
        feeds: List of dicts with at least the keys ``"url"`` and ``"title"``.

    Returns:
        List of ``{"url", "title", "name"}`` dicts containing only the feeds
        rated relevant (each feed at most once), or every feed on fallback.
        An empty ``feeds`` list yields ``[]`` without calling Claude.
    """
    if not feeds:
        return []

    feed_list = "\n".join(
        f"  {i+1}. {f['title'] or f['url']} — {f['url']}"
        for i, f in enumerate(feeds)
    )

    prompt = f"""Du bist ein OSINT-Analyst. Bewerte diese RSS-Feeds der Domain "{domain}" nach OSINT-Relevanz.

OSINT-relevante Themen: Politik, Sicherheit, Wirtschaft, Internationale Beziehungen, Verteidigung, Konflikte, Terrorismus, Cybersecurity, Umweltkatastrophen, Technologie, Wissenschaft, Nachrichten allgemein.

NICHT relevant: Sport, Lifestyle, Rezepte, Unterhaltung, Reisen, Mode, Kultur/Kunst, Wetter, Kreuzworträtsel, Podcasts (allgemein), Leserbriefe, Kommentare/Meinung.

Feeds:
{feed_list}

Antworte AUSSCHLIESSLICH mit einem JSON-Array. Jedes Element:
{{"index": <1-basiert>, "relevant": true/false, "name": "<kurzer Anzeigename>"}}

Nur das JSON-Array, kein anderer Text."""

    try:
        process = await asyncio.create_subprocess_exec(
            CLAUDE_PATH,
            "-p", prompt,
            "--output-format", "text",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env={"PATH": "/usr/local/bin:/usr/bin:/bin", "HOME": "/home/claude-dev"},
        )

        try:
            stdout, _stderr = await asyncio.wait_for(
                process.communicate(), timeout=min(CLAUDE_TIMEOUT, 120)
            )
        except asyncio.TimeoutError:
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the timeout and the kill.
                pass
            # Reap the child so it does not linger as a zombie and its
            # transport is released.
            await process.wait()
            logger.warning(f"Claude-Bewertung Timeout für {domain}, nutze Fallback")
            return _fallback_all_feeds(domain, feeds)

        if process.returncode != 0:
            logger.warning(f"Claude-Bewertung fehlgeschlagen für {domain}, nutze Fallback")
            return _fallback_all_feeds(domain, feeds)

        response = stdout.decode("utf-8", errors="replace").strip()

        # Extract the JSON array from the answer (Claude sometimes wraps it
        # in Markdown code fences).
        json_match = re.search(r'\[.*\]', response, re.DOTALL)
        if not json_match:
            logger.warning(f"Kein JSON in Claude-Antwort für {domain}, nutze Fallback")
            return _fallback_all_feeds(domain, feeds)

        evaluations = json.loads(json_match.group(0))

        relevant = []
        used_indexes = set()
        for ev in evaluations:
            idx = ev.get("index", 0) - 1  # the model answers 1-based
            if not ev.get("relevant") or not 0 <= idx < len(feeds):
                continue
            if idx in used_indexes:
                # The model listed the same feed twice; keep the first rating.
                continue
            used_indexes.add(idx)
            feed = feeds[idx]
            relevant.append({
                "url": feed["url"],
                "title": feed["title"],
                # An empty or missing name falls back to the feed title/domain.
                "name": ev.get("name") or feed["title"] or domain,
            })

        logger.info(f"Claude-Bewertung für {domain}: {len(relevant)}/{len(feeds)} relevant")
        return relevant

    except json.JSONDecodeError:
        logger.warning(f"JSON-Parse-Fehler bei Claude-Antwort für {domain}, nutze Fallback")
        return _fallback_all_feeds(domain, feeds)
    except Exception as e:
        # Best effort: any other failure (including malformed array elements)
        # keeps all feeds rather than dropping the domain.
        logger.warning(f"Claude-Bewertung Fehler für {domain}: {e}, nutze Fallback")
        return _fallback_all_feeds(domain, feeds)
async def get_source_rules(tenant_id: int = None) -> dict:
    """Load the active source configuration from the database.

    With a truthy ``tenant_id`` the query covers global sources
    (``tenant_id IS NULL``) plus that tenant's own sources; otherwise every
    active source is read without a tenant filter.

    Returns:
        dict with:
        - excluded_domains: list of excluded domains (the source name is used
          when a source has no domain)
        - rss_feeds: dict with the buckets deutsch/international/behoerden,
          each a list of ``{"name", "url"}`` entries

    If reading from the database fails, the error is logged and the static
    ``RSS_FEEDS`` / ``EXCLUDED_SOURCES`` from ``config`` are returned instead.
    """
    from database import get_db

    # Categories with a dedicated bucket; every other category is filed
    # under "deutsch".
    bucket_by_category = {"behoerde": "behoerden", "international": "international"}

    connection = await get_db()
    try:
        if tenant_id:
            cursor = await connection.execute(
                "SELECT * FROM sources WHERE status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)",
                (tenant_id,),
            )
        else:
            cursor = await connection.execute(
                "SELECT * FROM sources WHERE status = 'active'"
            )
        records = [dict(row) for row in await cursor.fetchall()]

        excluded = []
        feeds_by_bucket = {"deutsch": [], "international": [], "behoerden": []}

        for record in records:
            kind = record["source_type"]
            if kind == "excluded":
                excluded.append(record["domain"] or record["name"])
                continue
            if kind != "rss_feed" or not record["url"]:
                # Only RSS feeds that have a URL are handed out.
                continue
            bucket = bucket_by_category.get(record["category"], "deutsch")
            feeds_by_bucket[bucket].append(
                {"name": record["name"], "url": record["url"]}
            )

        return {
            "excluded_domains": excluded,
            "rss_feeds": feeds_by_bucket,
        }
    except Exception as e:
        logger.error(f"Fehler beim Laden der Quellen-Regeln: {e}")
        # Fall back to the static configuration in config.py.
        from config import RSS_FEEDS, EXCLUDED_SOURCES
        return {
            "excluded_domains": list(EXCLUDED_SOURCES),
            "rss_feeds": dict(RSS_FEEDS),
        }
    finally:
        await connection.close()