Mojibake fix: source_suggester.py + source_health.py via ftfy

Both files had double-encoded UTF-8 in their docstrings, comments, and
prompt strings (e.g. "prÃ¼ft" instead of "prüft", "VorschlÃ¤ge" instead of
"Vorschläge"). ftfy repaired this automatically.

Main effects:
- Logs are now readable, with real umlauts
- The Claude/Haiku prompts in source_suggester.py (AI-based source
  suggestions) now carry correct German umlauts - this should yield
  better responses

ftfy also normalized line endings, hence the large diff in
source_health.py - content-wise it is mojibake repair only.
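
That normalization comes from ftfy's defaults: fix_line_breaks rewrites
CRLF/CR as LF inside fix_text. A quick check (assuming default settings):

  import ftfy

  # fix_line_breaks (on by default) converts CRLF and CR to LF.
  assert ftfy.fix_text("a\r\nb") == "a\nb"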

Verified with:
  grep -cE "Ã¤|Ã¶|Ã¼|ÃŸ|Ã„|Ã–|Ãœ" src/services/*.py
  -> 0 matches
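
A rough Python equivalent (hypothetical spot check; relies on ftfy being
idempotent, i.e. already-clean text is a fixed point of fix_text):

  import pathlib
  import ftfy

  for p in pathlib.Path("src/services").glob("*.py"):
      s = p.read_text(encoding="utf-8")
      assert ftfy.fix_text(s) == s, p  # clean files pass through unchanged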

Claude Code
2026-05-09 03:35:13 +00:00
Parent 1e9cca2555
Commit d71daee581
2 changed files with 299 additions and 299 deletions

@@ -1,282 +1,282 @@
"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate."""
import asyncio
import logging
import json
from urllib.parse import urlparse
import httpx
import feedparser
import aiosqlite
logger = logging.getLogger("osint.source_health")
async def run_health_checks(db: aiosqlite.Connection) -> dict:
"""Führt alle Health-Checks für aktive Grundquellen durch."""
logger.info("Starte Quellen-Health-Check...")
# Alle aktiven Grundquellen laden
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, article_count, last_seen_at "
"FROM sources WHERE status = 'active' AND tenant_id IS NULL"
)
sources = [dict(row) for row in await cursor.fetchall()]
# Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben)
await db.execute("DELETE FROM source_health_checks")
await db.commit()
checks_done = 0
issues_found = 0
# 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL)
sources_with_url = [s for s in sources if s["url"]]
async with httpx.AsyncClient(
timeout=15.0,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
for i in range(0, len(sources_with_url), 5):
batch = sources_with_url[i:i + 5]
tasks = [_check_source_reachability(client, s) for s in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for source, result in zip(batch, results):
if isinstance(result, Exception):
await _save_check(
db, source["id"], "reachability", "error",
f"Prüfung fehlgeschlagen: {result}",
)
issues_found += 1
else:
for check in result:
await _save_check(
db, source["id"], check["type"], check["status"],
check["message"], check.get("details"),
)
if check["status"] != "ok":
issues_found += 1
checks_done += 1
# 2. Veraltete Quellen (kein Artikel seit >30 Tagen)
for source in sources:
if source["source_type"] in ("excluded", "web_source"):
continue
stale_check = _check_stale(source)
if stale_check:
await _save_check(
db, source["id"], stale_check["type"],
stale_check["status"], stale_check["message"],
)
if stale_check["status"] != "ok":
issues_found += 1
# 3. Duplikate erkennen
duplicates = _find_duplicates(sources)
for dup in duplicates:
await _save_check(
db, dup["source_id"], "duplicate", "warning",
dup["message"], json.dumps(dup.get("details", {})),
)
issues_found += 1
await db.commit()
logger.info(
f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
f"{issues_found} Probleme gefunden"
)
return {"checked": checks_done, "issues": issues_found}
async def _check_source_reachability(
client: httpx.AsyncClient, source: dict,
) -> list[dict]:
"""Prüft Erreichbarkeit und Feed-Validität einer Quelle."""
checks = []
url = source["url"]
try:
resp = await client.get(url)
if resp.status_code >= 400:
checks.append({
"type": "reachability",
"status": "error",
"message": f"HTTP {resp.status_code} - nicht erreichbar",
"details": json.dumps({"status_code": resp.status_code, "url": url}),
})
return checks
if resp.status_code >= 300:
checks.append({
"type": "reachability",
"status": "warning",
"message": f"HTTP {resp.status_code} - Weiterleitung",
"details": json.dumps({
"status_code": resp.status_code,
"final_url": str(resp.url),
}),
})
else:
checks.append({
"type": "reachability",
"status": "ok",
"message": "Erreichbar",
})
# Feed-Validität nur für RSS-Feeds
if source["source_type"] == "rss_feed":
text = resp.text[:20000]
if "<rss" not in text and "<feed" not in text and "<channel" not in text:
checks.append({
"type": "feed_validity",
"status": "error",
"message": "Kein gültiger RSS/Atom-Feed",
})
else:
feed = await asyncio.to_thread(feedparser.parse, text)
if feed.get("bozo") and not feed.entries:
checks.append({
"type": "feed_validity",
"status": "error",
"message": "Feed fehlerhaft (bozo)",
"details": json.dumps({
"bozo_exception": str(feed.get("bozo_exception", "")),
}),
})
elif not feed.entries:
checks.append({
"type": "feed_validity",
"status": "warning",
"message": "Feed erreichbar aber leer",
})
else:
checks.append({
"type": "feed_validity",
"status": "ok",
"message": f"Feed gültig ({len(feed.entries)} Einträge)",
})
except httpx.TimeoutException:
checks.append({
"type": "reachability",
"status": "error",
"message": "Timeout (15s)",
})
except httpx.ConnectError as e:
checks.append({
"type": "reachability",
"status": "error",
"message": f"Verbindung fehlgeschlagen: {e}",
})
except Exception as e:
checks.append({
"type": "reachability",
"status": "error",
"message": f"{type(e).__name__}: {e}",
})
return checks
def _check_stale(source: dict) -> dict | None:
"""Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen)."""
if source["source_type"] == "excluded":
return None
article_count = source.get("article_count") or 0
last_seen = source.get("last_seen_at")
if article_count == 0:
return {
"type": "stale",
"status": "warning",
"message": "Noch nie Artikel geliefert",
}
if last_seen:
try:
from datetime import datetime
last_dt = datetime.fromisoformat(last_seen)
now = datetime.now()
age_days = (now - last_dt).days
if age_days > 30:
return {
"type": "stale",
"status": "warning",
"message": f"Letzter Artikel vor {age_days} Tagen",
}
except (ValueError, TypeError):
pass
return None
def _find_duplicates(sources: list[dict]) -> list[dict]:
"""Findet doppelte Quellen (gleiche URL)."""
duplicates = []
url_map = {}
for s in sources:
if not s["url"]:
continue
url_norm = s["url"].lower().rstrip("/")
if url_norm in url_map:
existing = url_map[url_norm]
duplicates.append({
"source_id": s["id"],
"message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})",
"details": {"duplicate_of": existing["id"], "type": "url"},
})
else:
url_map[url_norm] = s
return duplicates
async def _save_check(
db: aiosqlite.Connection, source_id: int, check_type: str,
status: str, message: str, details: str = None,
):
"""Speichert ein Health-Check-Ergebnis."""
await db.execute(
"INSERT INTO source_health_checks "
"(source_id, check_type, status, message, details) "
"VALUES (?, ?, ?, ?, ?)",
(source_id, check_type, status, message, details),
)
async def get_health_summary(db: aiosqlite.Connection) -> dict:
"""Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück."""
cursor = await db.execute("""
SELECT
h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
h.check_type, h.status, h.message, h.details, h.checked_at
FROM source_health_checks h
JOIN sources s ON s.id = h.source_id
ORDER BY
CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
s.name
""")
checks = [dict(row) for row in await cursor.fetchall()]
error_count = sum(1 for c in checks if c["status"] == "error")
warning_count = sum(1 for c in checks if c["status"] == "warning")
ok_count = sum(1 for c in checks if c["status"] == "ok")
cursor = await db.execute(
"SELECT MAX(checked_at) as last_check FROM source_health_checks"
)
row = await cursor.fetchone()
last_check = row["last_check"] if row else None
return {
"last_check": last_check,
"total_checks": len(checks),
"errors": error_count,
"warnings": warning_count,
"ok": ok_count,
"checks": checks,
}
"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate."""
import asyncio
import logging
import json
from urllib.parse import urlparse
import httpx
import feedparser
import aiosqlite
logger = logging.getLogger("osint.source_health")
async def run_health_checks(db: aiosqlite.Connection) -> dict:
"""Führt alle Health-Checks für aktive Grundquellen durch."""
logger.info("Starte Quellen-Health-Check...")
# Alle aktiven Grundquellen laden
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, article_count, last_seen_at "
"FROM sources WHERE status = 'active' AND tenant_id IS NULL"
)
sources = [dict(row) for row in await cursor.fetchall()]
# Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben)
await db.execute("DELETE FROM source_health_checks")
await db.commit()
checks_done = 0
issues_found = 0
# 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL)
sources_with_url = [s for s in sources if s["url"]]
async with httpx.AsyncClient(
timeout=15.0,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
for i in range(0, len(sources_with_url), 5):
batch = sources_with_url[i:i + 5]
tasks = [_check_source_reachability(client, s) for s in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for source, result in zip(batch, results):
if isinstance(result, Exception):
await _save_check(
db, source["id"], "reachability", "error",
f"Prüfung fehlgeschlagen: {result}",
)
issues_found += 1
else:
for check in result:
await _save_check(
db, source["id"], check["type"], check["status"],
check["message"], check.get("details"),
)
if check["status"] != "ok":
issues_found += 1
checks_done += 1
# 2. Veraltete Quellen (kein Artikel seit >30 Tagen)
for source in sources:
if source["source_type"] in ("excluded", "web_source"):
continue
stale_check = _check_stale(source)
if stale_check:
await _save_check(
db, source["id"], stale_check["type"],
stale_check["status"], stale_check["message"],
)
if stale_check["status"] != "ok":
issues_found += 1
# 3. Duplikate erkennen
duplicates = _find_duplicates(sources)
for dup in duplicates:
await _save_check(
db, dup["source_id"], "duplicate", "warning",
dup["message"], json.dumps(dup.get("details", {})),
)
issues_found += 1
await db.commit()
logger.info(
f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
f"{issues_found} Probleme gefunden"
)
return {"checked": checks_done, "issues": issues_found}
async def _check_source_reachability(
client: httpx.AsyncClient, source: dict,
) -> list[dict]:
"""Prüft Erreichbarkeit und Feed-Validität einer Quelle."""
checks = []
url = source["url"]
try:
resp = await client.get(url)
if resp.status_code >= 400:
checks.append({
"type": "reachability",
"status": "error",
"message": f"HTTP {resp.status_code} - nicht erreichbar",
"details": json.dumps({"status_code": resp.status_code, "url": url}),
})
return checks
if resp.status_code >= 300:
checks.append({
"type": "reachability",
"status": "warning",
"message": f"HTTP {resp.status_code} - Weiterleitung",
"details": json.dumps({
"status_code": resp.status_code,
"final_url": str(resp.url),
}),
})
else:
checks.append({
"type": "reachability",
"status": "ok",
"message": "Erreichbar",
})
# Feed-Validität nur für RSS-Feeds
if source["source_type"] == "rss_feed":
text = resp.text[:20000]
if "<rss" not in text and "<feed" not in text and "<channel" not in text:
checks.append({
"type": "feed_validity",
"status": "error",
"message": "Kein gültiger RSS/Atom-Feed",
})
else:
feed = await asyncio.to_thread(feedparser.parse, text)
if feed.get("bozo") and not feed.entries:
checks.append({
"type": "feed_validity",
"status": "error",
"message": "Feed fehlerhaft (bozo)",
"details": json.dumps({
"bozo_exception": str(feed.get("bozo_exception", "")),
}),
})
elif not feed.entries:
checks.append({
"type": "feed_validity",
"status": "warning",
"message": "Feed erreichbar aber leer",
})
else:
checks.append({
"type": "feed_validity",
"status": "ok",
"message": f"Feed gültig ({len(feed.entries)} Einträge)",
})
except httpx.TimeoutException:
checks.append({
"type": "reachability",
"status": "error",
"message": "Timeout (15s)",
})
except httpx.ConnectError as e:
checks.append({
"type": "reachability",
"status": "error",
"message": f"Verbindung fehlgeschlagen: {e}",
})
except Exception as e:
checks.append({
"type": "reachability",
"status": "error",
"message": f"{type(e).__name__}: {e}",
})
return checks
def _check_stale(source: dict) -> dict | None:
"""Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen)."""
if source["source_type"] == "excluded":
return None
article_count = source.get("article_count") or 0
last_seen = source.get("last_seen_at")
if article_count == 0:
return {
"type": "stale",
"status": "warning",
"message": "Noch nie Artikel geliefert",
}
if last_seen:
try:
from datetime import datetime
last_dt = datetime.fromisoformat(last_seen)
now = datetime.now()
age_days = (now - last_dt).days
if age_days > 30:
return {
"type": "stale",
"status": "warning",
"message": f"Letzter Artikel vor {age_days} Tagen",
}
except (ValueError, TypeError):
pass
return None
def _find_duplicates(sources: list[dict]) -> list[dict]:
"""Findet doppelte Quellen (gleiche URL)."""
duplicates = []
url_map = {}
for s in sources:
if not s["url"]:
continue
url_norm = s["url"].lower().rstrip("/")
if url_norm in url_map:
existing = url_map[url_norm]
duplicates.append({
"source_id": s["id"],
"message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})",
"details": {"duplicate_of": existing["id"], "type": "url"},
})
else:
url_map[url_norm] = s
return duplicates
async def _save_check(
db: aiosqlite.Connection, source_id: int, check_type: str,
status: str, message: str, details: str = None,
):
"""Speichert ein Health-Check-Ergebnis."""
await db.execute(
"INSERT INTO source_health_checks "
"(source_id, check_type, status, message, details) "
"VALUES (?, ?, ?, ?, ?)",
(source_id, check_type, status, message, details),
)
async def get_health_summary(db: aiosqlite.Connection) -> dict:
"""Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück."""
cursor = await db.execute("""
SELECT
h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
h.check_type, h.status, h.message, h.details, h.checked_at
FROM source_health_checks h
JOIN sources s ON s.id = h.source_id
ORDER BY
CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
s.name
""")
checks = [dict(row) for row in await cursor.fetchall()]
error_count = sum(1 for c in checks if c["status"] == "error")
warning_count = sum(1 for c in checks if c["status"] == "warning")
ok_count = sum(1 for c in checks if c["status"] == "ok")
cursor = await db.execute(
"SELECT MAX(checked_at) as last_check FROM source_health_checks"
)
row = await cursor.fetchone()
last_check = row["last_check"] if row else None
return {
"last_check": last_check,
"total_checks": len(checks),
"errors": error_count,
"warnings": warning_count,
"ok": ok_count,
"checks": checks,
}