Phase 1 Backend-Hygiene Quellen

- src/shared/ neu: source_rules, services/source_health, services/source_suggester,
  agents/claude_client als lokale Kopien aus dem Monitor-Repo (statt sys.path-Hack
  auf /home/claude-dev/AegisSight-Monitor/src - 5 sys.path.insert-Aufrufe entfernt)
- src/routers/sources.py: Imports auf shared. umgestellt, Header neu sortiert
  (Docstring zuerst, sys/os raus), Mojibake (Triple-Encoded UTF-8) via ftfy gefixt
- src/shared/services/source_suggester.py: Mojibake (Double-Encoded UTF-8) via ftfy gefixt
- migrations/2026-05-09c_source_health_schema.py NEU: source_health_checks +
  source_suggestions Tabellen mit Indizes (idempotent), gezogen aus 3 Inline-DDL-Blöcken
  in routers/sources.py (/health/run, /health/run-stream, /health/search-fix)
- src/config.py: CLAUDE_MODEL_MEDIUM und CLAUDE_MODEL_STANDARD ergänzt
  (vorher nur CLAUDE_MODEL_FAST - claude_client.py braucht alle drei)
- requirements.txt: httpx + feedparser explizit (im venv schon vorhanden, jetzt dokumentiert)
Dieser Commit ist enthalten in:
claude-dev
2026-05-09 02:47:13 +00:00
Ursprung 7c741062a9
Commit 650f8b0342
11 geänderte Dateien mit 1608 neuen und 97 gelöschten Zeilen

Datei anzeigen

@@ -46,3 +46,5 @@ CLAUDE_PATH = os.environ.get("CLAUDE_PATH", "/home/claude-dev/.claude/local/clau
CLAUDE_TIMEOUT = 300
MAX_FEEDS_PER_DOMAIN = 3
CLAUDE_MODEL_FAST = "claude-haiku-4-5-20251001"
CLAUDE_MODEL_MEDIUM = "claude-sonnet-4-6"
CLAUDE_MODEL_STANDARD = "claude-opus-4-7"

Datei anzeigen

@@ -1,23 +1,16 @@
import os
"""Grundquellen-Verwaltung und Kundenquellen-ÃÆÃ…“bersicht."""
import sys
"""Grundquellen-Verwaltung und Kundenquellen-Übersicht."""
import logging
# Monitor-Source-Rules verfÃÆÃ¼gbar machen
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional
import aiosqlite
from auth import get_current_admin
from database import db_dependency
from audit import log_action, get_client_ip, row_to_dict
import aiosqlite
sys.path.insert(0, os.path.join('/home/claude-dev/AegisSight-Monitor/src'))
from source_rules import (
from shared.source_rules import (
discover_source,
discover_all_feeds,
evaluate_feeds_with_claude,
@@ -30,6 +23,8 @@ logger = logging.getLogger("verwaltung.sources")
router = APIRouter(prefix="/api/sources", tags=["sources"])
SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes"}
class GlobalSourceCreate(BaseModel):
name: str = Field(min_length=1, max_length=200)
@@ -334,7 +329,7 @@ async def add_discovered_sources(
existing_urls.add(feed["url"])
added += 1
# Web-Source für die Domain anlegen wenn noch nicht vorhanden
# Web-Source für die Domain anlegen wenn noch nicht vorhanden
if feeds and feeds[0].get("domain"):
domain = feeds[0]["domain"]
cursor = await db.execute(
@@ -362,7 +357,7 @@ async def add_discovered_sources(
# --- Health-Check & Vorschläge ---
# --- Health-Check & Vorschläge ---
@router.get("/health")
async def get_health(
@@ -370,7 +365,7 @@ async def get_health(
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Health-Check-Ergebnisse abrufen."""
# Prüfen ob Tabelle existiert
# Prüfen ob Tabelle existiert
cursor = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'"
)
@@ -412,7 +407,7 @@ async def get_suggestions(
admin: dict = Depends(get_current_admin),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete)."""
"""Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete)."""
cursor = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'"
)
@@ -476,7 +471,7 @@ async def update_suggestion(
"SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,)
)
if await cursor.fetchone():
result_action = "übersprungen (URL bereits vorhanden)"
result_action = "übersprungen (URL bereits vorhanden)"
new_status = "rejected"
else:
await db.execute(
@@ -486,7 +481,7 @@ async def update_suggestion(
)
result_action = f"Quelle '{name}' angelegt"
else:
result_action = "übersprungen (keine URL)"
result_action = "übersprungen (keine URL)"
new_status = "rejected"
elif stype == "deactivate_source":
@@ -499,7 +494,7 @@ async def update_suggestion(
source_id = suggestion["source_id"]
if source_id:
await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
result_action = "Quelle gelöscht"
result_action = "Quelle gelöscht"
elif stype == "fix_url":
source_id = suggestion["source_id"]
@@ -509,7 +504,7 @@ async def update_suggestion(
result_action = f"URL aktualisiert"
# Auto-Reject: Wenn fix_url oder add_source akzeptiert wird,
# zugehörige deactivate_source-Vorschläge automatisch ablehnen
# zugehörige deactivate_source-Vorschläge automatisch ablehnen
if stype in ("fix_url", "add_source") and suggestion.get("source_id"):
await db.execute(
"UPDATE source_suggestions SET status = 'rejected', reviewed_at = CURRENT_TIMESTAMP "
@@ -539,36 +534,9 @@ async def run_health_check_now(
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Health-Check manuell starten."""
# Tabellen sicherstellen
await db.executescript("""
CREATE TABLE IF NOT EXISTS source_health_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
check_type TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
details TEXT,
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS source_suggestions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
suggestion_type TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT,
source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
suggested_data TEXT,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
reviewed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
await db.commit()
# source_health und source_suggester importieren
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from services.source_health import run_health_checks
from services.source_suggester import generate_suggestions
from shared.services.source_health import run_health_checks
from shared.services.source_suggester import generate_suggestions
result = await run_health_checks(db)
suggestion_count = await generate_suggestions(db)
@@ -593,26 +561,6 @@ async def run_health_check_stream(
import asyncio
from urllib.parse import urlparse
# Tabellen sicherstellen
await db.executescript("""
CREATE TABLE IF NOT EXISTS source_health_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
check_type TEXT NOT NULL, status TEXT NOT NULL,
message TEXT, details TEXT,
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS source_suggestions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
suggestion_type TEXT NOT NULL, title TEXT NOT NULL,
description TEXT, source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
suggested_data TEXT, priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending', reviewed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
await db.commit()
# Quellen laden
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, article_count, last_seen_at "
@@ -733,8 +681,7 @@ async def run_health_check_stream(
# Phase 2: Vorschlaege
yield f"data: {_json.dumps({'phase': 'suggestions', 'checked': checked, 'total': total})}\n\n"
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from services.source_suggester import generate_suggestions
from shared.services.source_suggester import generate_suggestions
suggestion_count = await generate_suggestions(db)
# Fertig
@@ -749,7 +696,7 @@ async def search_fix_for_source(
admin: dict = Depends(get_current_admin),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Sonnet mit WebSearch nach L̮̦sung fÃÆÃ¼r eine kaputte Quelle suchen lassen."""
"""Sonnet mit WebSearch nach Lösung für eine kaputte Quelle suchen lassen."""
import json as _json
cursor = await db.execute(
@@ -762,7 +709,7 @@ async def search_fix_for_source(
source = dict(source)
# Health-Check-Probleme für diese Quelle laden
# Health-Check-Probleme für diese Quelle laden
cursor = await db.execute(
"SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?",
(source_id,),
@@ -781,14 +728,14 @@ Kategorie: {source['category']}
Probleme:
{issues_text}
Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle.
- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren
- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs)
- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar?
Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle.
- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren
- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs)
- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar?
Regeln:
- Maximal 3 Lösungen vorschlagen (die besten)
- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss)
- Maximal 3 Lösungen vorschlagen (die besten)
- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss)
Antworte NUR mit einem JSON-Objekt:
{{
@@ -798,16 +745,15 @@ Antworte NUR mit einem JSON-Objekt:
"type": "replace_url|add_feed|deactivate",
"name": "Anzeigename",
"url": "https://...",
"description": "Kurze Begründung"
"description": "Kurze Begründung"
}}
],
"summary": "Zusammenfassung in 1-2 Sätzen"
"summary": "Zusammenfassung in 1-2 Sätzen"
}}
Nur das JSON, kein anderer Text."""
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from agents.claude_client import call_claude
from shared.agents.claude_client import call_claude
try:
response, usage = await call_claude(prompt, tools="WebSearch,WebFetch")
@@ -819,21 +765,7 @@ Nur das JSON, kein anderer Text."""
else:
result = {"fixable": False, "solutions": [], "summary": response[:500]}
# Lösungen als Vorschläge speichern
await db.executescript("""
CREATE TABLE IF NOT EXISTS source_suggestions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
suggestion_type TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT,
source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
suggested_data TEXT,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
reviewed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
# Lösungen als Vorschläge speichern
for sol in result.get("solutions", []):
sol_type = sol.get("type", "add_feed")

0
src/shared/__init__.py Normale Datei
Datei anzeigen

Datei anzeigen

Datei anzeigen

@@ -0,0 +1,209 @@
"""Shared Claude CLI Client mit Usage-Tracking."""
import asyncio
import contextvars
import json
import logging
from dataclasses import dataclass
from config import CLAUDE_PATH, CLAUDE_TIMEOUT, CLAUDE_MODEL_FAST, CLAUDE_MODEL_STANDARD
# ContextVar fuer Cancel-Event: Wird vom Orchestrator gesetzt,
# call_claude prueft automatisch darauf -- kein Durchreichen noetig.
_cancel_event_var: contextvars.ContextVar[asyncio.Event | None] = contextvars.ContextVar("_cancel_event_var", default=None)
logger = logging.getLogger("osint.claude_client")
class ClaudeCliError(RuntimeError):
"""Strukturierter Fehler aus dem Claude CLI mit Kategorie.
error_type:
- "rate_limit": Anthropic Rate-Limit oder Overload (transient, retry-tauglich)
- "auth_error": Account-Problem (Organisation hat keinen Claude-Zugang,
Token abgelaufen/ungueltig) - kein Retry sinnvoll, Admin-Aktion noetig
- "timeout": Claude CLI Timeout (transient)
- "cli_error": Sonstiger CLI-Fehler (unspezifisch, Default)
"""
def __init__(self, error_type: str, message: str):
self.error_type = error_type
self.message = message
super().__init__(f"Claude CLI [{error_type}]: {message}")
def _classify_cli_error(combined_output: str) -> str:
"""Ordnet einer Fehler-Ausgabe eine error_type-Kategorie zu."""
txt = combined_output.lower()
rate_limit_keywords = ["hit your limit", "rate limit", "resets", "rate_limit", "overloaded"]
auth_error_keywords = ["does not have access", "login again", "contact your administrator"]
if any(kw in txt for kw in rate_limit_keywords):
return "rate_limit"
if any(kw in txt for kw in auth_error_keywords):
return "auth_error"
return "cli_error"
@dataclass
class ClaudeUsage:
"""Token-Verbrauch eines einzelnen Claude CLI Aufrufs."""
input_tokens: int = 0
output_tokens: int = 0
cache_creation_tokens: int = 0
cache_read_tokens: int = 0
cost_usd: float = 0.0
duration_ms: int = 0
@dataclass
class UsageAccumulator:
"""Akkumuliert Usage über mehrere Claude-Aufrufe eines Refreshs."""
input_tokens: int = 0
output_tokens: int = 0
cache_creation_tokens: int = 0
cache_read_tokens: int = 0
total_cost_usd: float = 0.0
call_count: int = 0
def add(self, usage: ClaudeUsage):
self.input_tokens += usage.input_tokens
self.output_tokens += usage.output_tokens
self.cache_creation_tokens += usage.cache_creation_tokens
self.cache_read_tokens += usage.cache_read_tokens
self.total_cost_usd += usage.cost_usd
self.call_count += 1
def _sanitize_mdash(text: str) -> str:
"""Ersetzt Gedankenstriche durch Bindestriche (KI-Indikator reduzieren)."""
return text.replace("\u2014", " - ").replace("\u2013", " - ")
async def call_claude(prompt: str, tools: str | None = "WebSearch,WebFetch", model: str | None = None, raw_text: bool = False, timeout: float | None = None) -> tuple[str, ClaudeUsage]:
"""Ruft Claude CLI auf. Gibt (result_text, usage) zurück.
Prompt wird via stdin uebergeben um OS ARG_MAX Limits zu vermeiden.
Args:
prompt: Der Prompt fuer Claude
tools: Kommagetrennte erlaubte Tools (None = keine Tools, --max-turns 1)
model: Optionales Modell (z.B. CLAUDE_MODEL_FAST fuer Haiku). None = CLAUDE_MODEL_STANDARD (Opus 4.7).
timeout: Override in Sekunden. None = Fallback auf globalen CLAUDE_TIMEOUT (1800s).
"""
effective_model = model or CLAUDE_MODEL_STANDARD
effective_timeout = timeout if timeout is not None else CLAUDE_TIMEOUT
cmd = [CLAUDE_PATH, "-p", "-", "--output-format", "json", "--model", effective_model]
if tools:
cmd.extend(["--allowedTools", tools])
else:
cmd.extend(["--max-turns", "1", "--allowedTools", ""])
if not raw_text:
cmd.extend(["--append-system-prompt",
"CRITICAL: You are a JSON-only output agent. "
"Output EXCLUSIVELY a single valid JSON object. "
"No explanatory text, no markdown fences, no continuation of previous responses. "
"Start your response with { and end with }."])
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
env={
"PATH": "/usr/local/bin:/usr/bin:/bin",
"HOME": "/home/claude-dev",
"LANG": "C.UTF-8",
"LC_ALL": "C.UTF-8",
},
)
try:
cancel_event = _cancel_event_var.get(None)
if cancel_event:
# Cancel-aware: Monitor cancel_event while process runs
communicate_task = asyncio.create_task(
process.communicate(input=prompt.encode("utf-8"))
)
cancel_wait_task = asyncio.create_task(cancel_event.wait())
timeout_task = asyncio.create_task(asyncio.sleep(effective_timeout))
done, pending = await asyncio.wait(
[communicate_task, cancel_wait_task, timeout_task],
return_when=asyncio.FIRST_COMPLETED,
)
for p in pending:
p.cancel()
if communicate_task in done:
stdout, stderr = communicate_task.result()
elif cancel_wait_task in done:
process.kill()
await process.wait()
raise asyncio.CancelledError("Cancel angefordert")
else:
process.kill()
await process.wait()
raise TimeoutError(f"Claude CLI Timeout nach {effective_timeout}s")
else:
stdout, stderr = await asyncio.wait_for(
process.communicate(input=prompt.encode("utf-8")), timeout=effective_timeout
)
except asyncio.TimeoutError:
process.kill()
raise TimeoutError(f"Claude CLI Timeout nach {effective_timeout}s")
if process.returncode != 0:
error_msg = stderr.decode("utf-8", errors="replace").strip()
stdout_msg = stdout.decode("utf-8", errors="replace").strip()
# Rate-Limit/Auth-Fehler kommen teils als JSON auf stdout, nicht auf stderr
combined_output = f"{error_msg} {stdout_msg}"
error_type = _classify_cli_error(combined_output)
if error_type == "rate_limit":
logger.warning(f"Claude CLI Rate-Limit (Exit {process.returncode}): {stdout_msg or error_msg}")
elif error_type == "auth_error":
logger.error(f"Claude CLI Auth-Fehler (Exit {process.returncode}): {stdout_msg or error_msg}")
else:
logger.error(f"Claude CLI Fehler (Exit {process.returncode}): {error_msg}")
if stdout_msg:
logger.error(f"Claude CLI stdout bei Fehler: {stdout_msg[:500]}")
raise ClaudeCliError(error_type, stdout_msg or error_msg)
raw = stdout.decode("utf-8", errors="replace").strip()
usage = ClaudeUsage()
result_text = raw
try:
data = json.loads(raw)
# CLI kann returncode=0 liefern und trotzdem is_error=true setzen
# (z.B. "Your organization does not have access to Claude")
if data.get("is_error"):
error_text = str(data.get("result", ""))
error_type = _classify_cli_error(error_text)
if error_type == "rate_limit":
logger.warning(f"Claude CLI Rate-Limit (is_error): {error_text}")
elif error_type == "auth_error":
logger.error(f"Claude CLI Auth-Fehler (is_error): {error_text}")
else:
logger.error(f"Claude CLI Fehler (is_error): {error_text}")
raise ClaudeCliError(error_type, error_text)
result_text = data.get("result", raw)
u = data.get("usage", {})
usage = ClaudeUsage(
input_tokens=u.get("input_tokens", 0),
output_tokens=u.get("output_tokens", 0),
cache_creation_tokens=u.get("cache_creation_input_tokens", 0),
cache_read_tokens=u.get("cache_read_input_tokens", 0),
cost_usd=data.get("total_cost_usd", 0.0),
duration_ms=data.get("duration_ms", 0),
)
model_info = f" [{model}]" if model else ""
logger.info(
f"Claude{model_info}: {usage.input_tokens} in / {usage.output_tokens} out / "
f"cache {usage.cache_creation_tokens}+{usage.cache_read_tokens} / "
f"${usage.cost_usd:.4f} / {usage.duration_ms}ms"
)
except json.JSONDecodeError:
logger.warning("Claude CLI Antwort kein gültiges JSON, nutze raw output")
result_text = _sanitize_mdash(result_text)
return result_text, usage

Datei anzeigen

Datei anzeigen

@@ -0,0 +1,282 @@
"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate."""
import asyncio
import logging
import json
from urllib.parse import urlparse
import httpx
import feedparser
import aiosqlite
logger = logging.getLogger("osint.source_health")
async def run_health_checks(db: aiosqlite.Connection) -> dict:
"""Führt alle Health-Checks für aktive Grundquellen durch."""
logger.info("Starte Quellen-Health-Check...")
# Alle aktiven Grundquellen laden
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, article_count, last_seen_at "
"FROM sources WHERE status = 'active' AND tenant_id IS NULL"
)
sources = [dict(row) for row in await cursor.fetchall()]
# Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben)
await db.execute("DELETE FROM source_health_checks")
await db.commit()
checks_done = 0
issues_found = 0
# 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL)
sources_with_url = [s for s in sources if s["url"]]
async with httpx.AsyncClient(
timeout=15.0,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
for i in range(0, len(sources_with_url), 5):
batch = sources_with_url[i:i + 5]
tasks = [_check_source_reachability(client, s) for s in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for source, result in zip(batch, results):
if isinstance(result, Exception):
await _save_check(
db, source["id"], "reachability", "error",
f"Prüfung fehlgeschlagen: {result}",
)
issues_found += 1
else:
for check in result:
await _save_check(
db, source["id"], check["type"], check["status"],
check["message"], check.get("details"),
)
if check["status"] != "ok":
issues_found += 1
checks_done += 1
# 2. Veraltete Quellen (kein Artikel seit >30 Tagen)
for source in sources:
if source["source_type"] in ("excluded", "web_source"):
continue
stale_check = _check_stale(source)
if stale_check:
await _save_check(
db, source["id"], stale_check["type"],
stale_check["status"], stale_check["message"],
)
if stale_check["status"] != "ok":
issues_found += 1
# 3. Duplikate erkennen
duplicates = _find_duplicates(sources)
for dup in duplicates:
await _save_check(
db, dup["source_id"], "duplicate", "warning",
dup["message"], json.dumps(dup.get("details", {})),
)
issues_found += 1
await db.commit()
logger.info(
f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
f"{issues_found} Probleme gefunden"
)
return {"checked": checks_done, "issues": issues_found}
async def _check_source_reachability(
client: httpx.AsyncClient, source: dict,
) -> list[dict]:
"""Prüft Erreichbarkeit und Feed-Validität einer Quelle."""
checks = []
url = source["url"]
try:
resp = await client.get(url)
if resp.status_code >= 400:
checks.append({
"type": "reachability",
"status": "error",
"message": f"HTTP {resp.status_code} - nicht erreichbar",
"details": json.dumps({"status_code": resp.status_code, "url": url}),
})
return checks
if resp.status_code >= 300:
checks.append({
"type": "reachability",
"status": "warning",
"message": f"HTTP {resp.status_code} - Weiterleitung",
"details": json.dumps({
"status_code": resp.status_code,
"final_url": str(resp.url),
}),
})
else:
checks.append({
"type": "reachability",
"status": "ok",
"message": "Erreichbar",
})
# Feed-Validität nur für RSS-Feeds
if source["source_type"] == "rss_feed":
text = resp.text[:20000]
if "<rss" not in text and "<feed" not in text and "<channel" not in text:
checks.append({
"type": "feed_validity",
"status": "error",
"message": "Kein gültiger RSS/Atom-Feed",
})
else:
feed = await asyncio.to_thread(feedparser.parse, text)
if feed.get("bozo") and not feed.entries:
checks.append({
"type": "feed_validity",
"status": "error",
"message": "Feed fehlerhaft (bozo)",
"details": json.dumps({
"bozo_exception": str(feed.get("bozo_exception", "")),
}),
})
elif not feed.entries:
checks.append({
"type": "feed_validity",
"status": "warning",
"message": "Feed erreichbar aber leer",
})
else:
checks.append({
"type": "feed_validity",
"status": "ok",
"message": f"Feed gültig ({len(feed.entries)} Einträge)",
})
except httpx.TimeoutException:
checks.append({
"type": "reachability",
"status": "error",
"message": "Timeout (15s)",
})
except httpx.ConnectError as e:
checks.append({
"type": "reachability",
"status": "error",
"message": f"Verbindung fehlgeschlagen: {e}",
})
except Exception as e:
checks.append({
"type": "reachability",
"status": "error",
"message": f"{type(e).__name__}: {e}",
})
return checks
def _check_stale(source: dict) -> dict | None:
"""Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen)."""
if source["source_type"] == "excluded":
return None
article_count = source.get("article_count") or 0
last_seen = source.get("last_seen_at")
if article_count == 0:
return {
"type": "stale",
"status": "warning",
"message": "Noch nie Artikel geliefert",
}
if last_seen:
try:
from datetime import datetime
last_dt = datetime.fromisoformat(last_seen)
now = datetime.now()
age_days = (now - last_dt).days
if age_days > 30:
return {
"type": "stale",
"status": "warning",
"message": f"Letzter Artikel vor {age_days} Tagen",
}
except (ValueError, TypeError):
pass
return None
def _find_duplicates(sources: list[dict]) -> list[dict]:
"""Findet doppelte Quellen (gleiche URL)."""
duplicates = []
url_map = {}
for s in sources:
if not s["url"]:
continue
url_norm = s["url"].lower().rstrip("/")
if url_norm in url_map:
existing = url_map[url_norm]
duplicates.append({
"source_id": s["id"],
"message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})",
"details": {"duplicate_of": existing["id"], "type": "url"},
})
else:
url_map[url_norm] = s
return duplicates
async def _save_check(
db: aiosqlite.Connection, source_id: int, check_type: str,
status: str, message: str, details: str = None,
):
"""Speichert ein Health-Check-Ergebnis."""
await db.execute(
"INSERT INTO source_health_checks "
"(source_id, check_type, status, message, details) "
"VALUES (?, ?, ?, ?, ?)",
(source_id, check_type, status, message, details),
)
async def get_health_summary(db: aiosqlite.Connection) -> dict:
"""Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück."""
cursor = await db.execute("""
SELECT
h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
h.check_type, h.status, h.message, h.details, h.checked_at
FROM source_health_checks h
JOIN sources s ON s.id = h.source_id
ORDER BY
CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
s.name
""")
checks = [dict(row) for row in await cursor.fetchall()]
error_count = sum(1 for c in checks if c["status"] == "error")
warning_count = sum(1 for c in checks if c["status"] == "warning")
ok_count = sum(1 for c in checks if c["status"] == "ok")
cursor = await db.execute(
"SELECT MAX(checked_at) as last_check FROM source_health_checks"
)
row = await cursor.fetchone()
last_check = row["last_check"] if row else None
return {
"last_check": last_check,
"total_checks": len(checks),
"errors": error_count,
"warnings": warning_count,
"ok": ok_count,
"checks": checks,
}

Datei anzeigen

@@ -0,0 +1,277 @@
"""KI-gestützte Quellen-Vorschläge via Haiku."""
import json
import logging
import re
import aiosqlite
from shared.agents.claude_client import call_claude
from config import CLAUDE_MODEL_FAST
logger = logging.getLogger("osint.source_suggester")
async def generate_suggestions(db: aiosqlite.Connection) -> int:
"""Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse."""
logger.info("Starte Quellen-Vorschläge via Haiku...")
# 1. Aktuelle Quellen laden
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, category, status, "
"article_count, last_seen_at "
"FROM sources WHERE tenant_id IS NULL ORDER BY category, name"
)
sources = [dict(row) for row in await cursor.fetchall()]
# 2. Health-Check-Probleme laden
cursor = await db.execute("""
SELECT h.source_id, s.name, s.domain, s.url,
h.check_type, h.status, h.message
FROM source_health_checks h
JOIN sources s ON s.id = h.source_id
WHERE h.status IN ('error', 'warning')
""")
issues = [dict(row) for row in await cursor.fetchall()]
# 3. Alte pending-Vorschläge entfernen (älter als 30 Tage)
await db.execute(
"DELETE FROM source_suggestions "
"WHERE status = 'pending' AND created_at < datetime('now', '-30 days')"
)
# 4. Quellen-Zusammenfassung für Haiku
categories = {}
for s in sources:
cat = s["category"]
if cat not in categories:
categories[cat] = []
categories[cat].append(s)
source_summary = ""
for cat, cat_sources in sorted(categories.items()):
active = [
s for s in cat_sources
if s["status"] == "active" and s["source_type"] != "excluded"
]
source_summary += f"\n{cat} ({len(active)} aktiv): "
source_summary += ", ".join(s["name"] for s in active[:10])
if len(active) > 10:
source_summary += f" ... (+{len(active) - 10} weitere)"
issues_summary = ""
if issues:
issues_summary = "\n\nProbleme gefunden:\n"
for issue in issues[:20]:
issues_summary += (
f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): "
f"{issue['check_type']} = {issue['status']} - {issue['message']}\n"
)
prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden.
Aktuelle Quellensammlung:{source_summary}{issues_summary}
Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor.
Beachte:
1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste
2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor
3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen
4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind
5. Maximal 5 Vorschläge
Antworte NUR mit einem JSON-Array. Jedes Element:
{{
"type": "add_source|deactivate_source|fix_url|remove_source",
"title": "Kurzer Titel",
"description": "Begründung",
"priority": "low|medium|high",
"source_id": null,
"data": {{
"name": "Anzeigename",
"url": "https://...",
"domain": "example.de",
"category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige"
}}
}}
Nur das JSON-Array, kein anderer Text."""
try:
response, usage = await call_claude(
prompt, tools=None, model=CLAUDE_MODEL_FAST,
)
json_match = re.search(r'\[.*\]', response, re.DOTALL)
if not json_match:
logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)")
return 0
suggestions = json.loads(json_match.group(0))
count = 0
for suggestion in suggestions[:5]:
stype = suggestion.get("type", "add_source")
title = suggestion.get("title", "")
desc = suggestion.get("description", "")
priority = suggestion.get("priority", "medium")
source_id = suggestion.get("source_id")
data = json.dumps(
suggestion.get("data", {}), ensure_ascii=False,
)
# source_id validieren (muss existieren oder None sein)
if source_id is not None:
cursor = await db.execute(
"SELECT id FROM sources WHERE id = ?", (source_id,),
)
if not await cursor.fetchone():
source_id = None
# Duplikat-Check: gleicher Typ + gleiche source_id oder gleiche Domain pending?
if source_id is not None:
cursor = await db.execute(
"SELECT id FROM source_suggestions "
"WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'",
(stype, source_id),
)
else:
# Bei add_source ohne source_id: Domain aus suggested_data prüfen
check_domain = suggestion.get('data', {}).get('domain', '')
if check_domain:
cursor = await db.execute(
"SELECT id FROM source_suggestions "
"WHERE suggestion_type = ? AND suggested_data LIKE ? AND status = 'pending'",
(stype, f'%{check_domain}%'),
)
else:
cursor = await db.execute(
"SELECT id FROM source_suggestions "
"WHERE title = ? AND status = 'pending'",
(title,),
)
if await cursor.fetchone():
continue
await db.execute(
"INSERT INTO source_suggestions "
"(suggestion_type, title, description, source_id, "
"suggested_data, priority, status) "
"VALUES (?, ?, ?, ?, ?, ?, 'pending')",
(stype, title, desc, source_id, data, priority),
)
count += 1
await db.commit()
logger.info(
f"Quellen-Vorschläge: {count} neue Vorschläge generiert "
f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / "
f"${usage.cost_usd:.4f})"
)
return count
except Exception as e:
logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True)
return 0
async def apply_suggestion(
db: aiosqlite.Connection, suggestion_id: int, accept: bool,
) -> dict:
"""Wendet einen Vorschlag an oder lehnt ihn ab."""
cursor = await db.execute(
"SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,),
)
suggestion = await cursor.fetchone()
if not suggestion:
raise ValueError("Vorschlag nicht gefunden")
suggestion = dict(suggestion)
if suggestion["status"] != "pending":
raise ValueError(f"Vorschlag bereits {suggestion['status']}")
new_status = "accepted" if accept else "rejected"
result = {"status": new_status, "action": None}
if accept:
stype = suggestion["suggestion_type"]
data = (
json.loads(suggestion["suggested_data"])
if suggestion["suggested_data"]
else {}
)
if stype == "add_source":
name = data.get("name", "Unbenannt")
url = data.get("url")
domain = data.get("domain", "")
category = data.get("category", "sonstige")
source_type = "rss_feed" if url and any(
x in (url or "").lower()
for x in ("rss", "feed", "xml", "atom")
) else "web_source"
if url:
cursor = await db.execute(
"SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
(url,),
)
if await cursor.fetchone():
result["action"] = "übersprungen (URL bereits vorhanden)"
new_status = "rejected"
else:
await db.execute(
"INSERT INTO sources "
"(name, url, domain, source_type, category, status, "
"added_by, tenant_id) "
"VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
(name, url, domain, source_type, category),
)
result["action"] = f"Quelle '{name}' angelegt"
else:
result["action"] = "übersprungen (keine URL)"
new_status = "rejected"
elif stype == "deactivate_source":
source_id = suggestion["source_id"]
if source_id:
await db.execute(
"UPDATE sources SET status = 'inactive' WHERE id = ?",
(source_id,),
)
result["action"] = "Quelle deaktiviert"
else:
result["action"] = "übersprungen (keine source_id)"
elif stype == "remove_source":
source_id = suggestion["source_id"]
if source_id:
await db.execute(
"DELETE FROM sources WHERE id = ?", (source_id,),
)
result["action"] = "Quelle gelöscht"
else:
result["action"] = "übersprungen (keine source_id)"
elif stype == "fix_url":
source_id = suggestion["source_id"]
new_url = data.get("url")
if source_id and new_url:
await db.execute(
"UPDATE sources SET url = ? WHERE id = ?",
(new_url, source_id),
)
result["action"] = f"URL aktualisiert auf {new_url}"
else:
result["action"] = "übersprungen (keine source_id oder URL)"
await db.execute(
"UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP "
"WHERE id = ?",
(new_status, suggestion_id),
)
await db.commit()
result["status"] = new_status
return result

742
src/shared/source_rules.py Normale Datei
Datei anzeigen

@@ -0,0 +1,742 @@
"""Dynamische Quellen-Regeln aus der Datenbank."""
import logging
import re
import json
import asyncio
from urllib.parse import urlparse
import httpx
import feedparser
import hashlib
from config import CLAUDE_PATH, CLAUDE_TIMEOUT, MAX_FEEDS_PER_DOMAIN
logger = logging.getLogger("osint.source_rules")
# Domain -> Kategorie Mapping für Auto-Erkennung
DOMAIN_CATEGORY_MAP = {
# Nachrichtenagenturen
"reuters.com": "nachrichtenagentur",
"apnews.com": "nachrichtenagentur",
"dpa.com": "nachrichtenagentur",
"afp.com": "nachrichtenagentur",
# Öffentlich-Rechtlich
"tagesschau.de": "oeffentlich-rechtlich",
"zdf.de": "oeffentlich-rechtlich",
"dw.com": "oeffentlich-rechtlich",
"br.de": "oeffentlich-rechtlich",
"ndr.de": "oeffentlich-rechtlich",
"wdr.de": "oeffentlich-rechtlich",
"mdr.de": "oeffentlich-rechtlich",
"swr.de": "oeffentlich-rechtlich",
"hr.de": "oeffentlich-rechtlich",
"rbb24.de": "oeffentlich-rechtlich",
"ard.de": "oeffentlich-rechtlich",
"orf.at": "oeffentlich-rechtlich",
"srf.ch": "oeffentlich-rechtlich",
# Qualitätszeitungen
"spiegel.de": "qualitaetszeitung",
"zeit.de": "qualitaetszeitung",
"faz.net": "qualitaetszeitung",
"sueddeutsche.de": "qualitaetszeitung",
"nzz.ch": "qualitaetszeitung",
"welt.de": "qualitaetszeitung",
"tagesspiegel.de": "qualitaetszeitung",
"fr.de": "qualitaetszeitung",
"stern.de": "qualitaetszeitung",
"focus.de": "qualitaetszeitung",
# Behörden
"bmi.bund.de": "behoerde",
"europol.europa.eu": "behoerde",
"bka.de": "behoerde",
"bsi.bund.de": "behoerde",
"verfassungsschutz.de": "behoerde",
"bpb.de": "behoerde",
# Fachmedien
"netzpolitik.org": "fachmedien",
"handelsblatt.com": "fachmedien",
"heise.de": "fachmedien",
"golem.de": "fachmedien",
"t3n.de": "fachmedien",
"wiwo.de": "fachmedien",
# Think Tanks
"swp-berlin.org": "think-tank",
"iiss.org": "think-tank",
"brookings.edu": "think-tank",
"rand.org": "think-tank",
"dgap.org": "think-tank",
"chathamhouse.org": "think-tank",
# International
"bbc.co.uk": "international",
"bbc.com": "international",
"aljazeera.com": "international",
"france24.com": "international",
"cnn.com": "international",
"theguardian.com": "international",
"nytimes.com": "international",
"washingtonpost.com": "international",
"lemonde.fr": "international",
"elpais.com": "international",
# Regional
"berliner-zeitung.de": "regional",
"hamburger-abendblatt.de": "regional",
"stuttgarter-zeitung.de": "regional",
"ksta.de": "regional",
"rp-online.de": "regional",
"merkur.de": "regional",
# Telegram
"t.me": "telegram",
}
# Bekannte Feed-Pfade zum Durchprobieren
_FEED_PATHS = ["/feed", "/rss", "/rss.xml", "/atom.xml", "/feed.xml", "/index.xml", "/feed/rss", "/feeds/posts/default"]
# Erweiterte nachrichtenspezifische Feed-Pfade für Multi-Discovery
_NEWS_FEED_PATHS = [
"/world/rss", "/world/rss.xml", "/world/feed",
"/politics/rss", "/politics/rss.xml", "/politics/feed",
"/business/rss", "/business/rss.xml", "/business/feed",
"/technology/rss", "/technology/rss.xml", "/technology/feed",
"/environment/rss", "/environment/rss.xml", "/environment/feed",
"/science/rss", "/science/rss.xml", "/science/feed",
"/europe/rss", "/europe/rss.xml", "/europe/feed",
"/security/rss", "/security/rss.xml", "/security/feed",
"/international/rss", "/international/rss.xml", "/international/feed",
"/economy/rss", "/economy/rss.xml", "/economy/feed",
"/defence/rss", "/defence/rss.xml", "/defence/feed",
"/middle-east/rss", "/middle-east/rss.xml",
"/asia/rss", "/asia/rss.xml",
"/africa/rss", "/africa/rss.xml",
"/americas/rss", "/americas/rss.xml",
"/uk-news/rss", "/us-news/rss",
"/commentisfree/rss", "/opinion/rss",
"/law/rss", "/media/rss",
"/global-development/rss",
"/news/feed", "/news/rss", "/news/rss.xml",
"/politik/rss", "/politik/rss.xml",
"/wirtschaft/rss", "/wirtschaft/rss.xml",
"/panorama/rss", "/panorama/rss.xml",
"/wissen/rss", "/wissen/rss.xml",
"/ausland/rss", "/ausland/rss.xml",
"/inland/rss", "/inland/rss.xml",
"/netzwelt/rss", "/netzwelt/rss.xml",
"/kultur/rss", "/kultur/rss.xml",
]
# Bekannte Feed-Subdomains für Portale die Feeds auf separater Domain hosten
_DOMAIN_FEED_URLS = {
"bbc.com": [
"https://feeds.bbci.co.uk/news/rss.xml",
"https://feeds.bbci.co.uk/news/world/rss.xml",
"https://feeds.bbci.co.uk/news/business/rss.xml",
"https://feeds.bbci.co.uk/news/politics/rss.xml",
"https://feeds.bbci.co.uk/news/technology/rss.xml",
"https://feeds.bbci.co.uk/news/science_and_environment/rss.xml",
"https://feeds.bbci.co.uk/news/health/rss.xml",
"https://feeds.bbci.co.uk/news/education/rss.xml",
"https://feeds.bbci.co.uk/news/world/middle_east/rss.xml",
"https://feeds.bbci.co.uk/news/world/europe/rss.xml",
"https://feeds.bbci.co.uk/news/world/africa/rss.xml",
"https://feeds.bbci.co.uk/news/world/asia/rss.xml",
"https://feeds.bbci.co.uk/news/world/us_and_canada/rss.xml",
"https://feeds.bbci.co.uk/news/world/latin_america/rss.xml",
"https://feeds.bbci.co.uk/news/entertainment_and_arts/rss.xml",
],
"bbc.co.uk": "bbc.com", # Alias
"reuters.com": [
"https://www.reutersagency.com/feed/",
],
"aljazeera.com": [
"https://www.aljazeera.com/xml/rss/all.xml",
],
}
def _get_extra_feed_urls(domain: str) -> list[str]:
"""Gibt bekannte Feed-URLs für Domains mit separater Feed-Subdomain zurück."""
entry = _DOMAIN_FEED_URLS.get(domain)
if isinstance(entry, str):
# Alias — auf andere Domain verweisen
entry = _DOMAIN_FEED_URLS.get(entry)
if isinstance(entry, list):
return entry
return []
def _normalize_url(url: str) -> str:
"""URL normalisieren (https:// ergänzen falls fehlend)."""
url = url.strip()
if not url.startswith(("http://", "https://")):
url = "https://" + url
return url
# Subdomain → kanonische Domain Zuordnung
_DOMAIN_ALIASES = {
"feeds.bbci.co.uk": "bbc.com",
"rss.sueddeutsche.de": "sueddeutsche.de",
"on.orf.at": "orf.at",
"rss.orf.at": "orf.at",
"rss.dw.com": "dw.com",
"newsfeed.zeit.de": "zeit.de",
"reutersagency.com": "reuters.com",
"edition.cnn.com": "cnn.com",
"rsshub.app": "apnews.com",
}
def _extract_domain(url: str) -> str:
"""Domain aus URL extrahieren (ohne www., mit Alias-Normalisierung)."""
parsed = urlparse(url)
domain = parsed.hostname or ""
if domain.startswith("www."):
domain = domain[4:]
return _DOMAIN_ALIASES.get(domain, domain)
def _detect_category(domain: str) -> str:
"""Kategorie anhand der Domain erkennen."""
if domain in DOMAIN_CATEGORY_MAP:
return DOMAIN_CATEGORY_MAP[domain]
# Subdomain-Match: z.B. feeds.reuters.com -> reuters.com
parts = domain.split(".")
if len(parts) > 2:
parent = ".".join(parts[-2:])
if parent in DOMAIN_CATEGORY_MAP:
return DOMAIN_CATEGORY_MAP[parent]
return "sonstige"
# Bekannte Domain → Anzeigename Zuordnungen
DOMAIN_DISPLAY_NAMES = {
"tagesschau.de": "tagesschau",
"zdf.de": "ZDF heute",
"spiegel.de": "Spiegel",
"zeit.de": "Zeit",
"newsfeed.zeit.de": "Zeit",
"faz.net": "FAZ",
"sueddeutsche.de": "Süddeutsche Zeitung",
"rss.sueddeutsche.de": "Süddeutsche Zeitung",
"nzz.ch": "NZZ",
"dw.com": "Deutsche Welle",
"rss.dw.com": "Deutsche Welle",
"reuters.com": "Reuters",
"reutersagency.com": "Reuters",
"rsshub.app": "RSSHub",
"apnews.com": "AP News",
"bbc.com": "BBC",
"bbc.co.uk": "BBC",
"feeds.bbci.co.uk": "BBC",
"aljazeera.com": "Al Jazeera",
"france24.com": "France24",
"theguardian.com": "The Guardian",
"nytimes.com": "New York Times",
"washingtonpost.com": "Washington Post",
"cnn.com": "CNN",
"bmi.bund.de": "BMI",
"europol.europa.eu": "Europol",
"handelsblatt.com": "Handelsblatt",
"wiwo.de": "WirtschaftsWoche",
"heise.de": "Heise Online",
"golem.de": "Golem",
"netzpolitik.org": "netzpolitik.org",
"t3n.de": "t3n",
"welt.de": "Welt",
"tagesspiegel.de": "Tagesspiegel",
"stern.de": "Stern",
"focus.de": "Focus",
"n-tv.de": "n-tv",
"bild.de": "BILD",
"tarnkappe.info": "Tarnkappe",
"bleepingcomputer.com": "BleepingComputer",
"techcrunch.com": "TechCrunch",
"theverge.com": "The Verge",
"wired.com": "WIRED",
"tomshardware.com": "Tom's Hardware",
"finanzen.net": "Finanzen.net",
"404media.co": "404 Media",
"medium.com": "Medium",
"swp-berlin.org": "SWP Berlin",
"dgap.org": "DGAP",
"brookings.edu": "Brookings",
"rand.org": "RAND",
"lemonde.fr": "Le Monde",
"elpais.com": "El País",
"orf.at": "ORF",
"srf.ch": "SRF",
"br.de": "BR",
"ndr.de": "NDR",
"wdr.de": "WDR",
"mdr.de": "MDR",
"swr.de": "SWR",
"hr.de": "hr",
"rbb24.de": "rbb24",
"fr.de": "Frankfurter Rundschau",
"rp-online.de": "Rheinische Post",
"ksta.de": "Kölner Stadt-Anzeiger",
"berliner-zeitung.de": "Berliner Zeitung",
"stuttgarter-zeitung.de": "Stuttgarter Zeitung",
"hamburger-abendblatt.de": "Hamburger Abendblatt",
"merkur.de": "Münchner Merkur",
"bsi.bund.de": "BSI",
"bpb.de": "bpb",
"bka.de": "BKA",
"verfassungsschutz.de": "Verfassungsschutz",
"bashinho.de": "Bashinho",
}
def domain_to_display_name(domain: str) -> str:
"""Wandelt eine Domain in einen lesbaren Anzeigenamen um.
Prüft erst die bekannte Zuordnung, dann leitet einen sinnvollen
Namen aus der Domain ab (erster Teil, kapitalisiert).
"""
if domain in DOMAIN_DISPLAY_NAMES:
return DOMAIN_DISPLAY_NAMES[domain]
# Subdomain-Match: feeds.reuters.com -> reuters.com
parts = domain.split(".")
if len(parts) > 2:
parent = ".".join(parts[-2:])
if parent in DOMAIN_DISPLAY_NAMES:
return DOMAIN_DISPLAY_NAMES[parent]
# Fallback: Domain-Kern extrahieren und kapitalisieren
# z.B. "example-news.de" → "Example News"
core = parts[-2] if len(parts) >= 2 else parts[0]
return core.replace("-", " ").title()
def _compute_content_hash(entries: list) -> str:
"""Berechnet einen Fingerprint aus den ersten 5 Entry-Titeln eines Feeds."""
titles = [e.get("title", "") for e in entries[:5]]
combined = "|".join(titles).strip()
if not combined:
return ""
return hashlib.sha256(combined.encode("utf-8")).hexdigest()[:16]
async def _validate_feed(client: httpx.AsyncClient, url: str) -> dict | None:
"""Prüft ob eine URL ein gültiger RSS/Atom-Feed ist. Gibt Feed-Info zurück oder None."""
try:
resp = await client.get(url)
if resp.status_code != 200:
return None
content_type = resp.headers.get("content-type", "")
text = resp.text[:10000] # Nur Anfang prüfen
# Muss XML-artig sein
if "<rss" not in text and "<feed" not in text and "<channel" not in text:
return None
feed = await asyncio.to_thread(feedparser.parse, text)
if feed.get("bozo") and not feed.entries:
return None
if feed.feed.get("title") or feed.entries:
content_hash = _compute_content_hash(feed.entries)
return {
"url": str(resp.url), # Finale URL nach Redirects
"title": feed.feed.get("title", ""),
"content_hash": content_hash,
}
except Exception:
pass
return None
async def discover_source(url: str) -> dict:
"""Erkennt RSS-Feed, Name, Domain und Kategorie einer URL automatisch.
Returns:
dict mit: name, domain, rss_url, category, source_type
"""
url = _normalize_url(url)
domain = _extract_domain(url)
category = _detect_category(domain)
result = {
"name": domain_to_display_name(domain),
"domain": domain,
"rss_url": None,
"category": category,
"source_type": "web_source",
}
async with httpx.AsyncClient(
timeout=12.0,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
# 1. Seite abrufen und nach RSS-Links suchen
page_title = None
try:
resp = await client.get(url)
if resp.status_code == 200:
html = resp.text[:50000]
# <title> extrahieren
title_match = re.search(r"<title[^>]*>([^<]+)</title>", html, re.IGNORECASE)
if title_match:
page_title = title_match.group(1).strip()
# RSS/Atom Link-Tags suchen
feed_links = re.findall(
r'<link[^>]+type=["\']application/(rss|atom)\+xml["\'][^>]*>',
html,
re.IGNORECASE,
)
# Auch umgekehrte Attribut-Reihenfolge
feed_links += re.findall(
r'<link[^>]+href=["\']([^"\']+)["\'][^>]+type=["\']application/(rss|atom)\+xml["\'][^>]*>',
html,
re.IGNORECASE,
)
# href aus den gefundenen Tags extrahieren
feed_urls = []
for tag in re.finditer(
r'<link[^>]+type=["\']application/(?:rss|atom)\+xml["\'][^>]*>',
html,
re.IGNORECASE,
):
href_match = re.search(r'href=["\']([^"\']+)["\']', tag.group(0))
if href_match:
href = href_match.group(1)
# Relative URLs auflösen
if href.startswith("/"):
parsed = urlparse(url)
href = f"{parsed.scheme}://{parsed.netloc}{href}"
elif not href.startswith("http"):
href = url.rstrip("/") + "/" + href
feed_urls.append(href)
# Gefundene Feed-URLs validieren
for feed_url in feed_urls:
feed_info = await _validate_feed(client, feed_url)
if feed_info:
result["rss_url"] = feed_info["url"]
result["source_type"] = "rss_feed"
if feed_info["title"]:
result["name"] = feed_info["title"]
elif page_title:
result["name"] = page_title
return result
except Exception as e:
logger.debug(f"Fehler beim Abrufen von {url}: {e}")
# 2. Bekannte Feed-Pfade durchprobieren
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
for path in _FEED_PATHS:
feed_url = base_url + path
feed_info = await _validate_feed(client, feed_url)
if feed_info:
result["rss_url"] = feed_info["url"]
result["source_type"] = "rss_feed"
if feed_info["title"]:
result["name"] = feed_info["title"]
elif page_title:
result["name"] = page_title
return result
# Kein Feed gefunden — Name aus Seitentitel
if page_title:
result["name"] = page_title
return result
async def discover_all_feeds(url: str) -> dict:
"""Findet ALLE RSS/Atom-Feeds einer Domain.
Returns:
dict mit: domain, category, page_title, feeds: [{"url", "title"}, ...]
"""
url = _normalize_url(url)
domain = _extract_domain(url)
category = _detect_category(domain)
result = {
"domain": domain,
"category": category,
"page_title": None,
"feeds": [],
}
seen_urls = set()
seen_content_hashes = set()
async with httpx.AsyncClient(
timeout=15.0,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
# 1. HTML-Seite abrufen und ALLE RSS-Link-Tags sammeln
candidate_urls = []
try:
resp = await client.get(url)
if resp.status_code == 200:
html = resp.text[:100000]
title_match = re.search(r"<title[^>]*>([^<]+)</title>", html, re.IGNORECASE)
if title_match:
result["page_title"] = title_match.group(1).strip()
parsed = urlparse(url)
base = f"{parsed.scheme}://{parsed.netloc}"
for tag in re.finditer(
r'<link[^>]+type=["\']application/(?:rss|atom)\+xml["\'][^>]*>',
html,
re.IGNORECASE,
):
href_match = re.search(r'href=["\']([^"\']+)["\']', tag.group(0))
if href_match:
href = href_match.group(1)
if href.startswith("/"):
href = base + href
elif not href.startswith("http"):
href = url.rstrip("/") + "/" + href
candidate_urls.append(href)
except Exception as e:
logger.debug(f"Fehler beim Abrufen von {url}: {e}")
# 2. Bekannte Feed-Pfade hinzufügen (Standard + Nachrichten-spezifisch)
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
for path in _FEED_PATHS + _NEWS_FEED_PATHS:
candidate_urls.append(base_url + path)
# 2b. Bekannte Feed-URLs für Domains mit separater Feed-Subdomain (z.B. BBC)
extra_urls = _get_extra_feed_urls(domain)
candidate_urls.extend(extra_urls)
# 3. Alle Kandidaten parallel validieren (in Batches von 10)
async def _validate_and_collect(feed_url: str):
try:
return await _validate_feed(client, feed_url)
except Exception:
return None
for i in range(0, len(candidate_urls), 10):
batch = candidate_urls[i:i + 10]
results = await asyncio.gather(*[_validate_and_collect(u) for u in batch])
for feed_info in results:
if not feed_info:
continue
if feed_info["url"] in seen_urls:
continue
# Content-Hash Duplikat-Erkennung (gleicher Inhalt = WordPress-Redirect etc.)
content_hash = feed_info.get("content_hash", "")
if content_hash and content_hash in seen_content_hashes:
logger.debug(f"Content-Hash Duplikat übersprungen: {feed_info['url']}")
continue
seen_urls.add(feed_info["url"])
if content_hash:
seen_content_hashes.add(content_hash)
result["feeds"].append(feed_info)
logger.info(f"discover_all_feeds({domain}): {len(result['feeds'])} Feeds gefunden")
return result
async def evaluate_feeds_with_claude(domain: str, feeds: list[dict]) -> list[dict]:
"""Lässt Claude die OSINT-Relevanz der Feeds bewerten.
Args:
domain: Domain-Name
feeds: Liste von {"url", "title"} Dicts
Returns:
Liste von {"url", "title", "name"} Dicts (nur relevante Feeds)
"""
if not feeds:
return []
feed_list = "\n".join(
f" {i+1}. {f['title'] or f['url']}{f['url']}"
for i, f in enumerate(feeds)
)
prompt = f"""Du bist ein OSINT-Analyst. Bewerte diese RSS-Feeds der Domain "{domain}" nach OSINT-Relevanz.
OSINT-relevante Themen: Politik, Sicherheit, Wirtschaft, Internationale Beziehungen, Verteidigung, Konflikte, Terrorismus, Cybersecurity, Umweltkatastrophen, Technologie, Wissenschaft, Nachrichten allgemein.
NICHT relevant: Sport, Lifestyle, Rezepte, Unterhaltung, Reisen, Mode, Kultur/Kunst, Wetter, Kreuzworträtsel, Podcasts (allgemein), Leserbriefe, Kommentare/Meinung.
Feeds:
{feed_list}
Antworte AUSSCHLIESSLICH mit einem JSON-Array. Jedes Element:
{{"index": <1-basiert>, "relevant": true/false, "name": "<Anzeigename für OSINT-Monitor, z.B. 'Guardian World' oder 'Spiegel Politik'>"}}
Nur das JSON-Array, kein anderer Text."""
try:
cmd = [
CLAUDE_PATH,
"-p", prompt,
"--output-format", "text",
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={"PATH": "/usr/local/bin:/usr/bin:/bin", "HOME": "/home/claude-dev"},
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=min(CLAUDE_TIMEOUT, 120)
)
except asyncio.TimeoutError:
process.kill()
logger.warning(f"Claude-Bewertung Timeout für {domain}, nutze Fallback")
return _fallback_all_feeds(domain, feeds)
if process.returncode != 0:
logger.warning(f"Claude-Bewertung fehlgeschlagen für {domain}, nutze Fallback")
return _fallback_all_feeds(domain, feeds)
response = stdout.decode("utf-8", errors="replace").strip()
# JSON aus Antwort extrahieren (Claude gibt manchmal Markdown-Blöcke zurück)
json_match = re.search(r'\[.*\]', response, re.DOTALL)
if not json_match:
logger.warning(f"Kein JSON in Claude-Antwort für {domain}, nutze Fallback")
return _fallback_all_feeds(domain, feeds)
evaluations = json.loads(json_match.group(0))
relevant = []
for ev in evaluations:
idx = ev.get("index", 0) - 1
if ev.get("relevant") and 0 <= idx < len(feeds):
feed = feeds[idx]
relevant.append({
"url": feed["url"],
"title": feed["title"],
"name": ev.get("name", feed["title"] or domain),
})
logger.info(f"Claude-Bewertung für {domain}: {len(relevant)}/{len(feeds)} relevant")
return relevant
except json.JSONDecodeError:
logger.warning(f"JSON-Parse-Fehler bei Claude-Antwort für {domain}, nutze Fallback")
return _fallback_all_feeds(domain, feeds)
except Exception as e:
logger.warning(f"Claude-Bewertung Fehler für {domain}: {e}, nutze Fallback")
return _fallback_all_feeds(domain, feeds)
def _fallback_all_feeds(domain: str, feeds: list[dict]) -> list[dict]:
"""Fallback: Alle Feeds übernehmen mit Feed-Titel als Name."""
return [
{
"url": f["url"],
"title": f["title"],
"name": f["title"] or domain,
}
for f in feeds
]
async def get_feeds_with_metadata(tenant_id: int = None, source_type: str = "rss_feed") -> list[dict]:
"""Aktive Feeds eines bestimmten Typs mit Metadaten fuer Claude-Selektion (global + org-spezifisch).
source_type: "rss_feed" (Default) oder "podcast_feed" — trennt RSS- und Podcast-Quellen
in getrennten Pipelines, damit der RSS-Heisspfad unveraendert bleibt.
"""
from database import get_db
db = await get_db()
try:
if tenant_id:
cursor = await db.execute(
"SELECT name, url, domain, category, notes, COALESCE(article_count, 0) AS article_count FROM sources "
"WHERE source_type = ? AND status = 'active' "
"AND (tenant_id IS NULL OR tenant_id = ?)",
(source_type, tenant_id),
)
else:
cursor = await db.execute(
"SELECT name, url, domain, category, notes, COALESCE(article_count, 0) AS article_count FROM sources "
"WHERE source_type = ? AND status = 'active'",
(source_type,),
)
return [dict(row) for row in await cursor.fetchall()]
except Exception as e:
logger.error(f"Fehler beim Laden der Feed-Metadaten ({source_type}): {e}")
return []
finally:
await db.close()
async def get_user_excluded_domains(user_id: int) -> list[str]:
"""Laedt die vom User ausgeschlossenen Domains."""
from database import get_db
db = await get_db()
try:
cursor = await db.execute(
"SELECT domain FROM user_excluded_domains WHERE user_id = ?",
(user_id,),
)
return [row[0] for row in await cursor.fetchall()]
except Exception as e:
logger.warning(f"Fehler beim Laden der User-Ausschluesse: {e}")
return []
finally:
await db.close()
async def get_source_rules(tenant_id: int = None) -> dict:
"""Liest Quellen-Konfiguration aus DB (global + org-spezifisch).
Returns:
dict mit:
- excluded_domains: Liste ausgeschlossener Domains
- rss_feeds: Dict mit Kategorien deutsch/international/behoerden
"""
from database import get_db
db = await get_db()
try:
if tenant_id:
cursor = await db.execute(
"SELECT * FROM sources WHERE status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)",
(tenant_id,),
)
else:
cursor = await db.execute(
"SELECT * FROM sources WHERE status = 'active'"
)
sources = [dict(row) for row in await cursor.fetchall()]
excluded_domains = []
rss_feeds = {"deutsch": [], "international": [], "behoerden": []}
for source in sources:
if source["source_type"] == "excluded":
excluded_domains.append(source["domain"] or source["name"])
elif source["source_type"] == "rss_feed" and source["url"]:
feed_entry = {"name": source["name"], "url": source["url"]}
cat = source["category"]
if cat == "behoerde":
rss_feeds["behoerden"].append(feed_entry)
elif cat == "international":
rss_feeds["international"].append(feed_entry)
else:
# Alle anderen Kategorien → deutsch
rss_feeds["deutsch"].append(feed_entry)
return {
"excluded_domains": excluded_domains,
"rss_feeds": rss_feeds,
}
except Exception as e:
logger.error(f"Fehler beim Laden der Quellen-Regeln: {e}")
# Fallback auf config.py
from config import RSS_FEEDS, EXCLUDED_SOURCES
return {
"excluded_domains": list(EXCLUDED_SOURCES),
"rss_feeds": dict(RSS_FEEDS),
}
finally:
await db.close()