Dateien
AegisSight-Monitor-Verwaltung/src/routers/sources.py
claude-dev 0da66fb585 Podcast-Feed-Typ im Verwaltungsportal
Passt Verwaltung an die Podcast-Integration im Monitor an (Commit 5127e0a):

Backend (src/routers/sources.py):
- Pydantic-Pattern von GlobalSourceCreate + GlobalSourceUpdate um
  podcast_feed erweitert
- Health-Check Feed-Validierung greift jetzt auch fuer podcast_feed
  (Podcast-Feeds sind technisch RSS/Atom)

Frontend:
- src/static/js/sources.js: TYPE_LABELS um podcast_feed ("Podcast-Feed")
  ergaenzt
- src/static/dashboard.html: Neue <option value=podcast_feed> in Filter-
  und Anlage-Dropdown

Ohne diese Anpassung waere das Anlegen von Podcast-Quellen ueber das
Verwaltungsportal nicht moeglich (422 Unprocessable Entity vom
Pydantic-Validator).
2026-04-18 12:22:12 +00:00

826 Zeilen
31 KiB
Python
Originalformat Blame Verlauf

Diese Datei enthält mehrdeutige Unicode-Zeichen
Diese Datei enthält Unicode-Zeichen, die mit anderen Zeichen verwechselt werden können. Wenn du glaubst, dass das absichtlich so ist, kannst du diese Warnung ignorieren. Benutze den „Escape“-Button, um versteckte Zeichen anzuzeigen.
"""Global-source management and per-tenant source overview."""
import os
import sys
import logging

# Make the monitor's source_rules module importable (it lives outside this
# project tree). Inserted once; a second identical insert was redundant.
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional

from auth import get_current_admin
from database import db_dependency
import aiosqlite

from source_rules import (
    discover_source,
    discover_all_feeds,
    evaluate_feeds_with_claude,
    _extract_domain,
    _detect_category,
    domain_to_display_name,
)

logger = logging.getLogger("verwaltung.sources")
router = APIRouter(prefix="/api/sources", tags=["sources"])
class GlobalSourceCreate(BaseModel):
    """Request payload for creating a global source (stored with tenant_id NULL)."""

    # Display name shown in the management portal.
    name: str = Field(min_length=1, max_length=200)
    # Feed/page URL; optional for entries identified by domain only.
    url: Optional[str] = None
    domain: Optional[str] = None
    # Closed set of source types; podcast_feed was added for the podcast
    # integration (podcast feeds are technically RSS/Atom).
    source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$")
    category: str = Field(default="sonstige")
    status: str = Field(default="active", pattern="^(active|inactive)$")
    # Free-form admin notes.
    notes: Optional[str] = None
class GlobalSourceUpdate(BaseModel):
    """Partial-update payload: only non-None fields are written to the row
    (see update_global_source, which dumps with exclude_none=True)."""

    name: Optional[str] = Field(default=None, max_length=200)
    url: Optional[str] = None
    domain: Optional[str] = None
    # Same closed type set as GlobalSourceCreate, including podcast_feed.
    source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$")
    category: Optional[str] = None
    status: Optional[str] = Field(default=None, pattern="^(active|inactive)$")
    notes: Optional[str] = None
@router.get("/global")
async def list_global_sources(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List every global source (rows with tenant_id IS NULL)."""
    rows = await (await db.execute(
        "SELECT * FROM sources WHERE tenant_id IS NULL ORDER BY category, source_type, name"
    )).fetchall()
    return [dict(r) for r in rows]
@router.post("/global", status_code=201)
async def create_global_source(
    data: GlobalSourceCreate,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create a new global source; duplicate URLs are rejected with 409."""
    # Only URL-bearing entries can collide; domain-only entries skip the check.
    if data.url:
        dup = await (await db.execute(
            "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL",
            (data.url,),
        )).fetchone()
        if dup:
            raise HTTPException(
                status_code=409,
                detail=f"URL bereits vorhanden: {dup['name']}",
            )
    cursor = await db.execute(
        """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id)
        VALUES (?, ?, ?, ?, ?, ?, ?, 'system', NULL)""",
        (data.name, data.url, data.domain, data.source_type, data.category, data.status, data.notes),
    )
    new_id = cursor.lastrowid
    await db.commit()
    created = await (await db.execute("SELECT * FROM sources WHERE id = ?", (new_id,))).fetchone()
    return dict(created)
@router.put("/global/{source_id}")
async def update_global_source(
    source_id: int,
    data: GlobalSourceUpdate,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Partially update a global source.

    Only fields explicitly set (non-None) in the payload are written; an
    empty payload returns the current row unchanged. Raises 404 when the
    id does not exist or belongs to a tenant.
    """
    cursor = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,)
    )
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden")
    # model_dump(exclude_none=True) already yields exactly the fields to set;
    # the previous element-by-element copy loop was redundant.
    updates = data.model_dump(exclude_none=True)
    if not updates:
        return dict(row)
    # Column names come from the pydantic model's fixed attribute set, so
    # interpolating them into the statement is safe; values stay parameterized.
    set_clause = ", ".join(f"{k} = ?" for k in updates)
    await db.execute(
        f"UPDATE sources SET {set_clause} WHERE id = ?",
        [*updates.values(), source_id],
    )
    await db.commit()
    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    return dict(await cursor.fetchone())
@router.delete("/global/{source_id}", status_code=204)
async def delete_global_source(
    source_id: int,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete a global source; 404 when unknown or tenant-owned."""
    found = await (await db.execute(
        "SELECT id FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,)
    )).fetchone()
    if found is None:
        raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden")
    await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
    await db.commit()
@router.get("/tenant")
async def list_tenant_sources(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all tenant-scoped sources joined with their organization name."""
    cursor = await db.execute("""
        SELECT s.*, o.name as org_name
        FROM sources s
        LEFT JOIN organizations o ON o.id = s.tenant_id
        WHERE s.tenant_id IS NOT NULL
        ORDER BY o.name, s.category, s.name
    """)
    rows = await cursor.fetchall()
    return [dict(r) for r in rows]
@router.post("/tenant/{source_id}/promote")
async def promote_to_global(
    source_id: int,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Promote a tenant-owned source to a global one.

    400 if the source is already global, 409 if its URL already exists as
    a global source, 404 if the id is unknown.
    """
    record = await (await db.execute(
        "SELECT * FROM sources WHERE id = ?", (source_id,)
    )).fetchone()
    if not record:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    if record["tenant_id"] is None:
        raise HTTPException(status_code=400, detail="Bereits eine Grundquelle")
    if record["url"]:
        clash = await (await db.execute(
            "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
            (record["url"],),
        )).fetchone()
        if clash:
            raise HTTPException(status_code=409, detail="URL bereits als Grundquelle vorhanden")
    await db.execute(
        "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?",
        (source_id,),
    )
    await db.commit()
    refreshed = await (await db.execute(
        "SELECT * FROM sources WHERE id = ?", (source_id,)
    )).fetchone()
    return dict(refreshed)
@router.post("/discover")
async def discover_source_endpoint(
    url: str,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Analyze a URL: auto-detect domain, category and RSS feeds.

    Discovers every feed on the domain, ranks them with Claude and splits
    the result into new feeds vs. already-known global sources.
    """
    try:
        multi = await discover_all_feeds(url)
    except Exception as e:
        logger.error(f"Discovery fehlgeschlagen: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen")

    domain = multi["domain"]
    category = multi["category"]
    feeds = multi.get("feeds", [])

    if not feeds:
        # Bulk scan found nothing: fall back to single-source discovery.
        try:
            single = await discover_source(url)
            if single.get("rss_url"):
                feeds = [{"name": single["name"], "url": single["rss_url"]}]
                domain = single.get("domain", domain)
                category = single.get("category", category)
        except Exception:
            pass  # best-effort fallback only

    if not feeds:
        return {
            "domain": domain,
            "category": category,
            "feeds": [],
            "existing": [],
            "message": "Keine RSS-Feeds gefunden",
        }

    # Rank with Claude; on failure take the first few candidates verbatim.
    try:
        relevant = await evaluate_feeds_with_claude(domain, feeds)
    except Exception:
        relevant = feeds[:3]

    # Partition against URLs already registered as global sources.
    known = {
        row["url"]
        for row in await (await db.execute(
            "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL"
        )).fetchall()
    }
    fresh, existing = [], []
    for feed in relevant:
        entry = {
            "name": feed.get("name", domain_to_display_name(domain)),
            "url": feed["url"],
            "domain": domain,
            "category": category,
        }
        (existing if feed["url"] in known else fresh).append(entry)

    return {
        "domain": domain,
        "category": category,
        "feeds": fresh,
        "existing": existing,
    }
@router.post("/discover/add")
async def add_discovered_sources(
    feeds: list[dict],
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Persist discovered feeds as global sources.

    Expects a list of {name, url, domain, category} dicts; URLs already
    present as global sources are skipped. Additionally registers one
    web_source for the first feed's domain if none exists yet.
    """
    known_urls = {
        row["url"]
        for row in await (await db.execute(
            "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL"
        )).fetchall()
    }
    added = 0
    skipped = 0
    for entry in feeds:
        feed_url = entry.get("url")
        if not feed_url:
            continue
        if feed_url in known_urls:
            skipped += 1
            continue
        await db.execute(
            """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
            VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""",
            (entry["name"], feed_url, entry.get("domain", ""), entry.get("category", "sonstige")),
        )
        known_urls.add(feed_url)
        added += 1

    # Create a web_source for the domain unless one already exists.
    if feeds and feeds[0].get("domain"):
        domain = feeds[0]["domain"]
        present = await (await db.execute(
            "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source' AND tenant_id IS NULL",
            (domain.lower(),),
        )).fetchone()
        if present is None:
            await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                VALUES (?, ?, ?, 'web_source', ?, 'active', 'system', NULL)""",
                (domain_to_display_name(domain), f"https://{domain}", domain,
                 feeds[0].get("category", "sonstige")),
            )
            added += 1

    await db.commit()
    return {"added": added, "skipped": skipped}
# --- Health-Check & VorschlÃÆÃ¤ge ---
# --- Health check & suggestions ---
@router.get("/health")
async def get_health(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return stored health-check rows plus aggregate counters."""
    # The results table is created lazily by the first check run.
    probe = await db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'"
    )
    if await probe.fetchone() is None:
        return {"last_check": None, "total_checks": 0, "errors": 0, "warnings": 0, "ok": 0, "checks": []}
    cursor = await db.execute("""
        SELECT
            h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
            h.check_type, h.status, h.message, h.details, h.checked_at
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        ORDER BY
            CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
            s.name
    """)
    checks = [dict(row) for row in await cursor.fetchall()]
    # Tally outcomes in a single pass instead of three generator scans.
    tally = {"error": 0, "warning": 0, "ok": 0}
    for entry in checks:
        if entry["status"] in tally:
            tally[entry["status"]] += 1
    newest = await (await db.execute(
        "SELECT MAX(checked_at) as last_check FROM source_health_checks"
    )).fetchone()
    return {
        "last_check": newest["last_check"] if newest else None,
        "total_checks": len(checks),
        "errors": tally["error"],
        "warnings": tally["warning"],
        "ok": tally["ok"],
        "checks": checks,
    }
@router.get("/suggestions")
async def get_suggestions(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return suggestions, pending ones first, capped at 50 rows."""
    # Table is created lazily by the health-check run; nothing yet -> [].
    probe = await db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'"
    )
    if await probe.fetchone() is None:
        return []
    cursor = await db.execute("""
        SELECT * FROM source_suggestions
        ORDER BY
            CASE status WHEN 'pending' THEN 0 ELSE 1 END,
            created_at DESC
        LIMIT 50
    """)
    rows = await cursor.fetchall()
    return [dict(r) for r in rows]
class SuggestionAction(BaseModel):
    """Review decision for a suggestion: accept=True applies it, False rejects it."""

    accept: bool
@router.put("/suggestions/{suggestion_id}")
async def update_suggestion(
    suggestion_id: int,
    action: SuggestionAction,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Accept or reject a pending suggestion.

    Accepting applies the suggested change: create a source, deactivate or
    delete an existing one, or fix its URL. When a fix_url/add_source
    suggestion is accepted, still-pending deactivate_source suggestions for
    the same source are auto-rejected. Raises 404 for unknown ids and 400
    if the suggestion was already reviewed.
    """
    import json as _json

    cursor = await db.execute(
        "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,)
    )
    suggestion = await cursor.fetchone()
    if not suggestion:
        raise HTTPException(status_code=404, detail="Vorschlag nicht gefunden")
    suggestion = dict(suggestion)
    if suggestion["status"] != "pending":
        raise HTTPException(status_code=400, detail=f"Vorschlag bereits {suggestion['status']}")

    new_status = "accepted" if action.accept else "rejected"
    result_action = None
    if action.accept:
        stype = suggestion["suggestion_type"]
        data = _json.loads(suggestion["suggested_data"]) if suggestion["suggested_data"] else {}
        if stype == "add_source":
            name = data.get("name", "Unbenannt")
            url = data.get("url")
            domain = data.get("domain", "")
            category = data.get("category", "sonstige")
            # Heuristic: feed-looking URLs become rss_feed, otherwise web_source.
            source_type = "rss_feed" if url and any(
                x in (url or "").lower() for x in ("rss", "feed", "xml", "atom")
            ) else "web_source"
            if url:
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,)
                )
                if await cursor.fetchone():
                    # BUGFIX: message previously contained mojibake ("ÃÆÃ¼bersprungen").
                    result_action = "übersprungen (URL bereits vorhanden)"
                    new_status = "rejected"
                else:
                    await db.execute(
                        "INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) "
                        "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
                        (name, url, domain, source_type, category),
                    )
                    result_action = f"Quelle '{name}' angelegt"
            else:
                # BUGFIX: mojibake fixed here as well.
                result_action = "übersprungen (keine URL)"
                new_status = "rejected"
        elif stype == "deactivate_source":
            source_id = suggestion["source_id"]
            if source_id:
                await db.execute("UPDATE sources SET status = 'inactive' WHERE id = ?", (source_id,))
                result_action = "Quelle deaktiviert"
        elif stype == "remove_source":
            source_id = suggestion["source_id"]
            if source_id:
                await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
                result_action = "Quelle gelöscht"  # BUGFIX: was mojibake "gel̮̦scht"
        elif stype == "fix_url":
            source_id = suggestion["source_id"]
            new_url = data.get("url")
            if source_id and new_url:
                await db.execute("UPDATE sources SET url = ? WHERE id = ?", (new_url, source_id))
                result_action = "URL aktualisiert"  # dropped pointless f-string prefix
        # Auto-reject: once a fix_url or add_source suggestion is accepted,
        # competing pending deactivate_source suggestions are obsolete.
        if stype in ("fix_url", "add_source") and suggestion.get("source_id"):
            await db.execute(
                "UPDATE source_suggestions SET status = 'rejected', reviewed_at = CURRENT_TIMESTAMP "
                "WHERE source_id = ? AND suggestion_type = 'deactivate_source' AND status = 'pending' AND id != ?",
                (suggestion["source_id"], suggestion_id),
            )
    await db.execute(
        "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP WHERE id = ?",
        (new_status, suggestion_id),
    )
    await db.commit()
    return {"status": new_status, "action": result_action}
@router.post("/health/run")
async def run_health_check_now(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Run the health check synchronously and return its summary."""
    # Ensure the result/suggestion tables exist before the services write to them.
    await db.executescript("""
        CREATE TABLE IF NOT EXISTS source_health_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
            check_type TEXT NOT NULL,
            status TEXT NOT NULL,
            message TEXT,
            details TEXT,
            checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS source_suggestions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            suggestion_type TEXT NOT NULL,
            title TEXT NOT NULL,
            description TEXT,
            source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
            suggested_data TEXT,
            priority TEXT DEFAULT 'medium',
            status TEXT DEFAULT 'pending',
            reviewed_at TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """)
    await db.commit()
    # Import lazily: source_health and source_suggester live in the monitor tree.
    sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
    from services.source_health import run_health_checks
    from services.source_suggester import generate_suggestions
    outcome = await run_health_checks(db)
    suggestion_count = await generate_suggestions(db)
    return {
        "checked": outcome["checked"],
        "issues": outcome["issues"],
        "suggestions": suggestion_count,
    }
@router.post("/health/run-stream")
async def run_health_check_stream(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Health check with live progress reported as an SSE stream.

    Phase 1 deletes previous results, then checks reachability (and feed
    validity for rss_feed/podcast_feed sources) of every active global
    source, emitting one SSE 'check' event per source. Afterwards it
    records stale-source and duplicate-URL warnings, runs the suggestion
    generator (phase 2) and finishes with a 'done' event carrying totals.
    """
    import json as _json
    import asyncio
    from urllib.parse import urlparse  # NOTE(review): imported but never used below
    # Ensure the result/suggestion tables exist before writing to them.
    await db.executescript("""
    CREATE TABLE IF NOT EXISTS source_health_checks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
    check_type TEXT NOT NULL, status TEXT NOT NULL,
    message TEXT, details TEXT,
    checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS source_suggestions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    suggestion_type TEXT NOT NULL, title TEXT NOT NULL,
    description TEXT, source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
    suggested_data TEXT, priority TEXT DEFAULT 'medium',
    status TEXT DEFAULT 'pending', reviewed_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """)
    await db.commit()
    # Load all active global sources; only those with a URL get the HTTP check.
    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
        "FROM sources WHERE status = 'active' AND tenant_id IS NULL"
    )
    sources = [dict(row) for row in await cursor.fetchall()]
    sources_with_url = [s for s in sources if s["url"]]
    total = len(sources_with_url)
    async def generate():
        # Local imports: httpx/feedparser are only needed while streaming.
        import httpx
        import feedparser
        # Phase 1: reachability. Emit an initial progress event, then reset
        # previous results so the run starts from a clean slate.
        yield f"data: {_json.dumps({'phase': 'check', 'checked': 0, 'total': total, 'current': ''})}\n\n"
        await db.execute("DELETE FROM source_health_checks")
        await db.commit()
        issues_found = 0
        checked = 0
        async with httpx.AsyncClient(
            timeout=15.0, follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
        ) as client:
            for source in sources_with_url:
                try:
                    checks = []
                    try:
                        resp = await client.get(source["url"])
                        if resp.status_code >= 400:
                            checks.append({"type": "reachability", "status": "error",
                            "message": f"HTTP {resp.status_code} - nicht erreichbar"})
                        else:
                            checks.append({"type": "reachability", "status": "ok", "message": "Erreichbar"})
                            # Podcast feeds are technically RSS/Atom, so both types get validated.
                            if source["source_type"] in ("rss_feed", "podcast_feed"):
                                text = resp.text[:20000]  # cap parse input to the first 20k chars
                                if "<rss" not in text and "<feed" not in text and "<channel" not in text:
                                    checks.append({"type": "feed_validity", "status": "error",
                                    "message": "Kein RSS/Atom-Feed"})
                                else:
                                    # feedparser.parse is blocking; run it off the event loop.
                                    feed = await asyncio.to_thread(feedparser.parse, text)
                                    if feed.get("bozo") and not feed.entries:
                                        checks.append({"type": "feed_validity", "status": "error",
                                        "message": "Feed fehlerhaft"})
                                    elif not feed.entries:
                                        checks.append({"type": "feed_validity", "status": "warning",
                                        "message": "Feed leer"})
                                    else:
                                        checks.append({"type": "feed_validity", "status": "ok",
                                        "message": f"Feed OK ({len(feed.entries)} Eintr.)"})
                    except httpx.TimeoutException:
                        checks.append({"type": "reachability", "status": "error", "message": "Timeout (15s)"})
                    except httpx.ConnectError as e:
                        checks.append({"type": "reachability", "status": "error", "message": f"Verbindung fehlgeschlagen"})
                    except Exception as e:
                        checks.append({"type": "reachability", "status": "error", "message": f"{type(e).__name__}"})
                    for c in checks:
                        await db.execute(
                            "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, ?, ?, ?)",
                            (source["id"], c["type"], c["status"], c["message"]))
                        if c["status"] != "ok":
                            issues_found += 1
                except Exception:
                    pass  # never abort the stream because one source failed to record
                checked += 1
                # Aggregate a per-source verdict for the progress event.
                status_icon = "ok"
                if any(c["status"] == "error" for c in checks):
                    status_icon = "error"
                elif any(c["status"] == "warning" for c in checks):
                    status_icon = "warning"
                yield f"data: {_json.dumps({'phase': 'check', 'checked': checked, 'total': total, 'current': source['name'], 'status': status_icon})}\n\n"
        # Stale + duplicates (fast, no per-item progress needed)
        for source in sources:
            if source["source_type"] in ("excluded", "web_source"):
                continue
            article_count = source.get("article_count") or 0
            if article_count == 0:
                await db.execute(
                    "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', 'Noch nie Artikel geliefert')",
                    (source["id"],))
                issues_found += 1
            elif source.get("last_seen_at"):
                try:
                    from datetime import datetime
                    last_dt = datetime.fromisoformat(source["last_seen_at"])
                    # NOTE(review): naive datetime.now() compared against the stored
                    # timestamp — assumes both use the same timezone; confirm.
                    age = (datetime.now() - last_dt).days
                    if age > 30:
                        await db.execute(
                            "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', ?)",
                            (source["id"], f"Letzter Artikel vor {age} Tagen"))
                        issues_found += 1
                except (ValueError, TypeError):
                    pass
        # Duplicate URLs: normalized case-insensitively, ignoring a trailing slash.
        url_map = {}
        for s in sources:
            if not s["url"]:
                continue
            url_norm = s["url"].lower().rstrip("/")
            if url_norm in url_map:
                existing = url_map[url_norm]
                await db.execute(
                    "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'duplicate', 'warning', ?)",
                    (s["id"], f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})"))
                issues_found += 1
            else:
                url_map[url_norm] = s
        await db.commit()
        # Phase 2: suggestions
        yield f"data: {_json.dumps({'phase': 'suggestions', 'checked': checked, 'total': total})}\n\n"
        sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
        from services.source_suggester import generate_suggestions
        suggestion_count = await generate_suggestions(db)
        # Done
        yield f"data: {_json.dumps({'phase': 'done', 'checked': checked, 'total': total, 'issues': issues_found, 'suggestions': suggestion_count})}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
@router.post("/health/search-fix/{source_id}")
async def search_fix_for_source(
    source_id: int,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Let Claude (with WebSearch) look for a fix for a broken source.

    Loads the source and its recorded health-check findings, asks the model
    for up to three concrete solutions, stores them as pending suggestions
    (skipping duplicates of the same type for the same source) and returns
    the parsed result together with token usage and cost.
    Raises 404 for unknown sources, 500 if the research call fails.
    """
    import json as _json

    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, category FROM sources WHERE id = ?",
        (source_id,),
    )
    source = await cursor.fetchone()
    if not source:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    source = dict(source)

    # Collect this source's health-check findings for the prompt.
    cursor = await db.execute(
        "SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?",
        (source_id,),
    )
    issues = [dict(row) for row in await cursor.fetchall()]
    issues_text = "\n".join(f"- {i['check_type']}: {i['status']} - {i['message']}" for i in issues)

    # BUGFIX: the prompt previously contained mojibake (e.g. "fÃÆÃ¼r",
    # "L̮̦sung") even though it instructs the model to use real umlauts;
    # the garbled characters are repaired below.
    prompt = f"""Du bist ein OSINT-Analyst. Folgende Quelle ist nicht mehr erreichbar:
Name: {source['name']}
URL: {source['url'] or 'keine'}
Domain: {source['domain'] or 'unbekannt'}
Typ: {source['source_type']}
Kategorie: {source['category']}
Probleme:
{issues_text}
Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle.
- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren
- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs)
- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar?
Regeln:
- Maximal 3 Lösungen vorschlagen (die besten)
- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss)
Antworte NUR mit einem JSON-Objekt:
{{
"fixable": true/false,
"solutions": [
{{
"type": "replace_url|add_feed|deactivate",
"name": "Anzeigename",
"url": "https://...",
"description": "Kurze Begründung"
}}
],
"summary": "Zusammenfassung in 1-2 Sätzen"
}}
Nur das JSON, kein anderer Text."""

    # Import lazily: the Claude client lives in the monitor tree.
    sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
    from agents.claude_client import call_claude
    try:
        response, usage = await call_claude(prompt, tools="WebSearch,WebFetch")
        import re
        # Extract the first JSON object from the response; fall back to a
        # non-fixable result carrying the raw text when none is found.
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            result = _json.loads(json_match.group(0))
        else:
            result = {"fixable": False, "solutions": [], "summary": response[:500]}
        # Persist the proposed solutions as suggestions for the review UI.
        await db.executescript("""
        CREATE TABLE IF NOT EXISTS source_suggestions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        suggestion_type TEXT NOT NULL,
        title TEXT NOT NULL,
        description TEXT,
        source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
        suggested_data TEXT,
        priority TEXT DEFAULT 'medium',
        status TEXT DEFAULT 'pending',
        reviewed_at TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """)
        for sol in result.get("solutions", []):
            sol_type = sol.get("type", "add_feed")
            suggestion_type = {
                "replace_url": "fix_url",
                "add_feed": "add_source",
                "deactivate": "deactivate_source",
            }.get(sol_type, "add_source")
            title = f"{source['name']}: {sol.get('description', sol_type)[:120]}"
            # Duplicate check: same type + same source already pending?
            cursor = await db.execute(
                "SELECT id FROM source_suggestions WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'",
                (suggestion_type, source_id),
            )
            if await cursor.fetchone():
                continue
            data = _json.dumps({
                "name": sol.get("name", source["name"]),
                "url": sol.get("url", ""),
                "domain": source["domain"] or "",
                "category": source["category"],
            }, ensure_ascii=False)
            await db.execute(
                "INSERT INTO source_suggestions "
                "(suggestion_type, title, description, source_id, suggested_data, priority, status) "
                "VALUES (?, ?, ?, ?, ?, 'high', 'pending')",
                (suggestion_type, title, sol.get("description", ""), source_id, data),
            )
        await db.commit()
        result["cost_usd"] = usage.cost_usd
        result["tokens"] = {"input": usage.input_tokens, "output": usage.output_tokens}
        return result
    except Exception as e:
        # Broad by design: any failure in the model call or persistence
        # surfaces as a single 500 with the cause embedded.
        raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}")