Täglicher Quellen-Health-Check + Haiku-Vorschläge
- Neue Tabellen: source_health_checks, source_suggestions
- source_health.py: Prüft Erreichbarkeit, Feed-Validität, Aktualität, Duplikate
- source_suggester.py: KI-gestützte Vorschläge via Claude Haiku
- APScheduler-Job: Automatischer Check täglich um 04:00

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dieser Commit ist enthalten in:
@@ -184,6 +184,30 @@ CREATE TABLE IF NOT EXISTS article_locations (
|
|||||||
CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id);
|
CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id);
|
||||||
CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id);
|
CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id);
|
||||||
|
|
||||||
|
|
||||||
|
-- Per-source health-check results. The table is cleared and rewritten on
-- every run (see source_health.run_health_checks), so it always reflects
-- only the latest check.
CREATE TABLE IF NOT EXISTS source_health_checks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
    check_type TEXT NOT NULL,   -- 'reachability' | 'feed_validity' | 'stale' | 'duplicate'
    status TEXT NOT NULL,       -- 'ok' | 'warning' | 'error'
    message TEXT,               -- human-readable result message
    details TEXT,               -- optional JSON payload with extra context
    checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_source_health_source ON source_health_checks(source_id);

-- AI-generated source-management suggestions awaiting human review
-- (see source_suggester.generate_suggestions / apply_suggestion).
CREATE TABLE IF NOT EXISTS source_suggestions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    suggestion_type TEXT NOT NULL,  -- 'add_source' | 'deactivate_source' | 'fix_url' | 'remove_source'
    title TEXT NOT NULL,
    description TEXT,
    source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,  -- affected source, if any
    suggested_data TEXT,            -- JSON: name/url/domain/category for add_source or fix_url
    priority TEXT DEFAULT 'medium', -- 'low' | 'medium' | 'high'
    status TEXT DEFAULT 'pending',  -- 'pending' | 'accepted' | 'rejected'
    reviewed_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
|
||||||
CREATE TABLE IF NOT EXISTS user_excluded_domains (
|
CREATE TABLE IF NOT EXISTS user_excluded_domains (
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||||
|
|||||||
18
src/main.py
18
src/main.py
@@ -19,6 +19,8 @@ from config import STATIC_DIR, LOG_DIR, DATA_DIR, TIMEZONE
|
|||||||
from database import init_db, get_db
|
from database import init_db, get_db
|
||||||
from auth import decode_token
|
from auth import decode_token
|
||||||
from agents.orchestrator import orchestrator
|
from agents.orchestrator import orchestrator
|
||||||
|
from services.source_health import run_health_checks, get_health_summary
|
||||||
|
from services.source_suggester import generate_suggestions
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
os.makedirs(LOG_DIR, exist_ok=True)
|
os.makedirs(LOG_DIR, exist_ok=True)
|
||||||
@@ -140,6 +142,21 @@ async def check_auto_refresh():
|
|||||||
await db.close()
|
await db.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def daily_source_health_check():
    """Daily scheduled job: run source health checks, then AI suggestions.

    Opens its own DB connection, logs a summary of each step, swallows
    (but logs) any exception so the scheduler job keeps running, and
    always closes the connection.
    """
    db = await get_db()
    try:
        # Step 1: probe all active sources (reachability, feed validity,
        # staleness, duplicates) — see services.source_health.
        result = await run_health_checks(db)
        logger.info(f"Täglicher Health-Check: {result['checked']} geprüft, {result['issues']} Probleme")

        # Step 2: have Claude Haiku propose source additions/removals.
        suggestion_count = await generate_suggestions(db)
        logger.info(f"Tägliche Vorschläge: {suggestion_count} neue Vorschläge")
    except Exception as e:
        # Never propagate: a failed run must not kill the scheduler.
        logger.error(f"Täglicher Health-Check Fehler: {e}", exc_info=True)
    finally:
        await db.close()
|
||||||
|
|
||||||
async def cleanup_expired():
|
async def cleanup_expired():
|
||||||
"""Bereinigt abgelaufene Lagen basierend auf retention_days."""
|
"""Bereinigt abgelaufene Lagen basierend auf retention_days."""
|
||||||
db = await get_db()
|
db = await get_db()
|
||||||
@@ -218,6 +235,7 @@ async def lifespan(app: FastAPI):
|
|||||||
|
|
||||||
scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh")
|
scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh")
|
||||||
scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup")
|
scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup")
|
||||||
|
scheduler.add_job(daily_source_health_check, "cron", hour=4, minute=0, id="source_health")
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
logger.info("OSINT Lagemonitor gestartet")
|
logger.info("OSINT Lagemonitor gestartet")
|
||||||
|
|||||||
282
src/services/source_health.py
Normale Datei
282
src/services/source_health.py
Normale Datei
@@ -0,0 +1,282 @@
|
|||||||
|
"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate."""
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import feedparser
|
||||||
|
import aiosqlite
|
||||||
|
|
||||||
|
logger = logging.getLogger("osint.source_health")
|
||||||
|
|
||||||
|
|
||||||
|
async def run_health_checks(db: aiosqlite.Connection) -> dict:
    """Run all health checks for active base (tenant-independent) sources.

    Three passes:
      1. reachability + feed validity (HTTP; only sources with a URL)
      2. staleness (no article for >30 days, or never delivered)
      3. duplicate detection (same normalized URL)

    Results are written to ``source_health_checks``; the table is cleared
    first, so it always reflects only this run.

    Returns:
        dict with ``checked`` (sources successfully probed via HTTP) and
        ``issues`` (count of non-ok check results across all passes).
    """
    logger.info("Starte Quellen-Health-Check...")

    # Load all active base sources (tenant_id IS NULL = shared sources).
    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
        "FROM sources WHERE status = 'active' AND tenant_id IS NULL"
    )
    sources = [dict(row) for row in await cursor.fetchall()]

    # Drop previous check results — they are fully rewritten below.
    await db.execute("DELETE FROM source_health_checks")
    await db.commit()

    checks_done = 0
    issues_found = 0

    # 1. Reachability + feed validity (only sources that have a URL).
    sources_with_url = [s for s in sources if s["url"]]

    async with httpx.AsyncClient(
        timeout=15.0,
        follow_redirects=True,
        headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
    ) as client:
        # Probe in batches of 5 to bound concurrent connections.
        for i in range(0, len(sources_with_url), 5):
            batch = sources_with_url[i:i + 5]
            tasks = [_check_source_reachability(client, s) for s in batch]
            # return_exceptions=True: one failing probe must not abort the batch.
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for source, result in zip(batch, results):
                if isinstance(result, Exception):
                    # The probe itself blew up — record as an error check.
                    # Note: such sources are not counted in checks_done.
                    await _save_check(
                        db, source["id"], "reachability", "error",
                        f"Prüfung fehlgeschlagen: {result}",
                    )
                    issues_found += 1
                else:
                    for check in result:
                        await _save_check(
                            db, source["id"], check["type"], check["status"],
                            check["message"], check.get("details"),
                        )
                        if check["status"] != "ok":
                            issues_found += 1
                    checks_done += 1

    # 2. Stale sources (no article for >30 days).
    for source in sources:
        if source["source_type"] == "excluded":
            # Excluded sources are never expected to deliver articles.
            continue
        stale_check = _check_stale(source)
        if stale_check:
            await _save_check(
                db, source["id"], stale_check["type"],
                stale_check["status"], stale_check["message"],
            )
            if stale_check["status"] != "ok":
                issues_found += 1

    # 3. Duplicate detection (same normalized URL).
    duplicates = _find_duplicates(sources)
    for dup in duplicates:
        await _save_check(
            db, dup["source_id"], "duplicate", "warning",
            dup["message"], json.dumps(dup.get("details", {})),
        )
        issues_found += 1

    await db.commit()
    logger.info(
        f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
        f"{issues_found} Probleme gefunden"
    )
    return {"checked": checks_done, "issues": issues_found}
|
||||||
|
|
||||||
|
|
||||||
|
async def _check_source_reachability(
    client: httpx.AsyncClient, source: dict,
) -> list[dict]:
    """Probe one source over HTTP: reachability and, for RSS feeds, validity.

    Args:
        client: shared AsyncClient (timeout/redirect policy set by caller).
        source: row dict; reads ``url`` and ``source_type``.

    Returns:
        A list of check dicts (keys: ``type``, ``status``, ``message``,
        optional ``details`` as a JSON string). Network errors never
        propagate — they are converted into 'error' check entries.
    """
    checks = []
    url = source["url"]

    try:
        resp = await client.get(url)

        if resp.status_code >= 400:
            # Hard failure — report and stop; body is not worth parsing.
            checks.append({
                "type": "reachability",
                "status": "error",
                "message": f"HTTP {resp.status_code} - nicht erreichbar",
                "details": json.dumps({"status_code": resp.status_code, "url": url}),
            })
            return checks

        if resp.status_code >= 300:
            # Client follows redirects, so a *final* 3xx is unusual — warn.
            checks.append({
                "type": "reachability",
                "status": "warning",
                "message": f"HTTP {resp.status_code} - Weiterleitung",
                "details": json.dumps({
                    "status_code": resp.status_code,
                    "final_url": str(resp.url),
                }),
            })
        else:
            checks.append({
                "type": "reachability",
                "status": "ok",
                "message": "Erreichbar",
            })

        # Feed validity only for RSS feeds.
        if source["source_type"] == "rss_feed":
            # Cap inspected text so huge responses stay cheap to scan/parse.
            text = resp.text[:20000]
            if "<rss" not in text and "<feed" not in text and "<channel" not in text:
                # No RSS/Atom markers at all in the leading chunk.
                checks.append({
                    "type": "feed_validity",
                    "status": "error",
                    "message": "Kein gültiger RSS/Atom-Feed",
                })
            else:
                # feedparser is blocking — run it in a worker thread.
                feed = await asyncio.to_thread(feedparser.parse, text)
                if feed.get("bozo") and not feed.entries:
                    # Parser flagged the feed AND produced no entries.
                    checks.append({
                        "type": "feed_validity",
                        "status": "error",
                        "message": "Feed fehlerhaft (bozo)",
                        "details": json.dumps({
                            "bozo_exception": str(feed.get("bozo_exception", "")),
                        }),
                    })
                elif not feed.entries:
                    checks.append({
                        "type": "feed_validity",
                        "status": "warning",
                        "message": "Feed erreichbar aber leer",
                    })
                else:
                    checks.append({
                        "type": "feed_validity",
                        "status": "ok",
                        "message": f"Feed gültig ({len(feed.entries)} Einträge)",
                    })

    except httpx.TimeoutException:
        # "15s" mirrors the client timeout configured in run_health_checks.
        checks.append({
            "type": "reachability",
            "status": "error",
            "message": "Timeout (15s)",
        })
    except httpx.ConnectError as e:
        checks.append({
            "type": "reachability",
            "status": "error",
            "message": f"Verbindung fehlgeschlagen: {e}",
        })
    except Exception as e:
        # Catch-all so one broken source cannot abort the whole batch.
        checks.append({
            "type": "reachability",
            "status": "error",
            "message": f"{type(e).__name__}: {e}",
        })

    return checks
|
||||||
|
|
||||||
|
|
||||||
|
def _check_stale(source: dict) -> dict | None:
|
||||||
|
"""Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen)."""
|
||||||
|
if source["source_type"] == "excluded":
|
||||||
|
return None
|
||||||
|
|
||||||
|
article_count = source.get("article_count") or 0
|
||||||
|
last_seen = source.get("last_seen_at")
|
||||||
|
|
||||||
|
if article_count == 0:
|
||||||
|
return {
|
||||||
|
"type": "stale",
|
||||||
|
"status": "warning",
|
||||||
|
"message": "Noch nie Artikel geliefert",
|
||||||
|
}
|
||||||
|
|
||||||
|
if last_seen:
|
||||||
|
try:
|
||||||
|
from datetime import datetime
|
||||||
|
last_dt = datetime.fromisoformat(last_seen)
|
||||||
|
now = datetime.now()
|
||||||
|
age_days = (now - last_dt).days
|
||||||
|
if age_days > 30:
|
||||||
|
return {
|
||||||
|
"type": "stale",
|
||||||
|
"status": "warning",
|
||||||
|
"message": f"Letzter Artikel vor {age_days} Tagen",
|
||||||
|
}
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _find_duplicates(sources: list[dict]) -> list[dict]:
|
||||||
|
"""Findet doppelte Quellen (gleiche URL)."""
|
||||||
|
duplicates = []
|
||||||
|
url_map = {}
|
||||||
|
|
||||||
|
for s in sources:
|
||||||
|
if not s["url"]:
|
||||||
|
continue
|
||||||
|
url_norm = s["url"].lower().rstrip("/")
|
||||||
|
if url_norm in url_map:
|
||||||
|
existing = url_map[url_norm]
|
||||||
|
duplicates.append({
|
||||||
|
"source_id": s["id"],
|
||||||
|
"message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})",
|
||||||
|
"details": {"duplicate_of": existing["id"], "type": "url"},
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
url_map[url_norm] = s
|
||||||
|
|
||||||
|
return duplicates
|
||||||
|
|
||||||
|
|
||||||
|
async def _save_check(
    db: aiosqlite.Connection, source_id: int, check_type: str,
    status: str, message: str, details: str = None,
):
    """Insert a single health-check result row (caller handles commit)."""
    insert_sql = (
        "INSERT INTO source_health_checks "
        "(source_id, check_type, status, message, details) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    row = (source_id, check_type, status, message, details)
    await db.execute(insert_sql, row)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_health_summary(db: aiosqlite.Connection) -> dict:
    """Return a summary of the latest health-check results.

    Each check row is joined with its source and ordered errors first,
    then warnings, then ok (alphabetical by source name within each
    group).

    Returns:
        dict with ``last_check`` timestamp (or None when no checks
        exist), per-status counts, ``total_checks`` and the full
        ``checks`` row list.
    """
    cursor = await db.execute("""
        SELECT
            h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
            h.check_type, h.status, h.message, h.details, h.checked_at
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        ORDER BY
            CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
            s.name
    """)
    checks = [dict(row) for row in await cursor.fetchall()]

    error_count = sum(1 for c in checks if c["status"] == "error")
    warning_count = sum(1 for c in checks if c["status"] == "warning")
    ok_count = sum(1 for c in checks if c["status"] == "ok")

    # Timestamp of the most recent run; MAX over an empty table yields NULL.
    cursor = await db.execute(
        "SELECT MAX(checked_at) as last_check FROM source_health_checks"
    )
    row = await cursor.fetchone()
    last_check = row["last_check"] if row else None

    return {
        "last_check": last_check,
        "total_checks": len(checks),
        "errors": error_count,
        "warnings": warning_count,
        "ok": ok_count,
        "checks": checks,
    }
|
||||||
261
src/services/source_suggester.py
Normale Datei
261
src/services/source_suggester.py
Normale Datei
@@ -0,0 +1,261 @@
|
|||||||
|
"""KI-gestützte Quellen-Vorschläge via Haiku."""
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
|
||||||
|
import aiosqlite
|
||||||
|
|
||||||
|
from agents.claude_client import call_claude
|
||||||
|
from config import CLAUDE_MODEL_FAST
|
||||||
|
|
||||||
|
logger = logging.getLogger("osint.source_suggester")
|
||||||
|
|
||||||
|
|
||||||
|
async def generate_suggestions(db: aiosqlite.Connection) -> int:
    """Generate source suggestions via Claude Haiku.

    Summarizes the current source collection and open health-check
    issues into a prompt, asks the fast Claude model for up to five
    improvement suggestions (JSON array), validates them and stores new
    ones as 'pending' rows in ``source_suggestions``.

    Returns:
        Number of newly stored suggestions (0 on any failure — errors
        are logged, never raised).
    """
    logger.info("Starte Quellen-Vorschläge via Haiku...")

    # 1. Load all base sources (tenant_id IS NULL), grouped later by category.
    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, category, status, "
        "article_count, last_seen_at "
        "FROM sources WHERE tenant_id IS NULL ORDER BY category, name"
    )
    sources = [dict(row) for row in await cursor.fetchall()]

    # 2. Load current health-check problems (errors + warnings).
    cursor = await db.execute("""
        SELECT h.source_id, s.name, s.domain, s.url,
               h.check_type, h.status, h.message
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        WHERE h.status IN ('error', 'warning')
    """)
    issues = [dict(row) for row in await cursor.fetchall()]

    # 3. Purge stale pending suggestions (older than 30 days).
    await db.execute(
        "DELETE FROM source_suggestions "
        "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')"
    )

    # 4. Build a compact per-category source summary for the prompt.
    categories = {}
    for s in sources:
        cat = s["category"]
        if cat not in categories:
            categories[cat] = []
        categories[cat].append(s)

    source_summary = ""
    for cat, cat_sources in sorted(categories.items()):
        active = [
            s for s in cat_sources
            if s["status"] == "active" and s["source_type"] != "excluded"
        ]
        # List at most 10 names per category to keep the prompt small.
        source_summary += f"\n{cat} ({len(active)} aktiv): "
        source_summary += ", ".join(s["name"] for s in active[:10])
        if len(active) > 10:
            source_summary += f" ... (+{len(active) - 10} weitere)"

    # Cap issue listing at 20 entries for the same reason.
    issues_summary = ""
    if issues:
        issues_summary = "\n\nProbleme gefunden:\n"
        for issue in issues[:20]:
            issues_summary += (
                f"- {issue['name']} ({issue['domain']}): "
                f"{issue['check_type']} = {issue['status']} - {issue['message']}\n"
            )

    # Prompt is German by design (German-language monitoring product).
    prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden.

Aktuelle Quellensammlung:{source_summary}{issues_summary}

Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor.

Beachte:
1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor mit der source_id
2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor
3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen
4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind
5. Maximal 5 Vorschläge

Antworte NUR mit einem JSON-Array. Jedes Element:
{{
  "type": "add_source|deactivate_source|fix_url|remove_source",
  "title": "Kurzer Titel",
  "description": "Begründung",
  "priority": "low|medium|high",
  "source_id": null,
  "data": {{
    "name": "Anzeigename",
    "url": "https://...",
    "domain": "example.de",
    "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige"
  }}
}}

Nur das JSON-Array, kein anderer Text."""

    try:
        response, usage = await call_claude(
            prompt, tools=None, model=CLAUDE_MODEL_FAST,
        )

        # Extract the first JSON array even if the model added extra text.
        json_match = re.search(r'\[.*\]', response, re.DOTALL)
        if not json_match:
            logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)")
            return 0

        suggestions = json.loads(json_match.group(0))

        count = 0
        # Enforce the 5-suggestion cap regardless of what the model returned.
        for suggestion in suggestions[:5]:
            stype = suggestion.get("type", "add_source")
            title = suggestion.get("title", "")
            desc = suggestion.get("description", "")
            priority = suggestion.get("priority", "medium")
            source_id = suggestion.get("source_id")
            data = json.dumps(
                suggestion.get("data", {}), ensure_ascii=False,
            )

            # Validate source_id: must reference an existing row, else None.
            if source_id is not None:
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE id = ?", (source_id,),
                )
                if not await cursor.fetchone():
                    source_id = None

            # Skip suggestions whose title already exists as pending.
            cursor = await db.execute(
                "SELECT id FROM source_suggestions "
                "WHERE title = ? AND status = 'pending'",
                (title,),
            )
            if await cursor.fetchone():
                continue

            await db.execute(
                "INSERT INTO source_suggestions "
                "(suggestion_type, title, description, source_id, "
                "suggested_data, priority, status) "
                "VALUES (?, ?, ?, ?, ?, ?, 'pending')",
                (stype, title, desc, source_id, data, priority),
            )
            count += 1

        await db.commit()
        logger.info(
            f"Quellen-Vorschläge: {count} neue Vorschläge generiert "
            f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / "
            f"${usage.cost_usd:.4f})"
        )
        return count

    except Exception as e:
        # Suggestions are best-effort; never break the daily job.
        logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True)
        return 0
|
||||||
|
|
||||||
|
|
||||||
|
async def apply_suggestion(
    db: aiosqlite.Connection, suggestion_id: int, accept: bool,
) -> dict:
    """Apply or reject a stored source suggestion.

    Args:
        db: open connection (rows must support dict() conversion).
        suggestion_id: primary key in ``source_suggestions``.
        accept: True executes the suggested action; False only rejects.

    Returns:
        dict with ``status`` ('accepted' or 'rejected') and ``action``
        (human-readable description of what happened, or None when
        rejected without action).

    Raises:
        ValueError: if the suggestion does not exist or was already
            reviewed (status != 'pending').
    """
    cursor = await db.execute(
        "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,),
    )
    suggestion = await cursor.fetchone()
    if not suggestion:
        raise ValueError("Vorschlag nicht gefunden")

    suggestion = dict(suggestion)

    # Each suggestion may be reviewed exactly once.
    if suggestion["status"] != "pending":
        raise ValueError(f"Vorschlag bereits {suggestion['status']}")

    new_status = "accepted" if accept else "rejected"
    result = {"status": new_status, "action": None}

    if accept:
        stype = suggestion["suggestion_type"]
        data = (
            json.loads(suggestion["suggested_data"])
            if suggestion["suggested_data"]
            else {}
        )

        if stype == "add_source":
            name = data.get("name", "Unbenannt")
            url = data.get("url")
            domain = data.get("domain", "")
            category = data.get("category", "sonstige")
            # Heuristic: URLs containing feed-ish tokens count as RSS.
            source_type = "rss_feed" if url and any(
                x in (url or "").lower()
                for x in ("rss", "feed", "xml", "atom")
            ) else "web_source"

            if url:
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
                    (url,),
                )
                if await cursor.fetchone():
                    # URL already known: downgrade acceptance to rejection.
                    result["action"] = "übersprungen (URL bereits vorhanden)"
                    new_status = "rejected"
                else:
                    await db.execute(
                        "INSERT INTO sources "
                        "(name, url, domain, source_type, category, status, "
                        "added_by, tenant_id) "
                        "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
                        (name, url, domain, source_type, category),
                    )
                    result["action"] = f"Quelle '{name}' angelegt"
            else:
                # Cannot add a source without a URL.
                result["action"] = "übersprungen (keine URL)"
                new_status = "rejected"

        elif stype == "deactivate_source":
            source_id = suggestion["source_id"]
            if source_id:
                await db.execute(
                    "UPDATE sources SET status = 'inactive' WHERE id = ?",
                    (source_id,),
                )
                result["action"] = "Quelle deaktiviert"
            else:
                result["action"] = "übersprungen (keine source_id)"

        elif stype == "remove_source":
            source_id = suggestion["source_id"]
            if source_id:
                # Hard delete; dependent rows rely on FK ON DELETE rules.
                await db.execute(
                    "DELETE FROM sources WHERE id = ?", (source_id,),
                )
                result["action"] = "Quelle gelöscht"
            else:
                result["action"] = "übersprungen (keine source_id)"

        elif stype == "fix_url":
            source_id = suggestion["source_id"]
            new_url = data.get("url")
            if source_id and new_url:
                await db.execute(
                    "UPDATE sources SET url = ? WHERE id = ?",
                    (new_url, source_id),
                )
                result["action"] = f"URL aktualisiert auf {new_url}"
            else:
                result["action"] = "übersprungen (keine source_id oder URL)"

    # Persist the review outcome; branches above may have changed new_status.
    await db.execute(
        "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP "
        "WHERE id = ?",
        (new_status, suggestion_id),
    )
    await db.commit()

    result["status"] = new_status
    return result
|
||||||
In neuem Issue referenzieren
Einen Benutzer sperren