From eaceb5ef6b2821ce7c4e41c4205f2abb03980bf0 Mon Sep 17 00:00:00 2001 From: claude-dev Date: Sun, 8 Mar 2026 17:29:19 +0100 Subject: [PATCH] Vorschlagssystem: Sonnet-Prompt, Auto-Reject, Titellimit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Sonnet: max 3 Lösungen, echte Umlaute erzwingen - Auto-Reject: deactivate_source wird abgelehnt wenn fix_url akzeptiert - Titel-Limit von 80 auf 120 Zeichen erhöht Co-Authored-By: Claude Opus 4.6 --- src/routers/sources.py | 1311 ++++++++++++++++++++-------------------- 1 file changed, 662 insertions(+), 649 deletions(-) diff --git a/src/routers/sources.py b/src/routers/sources.py index fe7e73e..b2e579d 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -1,649 +1,662 @@ -import os -"""Grundquellen-Verwaltung und Kundenquellen-Übersicht.""" -import sys -import logging - -# Monitor-Source-Rules verfügbar machen -sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") - -from fastapi import APIRouter, Depends, HTTPException, status -from pydantic import BaseModel, Field -from typing import Optional -from auth import get_current_admin -from database import db_dependency -import aiosqlite - -sys.path.insert(0, os.path.join('/home/claude-dev/AegisSight-Monitor/src')) - -from source_rules import ( - discover_source, - discover_all_feeds, - evaluate_feeds_with_claude, - _extract_domain, - _detect_category, - domain_to_display_name, -) - -logger = logging.getLogger("verwaltung.sources") - -router = APIRouter(prefix="/api/sources", tags=["sources"]) - - -class GlobalSourceCreate(BaseModel): - name: str = Field(min_length=1, max_length=200) - url: Optional[str] = None - domain: Optional[str] = None - source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded)$") - category: str = Field(default="sonstige") - status: str = Field(default="active", pattern="^(active|inactive)$") - notes: Optional[str] = None - - -class GlobalSourceUpdate(BaseModel): - name: Optional[str] = Field(default=None, max_length=200) - url: Optional[str] = None - domain: Optional[str] = None - source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded)$") - category: Optional[str] = None - status: Optional[str] = Field(default=None, pattern="^(active|inactive)$") - notes: Optional[str] = None - - -@router.get("/global") -async def list_global_sources( - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Alle Grundquellen auflisten (tenant_id IS NULL).""" - cursor = await db.execute( - "SELECT * FROM sources WHERE tenant_id IS NULL ORDER BY category, source_type, name" - ) - return [dict(row) for row in await cursor.fetchall()] - - -@router.post("/global", status_code=201) -async def create_global_source( - data: GlobalSourceCreate, - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Neue Grundquelle anlegen.""" - if data.url: - cursor = await db.execute( - "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL", - (data.url,), - ) - existing = await cursor.fetchone() - if existing: - raise HTTPException( - status_code=409, - detail=f"URL bereits vorhanden: {existing['name']}", - ) - - cursor = await db.execute( - """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?, 'system', NULL)""", - (data.name, data.url, data.domain, data.source_type, data.category, data.status, data.notes), - ) - await db.commit() - - cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,)) - return dict(await cursor.fetchone()) - - -@router.put("/global/{source_id}") -async def update_global_source( - source_id: int, - data: GlobalSourceUpdate, - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Grundquelle bearbeiten.""" - cursor = await db.execute( - "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) - ) - row = await cursor.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") - - updates = {} - for field, value in data.model_dump(exclude_none=True).items(): - updates[field] = value - - if not updates: - return dict(row) - - set_clause = ", ".join(f"{k} = ?" for k in updates) - values = list(updates.values()) + [source_id] - await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values) - await db.commit() - - cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) - return dict(await cursor.fetchone()) - - -@router.delete("/global/{source_id}", status_code=204) -async def delete_global_source( - source_id: int, - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Grundquelle loeschen.""" - cursor = await db.execute( - "SELECT id FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) - ) - if not await cursor.fetchone(): - raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") - - await db.execute("DELETE FROM sources WHERE id = ?", (source_id,)) - await db.commit() - - -@router.get("/tenant") -async def list_tenant_sources( - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Alle tenant-spezifischen Quellen mit Org-Name auflisten.""" - cursor = await db.execute(""" - SELECT s.*, o.name as org_name - FROM sources s - LEFT JOIN organizations o ON o.id = s.tenant_id - WHERE s.tenant_id IS NOT NULL - ORDER BY o.name, s.category, s.name - """) - return [dict(row) for row in await cursor.fetchall()] - - -@router.post("/tenant/{source_id}/promote") -async def promote_to_global( - source_id: int, - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Tenant-Quelle zur Grundquelle befoerdern.""" - cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) - row = await cursor.fetchone() - if not row: - raise HTTPException(status_code=404, detail="Quelle nicht gefunden") - if row["tenant_id"] is None: - raise HTTPException(status_code=400, detail="Bereits eine Grundquelle") - - if row["url"]: - cursor = await db.execute( - "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", - (row["url"],), - ) - if await cursor.fetchone(): - raise HTTPException(status_code=409, detail="URL bereits als Grundquelle vorhanden") - - await db.execute( - "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?", - (source_id,), - ) - await db.commit() - - cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) - return dict(await cursor.fetchone()) - - - -@router.post("/discover") -async def discover_source_endpoint( - url: str, - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """URL analysieren: Domain, Kategorie und RSS-Feeds automatisch erkennen. - - Findet alle Feeds einer Domain, bewertet sie mit Claude und gibt - die relevanten zurueck. Prueft auf bereits vorhandene Grundquellen. - """ - try: - multi = await discover_all_feeds(url) - except Exception as e: - logger.error(f"Discovery fehlgeschlagen: {e}", exc_info=True) - raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen") - - domain = multi["domain"] - category = multi["category"] - feeds = multi.get("feeds", []) - - # Fallback auf Einzel-Discovery wenn keine Feeds gefunden - if not feeds: - try: - single = await discover_source(url) - if single.get("rss_url"): - feeds = [{"name": single["name"], "url": single["rss_url"]}] - domain = single.get("domain", domain) - category = single.get("category", category) - except Exception: - pass - - if not feeds: - return { - "domain": domain, - "category": category, - "feeds": [], - "existing": [], - "message": "Keine RSS-Feeds gefunden", - } - - # Mit Claude bewerten - try: - relevant_feeds = await evaluate_feeds_with_claude(domain, feeds) - except Exception: - relevant_feeds = feeds[:3] - - # Bereits vorhandene Grundquellen pruefen - cursor = await db.execute( - "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL" - ) - existing_urls = {row["url"] for row in await cursor.fetchall()} - - result_feeds = [] - existing = [] - for feed in relevant_feeds: - info = { - "name": feed.get("name", domain_to_display_name(domain)), - "url": feed["url"], - "domain": domain, - "category": category, - } - if feed["url"] in existing_urls: - existing.append(info) - else: - result_feeds.append(info) - - return { - "domain": domain, - "category": category, - "feeds": result_feeds, - "existing": existing, - } - - -@router.post("/discover/add") -async def add_discovered_sources( - feeds: list[dict], - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Erkannte Feeds als Grundquellen anlegen. - - Erwartet eine Liste von {name, url, domain, category}. - Ueberspringt bereits vorhandene URLs. - """ - cursor = await db.execute( - "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL" - ) - existing_urls = {row["url"] for row in await cursor.fetchall()} - - added = 0 - skipped = 0 - for feed in feeds: - if not feed.get("url"): - continue - if feed["url"] in existing_urls: - skipped += 1 - continue - - domain = feed.get("domain", "") - await db.execute( - """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) - VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""", - (feed["name"], feed["url"], domain, feed.get("category", "sonstige")), - ) - existing_urls.add(feed["url"]) - added += 1 - - # Web-Source für die Domain anlegen wenn noch nicht vorhanden - if feeds and feeds[0].get("domain"): - domain = feeds[0]["domain"] - cursor = await db.execute( - "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source' AND tenant_id IS NULL", - (domain.lower(),), - ) - if not await cursor.fetchone(): - await db.execute( - """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) - VALUES (?, ?, ?, 'web_source', ?, 'active', 'system', NULL)""", - (domain_to_display_name(domain), f"https://{domain}", domain, - feeds[0].get("category", "sonstige")), - ) - added += 1 - - await db.commit() - return {"added": added, "skipped": skipped} - - - -# --- Health-Check & Vorschläge --- - -@router.get("/health") -async def get_health( - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Health-Check-Ergebnisse abrufen.""" - # Prüfen ob Tabelle existiert - cursor = await db.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'" - ) - if not await cursor.fetchone(): - return {"last_check": None, "total_checks": 0, "errors": 0, "warnings": 0, "ok": 0, "checks": []} - - cursor = await db.execute(""" - SELECT - h.id, h.source_id, s.name, s.domain, s.url, s.source_type, - h.check_type, h.status, h.message, h.details, h.checked_at - FROM source_health_checks h - JOIN sources s ON s.id = h.source_id - ORDER BY - CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, - s.name - """) - checks = [dict(row) for row in await cursor.fetchall()] - - error_count = sum(1 for c in checks if c["status"] == "error") - warning_count = sum(1 for c in checks if c["status"] == "warning") - ok_count = sum(1 for c in checks if c["status"] == "ok") - - cursor = await db.execute("SELECT MAX(checked_at) as last_check FROM source_health_checks") - row = await cursor.fetchone() - last_check = row["last_check"] if row else None - - return { - "last_check": last_check, - "total_checks": len(checks), - "errors": error_count, - "warnings": warning_count, - "ok": ok_count, - "checks": checks, - } - - -@router.get("/suggestions") -async def get_suggestions( - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete).""" - cursor = await db.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'" - ) - if not await cursor.fetchone(): - return [] - - cursor = await db.execute(""" - SELECT * FROM source_suggestions - ORDER BY - CASE status WHEN 'pending' THEN 0 ELSE 1 END, - created_at DESC - LIMIT 50 - """) - return [dict(row) for row in await cursor.fetchall()] - - -class SuggestionAction(BaseModel): - accept: bool - - -@router.put("/suggestions/{suggestion_id}") -async def update_suggestion( - suggestion_id: int, - action: SuggestionAction, - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Vorschlag annehmen oder ablehnen.""" - import json as _json - - cursor = await db.execute( - "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,) - ) - suggestion = await cursor.fetchone() - if not suggestion: - raise HTTPException(status_code=404, detail="Vorschlag nicht gefunden") - - suggestion = dict(suggestion) - if suggestion["status"] != "pending": - raise HTTPException(status_code=400, detail=f"Vorschlag bereits {suggestion['status']}") - - new_status = "accepted" if action.accept else "rejected" - result_action = None - - if action.accept: - stype = suggestion["suggestion_type"] - data = _json.loads(suggestion["suggested_data"]) if suggestion["suggested_data"] else {} - - if stype == "add_source": - name = data.get("name", "Unbenannt") - url = data.get("url") - domain = data.get("domain", "") - category = data.get("category", "sonstige") - source_type = "rss_feed" if url and any( - x in (url or "").lower() for x in ("rss", "feed", "xml", "atom") - ) else "web_source" - - if url: - cursor = await db.execute( - "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,) - ) - if await cursor.fetchone(): - result_action = "übersprungen (URL bereits vorhanden)" - new_status = "rejected" - else: - await db.execute( - "INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) " - "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", - (name, url, domain, source_type, category), - ) - result_action = f"Quelle '{name}' angelegt" - else: - result_action = "übersprungen (keine URL)" - new_status = "rejected" - - elif stype == "deactivate_source": - source_id = suggestion["source_id"] - if source_id: - await db.execute("UPDATE sources SET status = 'inactive' WHERE id = ?", (source_id,)) - result_action = "Quelle deaktiviert" - - elif stype == "remove_source": - source_id = suggestion["source_id"] - if source_id: - await db.execute("DELETE FROM sources WHERE id = ?", (source_id,)) - result_action = "Quelle gelöscht" - - elif stype == "fix_url": - source_id = suggestion["source_id"] - new_url = data.get("url") - if source_id and new_url: - await db.execute("UPDATE sources SET url = ? WHERE id = ?", (new_url, source_id)) - result_action = f"URL aktualisiert" - - await db.execute( - "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP WHERE id = ?", - (new_status, suggestion_id), - ) - await db.commit() - return {"status": new_status, "action": result_action} - - -@router.post("/health/run") -async def run_health_check_now( - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Health-Check manuell starten.""" - # Tabellen sicherstellen - await db.executescript(""" - CREATE TABLE IF NOT EXISTS source_health_checks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, - check_type TEXT NOT NULL, - status TEXT NOT NULL, - message TEXT, - details TEXT, - checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - CREATE TABLE IF NOT EXISTS source_suggestions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - suggestion_type TEXT NOT NULL, - title TEXT NOT NULL, - description TEXT, - source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, - suggested_data TEXT, - priority TEXT DEFAULT 'medium', - status TEXT DEFAULT 'pending', - reviewed_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - """) - await db.commit() - - # source_health und source_suggester importieren - sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") - from services.source_health import run_health_checks - from services.source_suggester import generate_suggestions - - result = await run_health_checks(db) - suggestion_count = await generate_suggestions(db) - - return { - "checked": result["checked"], - "issues": result["issues"], - "suggestions": suggestion_count, - } - - - -@router.post("/health/search-fix/{source_id}") -async def search_fix_for_source( - source_id: int, - admin: dict = Depends(get_current_admin), - db: aiosqlite.Connection = Depends(db_dependency), -): - """Sonnet mit WebSearch nach Lösung für eine kaputte Quelle suchen lassen.""" - import json as _json - - cursor = await db.execute( - "SELECT id, name, url, domain, source_type, category FROM sources WHERE id = ?", - (source_id,), - ) - source = await cursor.fetchone() - if not source: - raise HTTPException(status_code=404, detail="Quelle nicht gefunden") - - source = dict(source) - - # Health-Check-Probleme für diese Quelle laden - cursor = await db.execute( - "SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?", - (source_id,), - ) - issues = [dict(row) for row in await cursor.fetchall()] - issues_text = "\n".join(f"- {i['check_type']}: {i['status']} - {i['message']}" for i in issues) - - prompt = f"""Du bist ein OSINT-Analyst. Folgende Quelle ist nicht mehr erreichbar: - -Name: {source['name']} -URL: {source['url'] or 'keine'} -Domain: {source['domain'] or 'unbekannt'} -Typ: {source['source_type']} -Kategorie: {source['category']} - -Probleme: -{issues_text} - -Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle. -- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren -- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs) -- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar? - -Antworte NUR mit einem JSON-Objekt: -{{ - "fixable": true/false, - "solutions": [ - {{ - "type": "replace_url|add_feed|deactivate", - "name": "Anzeigename", - "url": "https://...", - "description": "Kurze Begründung" - }} - ], - "summary": "Zusammenfassung in 1-2 Sätzen" -}} - -Nur das JSON, kein anderer Text.""" - - sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") - from agents.claude_client import call_claude - - try: - response, usage = await call_claude(prompt, tools="WebSearch,WebFetch") - - import re - json_match = re.search(r'\{.*\}', response, re.DOTALL) - if json_match: - result = _json.loads(json_match.group(0)) - else: - result = {"fixable": False, "solutions": [], "summary": response[:500]} - - # Lösungen als Vorschläge speichern - await db.executescript(""" - CREATE TABLE IF NOT EXISTS source_suggestions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - suggestion_type TEXT NOT NULL, - title TEXT NOT NULL, - description TEXT, - source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, - suggested_data TEXT, - priority TEXT DEFAULT 'medium', - status TEXT DEFAULT 'pending', - reviewed_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - """) - - for sol in result.get("solutions", []): - sol_type = sol.get("type", "add_feed") - suggestion_type = { - "replace_url": "fix_url", - "add_feed": "add_source", - "deactivate": "deactivate_source", - }.get(sol_type, "add_source") - - title = f"{source['name']}: {sol.get('description', sol_type)[:80]}" - - # Duplikat-Check - cursor = await db.execute( - "SELECT id FROM source_suggestions WHERE title = ? AND status = 'pending'", - (title,), - ) - if await cursor.fetchone(): - continue - - data = _json.dumps({ - "name": sol.get("name", source["name"]), - "url": sol.get("url", ""), - "domain": source["domain"] or "", - "category": source["category"], - }, ensure_ascii=False) - - await db.execute( - "INSERT INTO source_suggestions " - "(suggestion_type, title, description, source_id, suggested_data, priority, status) " - "VALUES (?, ?, ?, ?, ?, 'high', 'pending')", - (suggestion_type, title, sol.get("description", ""), source_id, data), - ) - - await db.commit() - - result["cost_usd"] = usage.cost_usd - result["tokens"] = {"input": usage.input_tokens, "output": usage.output_tokens} - return result - - except Exception as e: - raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}") +import os +"""Grundquellen-Verwaltung und Kundenquellen-Übersicht.""" +import sys +import logging + +# Monitor-Source-Rules verfügbar machen +sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel, Field +from typing import Optional +from auth import get_current_admin +from database import db_dependency +import aiosqlite + +sys.path.insert(0, os.path.join('/home/claude-dev/AegisSight-Monitor/src')) + +from source_rules import ( + discover_source, + discover_all_feeds, + evaluate_feeds_with_claude, + _extract_domain, + _detect_category, + domain_to_display_name, +) + +logger = logging.getLogger("verwaltung.sources") + +router = APIRouter(prefix="/api/sources", tags=["sources"]) + + +class GlobalSourceCreate(BaseModel): + name: str = Field(min_length=1, max_length=200) + url: Optional[str] = None + domain: Optional[str] = None + source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded)$") + category: str = Field(default="sonstige") + status: str = Field(default="active", pattern="^(active|inactive)$") + notes: Optional[str] = None + + +class GlobalSourceUpdate(BaseModel): + name: Optional[str] = Field(default=None, max_length=200) + url: Optional[str] = None + domain: Optional[str] = None + source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded)$") + category: Optional[str] = None + status: Optional[str] = Field(default=None, pattern="^(active|inactive)$") + notes: Optional[str] = None + + +@router.get("/global") +async def list_global_sources( + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Alle Grundquellen auflisten (tenant_id IS NULL).""" + cursor = await db.execute( + "SELECT * FROM sources WHERE tenant_id IS NULL ORDER BY category, source_type, name" + ) + return [dict(row) for row in await cursor.fetchall()] + + +@router.post("/global", status_code=201) +async def create_global_source( + data: GlobalSourceCreate, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Neue Grundquelle anlegen.""" + if data.url: + cursor = await db.execute( + "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL", + (data.url,), + ) + existing = await cursor.fetchone() + if existing: + raise HTTPException( + status_code=409, + detail=f"URL bereits vorhanden: {existing['name']}", + ) + + cursor = await db.execute( + """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, ?, 'system', NULL)""", + (data.name, data.url, data.domain, data.source_type, data.category, data.status, data.notes), + ) + await db.commit() + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,)) + return dict(await cursor.fetchone()) + + +@router.put("/global/{source_id}") +async def update_global_source( + source_id: int, + data: GlobalSourceUpdate, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Grundquelle bearbeiten.""" + cursor = await db.execute( + "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) + ) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") + + updates = {} + for field, value in data.model_dump(exclude_none=True).items(): + updates[field] = value + + if not updates: + return dict(row) + + set_clause = ", ".join(f"{k} = ?" for k in updates) + values = list(updates.values()) + [source_id] + await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values) + await db.commit() + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + return dict(await cursor.fetchone()) + + +@router.delete("/global/{source_id}", status_code=204) +async def delete_global_source( + source_id: int, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Grundquelle loeschen.""" + cursor = await db.execute( + "SELECT id FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) + ) + if not await cursor.fetchone(): + raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") + + await db.execute("DELETE FROM sources WHERE id = ?", (source_id,)) + await db.commit() + + +@router.get("/tenant") +async def list_tenant_sources( + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Alle tenant-spezifischen Quellen mit Org-Name auflisten.""" + cursor = await db.execute(""" + SELECT s.*, o.name as org_name + FROM sources s + LEFT JOIN organizations o ON o.id = s.tenant_id + WHERE s.tenant_id IS NOT NULL + ORDER BY o.name, s.category, s.name + """) + return [dict(row) for row in await cursor.fetchall()] + + +@router.post("/tenant/{source_id}/promote") +async def promote_to_global( + source_id: int, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Tenant-Quelle zur Grundquelle befoerdern.""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + if row["tenant_id"] is None: + raise HTTPException(status_code=400, detail="Bereits eine Grundquelle") + + if row["url"]: + cursor = await db.execute( + "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", + (row["url"],), + ) + if await cursor.fetchone(): + raise HTTPException(status_code=409, detail="URL bereits als Grundquelle vorhanden") + + await db.execute( + "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?", + (source_id,), + ) + await db.commit() + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + return dict(await cursor.fetchone()) + + + +@router.post("/discover") +async def discover_source_endpoint( + url: str, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """URL analysieren: Domain, Kategorie und RSS-Feeds automatisch erkennen. + + Findet alle Feeds einer Domain, bewertet sie mit Claude und gibt + die relevanten zurueck. Prueft auf bereits vorhandene Grundquellen. + """ + try: + multi = await discover_all_feeds(url) + except Exception as e: + logger.error(f"Discovery fehlgeschlagen: {e}", exc_info=True) + raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen") + + domain = multi["domain"] + category = multi["category"] + feeds = multi.get("feeds", []) + + # Fallback auf Einzel-Discovery wenn keine Feeds gefunden + if not feeds: + try: + single = await discover_source(url) + if single.get("rss_url"): + feeds = [{"name": single["name"], "url": single["rss_url"]}] + domain = single.get("domain", domain) + category = single.get("category", category) + except Exception: + pass + + if not feeds: + return { + "domain": domain, + "category": category, + "feeds": [], + "existing": [], + "message": "Keine RSS-Feeds gefunden", + } + + # Mit Claude bewerten + try: + relevant_feeds = await evaluate_feeds_with_claude(domain, feeds) + except Exception: + relevant_feeds = feeds[:3] + + # Bereits vorhandene Grundquellen pruefen + cursor = await db.execute( + "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL" + ) + existing_urls = {row["url"] for row in await cursor.fetchall()} + + result_feeds = [] + existing = [] + for feed in relevant_feeds: + info = { + "name": feed.get("name", domain_to_display_name(domain)), + "url": feed["url"], + "domain": domain, + "category": category, + } + if feed["url"] in existing_urls: + existing.append(info) + else: + result_feeds.append(info) + + return { + "domain": domain, + "category": category, + "feeds": result_feeds, + "existing": existing, + } + + +@router.post("/discover/add") +async def add_discovered_sources( + feeds: list[dict], + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Erkannte Feeds als Grundquellen anlegen. + + Erwartet eine Liste von {name, url, domain, category}. + Ueberspringt bereits vorhandene URLs. + """ + cursor = await db.execute( + "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL" + ) + existing_urls = {row["url"] for row in await cursor.fetchall()} + + added = 0 + skipped = 0 + for feed in feeds: + if not feed.get("url"): + continue + if feed["url"] in existing_urls: + skipped += 1 + continue + + domain = feed.get("domain", "") + await db.execute( + """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) + VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""", + (feed["name"], feed["url"], domain, feed.get("category", "sonstige")), + ) + existing_urls.add(feed["url"]) + added += 1 + + # Web-Source für die Domain anlegen wenn noch nicht vorhanden + if feeds and feeds[0].get("domain"): + domain = feeds[0]["domain"] + cursor = await db.execute( + "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source' AND tenant_id IS NULL", + (domain.lower(),), + ) + if not await cursor.fetchone(): + await db.execute( + """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) + VALUES (?, ?, ?, 'web_source', ?, 'active', 'system', NULL)""", + (domain_to_display_name(domain), f"https://{domain}", domain, + feeds[0].get("category", "sonstige")), + ) + added += 1 + + await db.commit() + return {"added": added, "skipped": skipped} + + + +# --- Health-Check & Vorschläge --- + +@router.get("/health") +async def get_health( + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Health-Check-Ergebnisse abrufen.""" + # Prüfen ob Tabelle existiert + cursor = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'" + ) + if not await cursor.fetchone(): + return {"last_check": None, "total_checks": 0, "errors": 0, "warnings": 0, "ok": 0, "checks": []} + + cursor = await db.execute(""" + SELECT + h.id, h.source_id, s.name, s.domain, s.url, s.source_type, + h.check_type, h.status, h.message, h.details, h.checked_at + FROM source_health_checks h + JOIN sources s ON s.id = h.source_id + ORDER BY + CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, + s.name + """) + checks = [dict(row) for row in await cursor.fetchall()] + + error_count = sum(1 for c in checks if c["status"] == "error") + warning_count = sum(1 for c in checks if c["status"] == "warning") + ok_count = sum(1 for c in checks if c["status"] == "ok") + + cursor = await db.execute("SELECT MAX(checked_at) as last_check FROM source_health_checks") + row = await cursor.fetchone() + last_check = row["last_check"] if row else None + + return { + "last_check": last_check, + "total_checks": len(checks), + "errors": error_count, + "warnings": warning_count, + "ok": ok_count, + "checks": checks, + } + + +@router.get("/suggestions") +async def get_suggestions( + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete).""" + cursor = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'" + ) + if not await cursor.fetchone(): + return [] + + cursor = await db.execute(""" + SELECT * FROM source_suggestions + ORDER BY + CASE status WHEN 'pending' THEN 0 ELSE 1 END, + created_at DESC + LIMIT 50 + """) + return [dict(row) for row in await cursor.fetchall()] + + +class SuggestionAction(BaseModel): + accept: bool + + +@router.put("/suggestions/{suggestion_id}") +async def update_suggestion( + suggestion_id: int, + action: SuggestionAction, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Vorschlag annehmen oder ablehnen.""" + import json as _json + + cursor = await db.execute( + "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,) + ) + suggestion = await cursor.fetchone() + if not suggestion: + raise HTTPException(status_code=404, detail="Vorschlag nicht gefunden") + + suggestion = dict(suggestion) + if suggestion["status"] != "pending": + raise HTTPException(status_code=400, detail=f"Vorschlag bereits {suggestion['status']}") + + new_status = "accepted" if action.accept else "rejected" + result_action = None + + if action.accept: + stype = suggestion["suggestion_type"] + data = _json.loads(suggestion["suggested_data"]) if suggestion["suggested_data"] else {} + + if stype == "add_source": + name = data.get("name", "Unbenannt") + url = data.get("url") + domain = data.get("domain", "") + category = data.get("category", "sonstige") + source_type = "rss_feed" if url and any( + x in (url or "").lower() for x in ("rss", "feed", "xml", "atom") + ) else "web_source" + + if url: + cursor = await db.execute( + "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,) + ) + if await cursor.fetchone(): + result_action = "übersprungen (URL bereits vorhanden)" + new_status = "rejected" + else: + await db.execute( + "INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) " + "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", + (name, url, domain, source_type, category), + ) + result_action = f"Quelle '{name}' angelegt" + else: + result_action = "übersprungen (keine URL)" + new_status = "rejected" + + elif stype == "deactivate_source": + source_id = suggestion["source_id"] + if source_id: + await db.execute("UPDATE sources SET status = 'inactive' WHERE id = ?", (source_id,)) + result_action = "Quelle deaktiviert" + + elif stype == "remove_source": + source_id = suggestion["source_id"] + if source_id: + await db.execute("DELETE FROM sources WHERE id = ?", (source_id,)) + result_action = "Quelle gelöscht" + + elif stype == "fix_url": + source_id = suggestion["source_id"] + new_url = data.get("url") + if source_id and new_url: + await db.execute("UPDATE sources SET url = ? WHERE id = ?", (new_url, source_id)) + result_action = f"URL aktualisiert" + + # Auto-Reject: Wenn fix_url oder add_source akzeptiert wird, + # zugehörige deactivate_source-Vorschläge automatisch ablehnen + if stype in ("fix_url", "add_source") and suggestion.get("source_id"): + await db.execute( + "UPDATE source_suggestions SET status = 'rejected', reviewed_at = CURRENT_TIMESTAMP " + "WHERE source_id = ? AND suggestion_type = 'deactivate_source' AND status = 'pending' AND id != ?", + (suggestion["source_id"], suggestion_id), + ) + + await db.execute( + "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP WHERE id = ?", + (new_status, suggestion_id), + ) + await db.commit() + return {"status": new_status, "action": result_action} + + +@router.post("/health/run") +async def run_health_check_now( + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Health-Check manuell starten.""" + # Tabellen sicherstellen + await db.executescript(""" + CREATE TABLE IF NOT EXISTS source_health_checks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, + check_type TEXT NOT NULL, + status TEXT NOT NULL, + message TEXT, + details TEXT, + checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE IF NOT EXISTS source_suggestions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + suggestion_type TEXT NOT NULL, + title TEXT NOT NULL, + description TEXT, + source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, + suggested_data TEXT, + priority TEXT DEFAULT 'medium', + status TEXT DEFAULT 'pending', + reviewed_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + """) + await db.commit() + + # source_health und source_suggester importieren + sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") + from services.source_health import run_health_checks + from services.source_suggester import generate_suggestions + + result = await run_health_checks(db) + suggestion_count = await generate_suggestions(db) + + return { + "checked": result["checked"], + "issues": result["issues"], + "suggestions": suggestion_count, + } + + + +@router.post("/health/search-fix/{source_id}") +async def search_fix_for_source( + source_id: int, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Sonnet mit WebSearch nach Lösung für eine kaputte Quelle suchen lassen.""" + import json as _json + + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, category FROM sources WHERE id = ?", + (source_id,), + ) + source = await cursor.fetchone() + if not source: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + + source = dict(source) + + # Health-Check-Probleme für diese Quelle laden + cursor = await db.execute( + "SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?", + (source_id,), + ) + issues = [dict(row) for row in await cursor.fetchall()] + issues_text = "\n".join(f"- {i['check_type']}: {i['status']} - {i['message']}" for i in issues) + + prompt = f"""Du bist ein OSINT-Analyst. Folgende Quelle ist nicht mehr erreichbar: + +Name: {source['name']} +URL: {source['url'] or 'keine'} +Domain: {source['domain'] or 'unbekannt'} +Typ: {source['source_type']} +Kategorie: {source['category']} + +Probleme: +{issues_text} + +Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle. +- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren +- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs) +- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar? + +Regeln: +- Maximal 3 Lösungen vorschlagen (die besten) +- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss) + +Antworte NUR mit einem JSON-Objekt: +{{ + "fixable": true/false, + "solutions": [ + {{ + "type": "replace_url|add_feed|deactivate", + "name": "Anzeigename", + "url": "https://...", + "description": "Kurze Begründung" + }} + ], + "summary": "Zusammenfassung in 1-2 Sätzen" +}} + +Nur das JSON, kein anderer Text.""" + + sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") + from agents.claude_client import call_claude + + try: + response, usage = await call_claude(prompt, tools="WebSearch,WebFetch") + + import re + json_match = re.search(r'\{.*\}', response, re.DOTALL) + if json_match: + result = _json.loads(json_match.group(0)) + else: + result = {"fixable": False, "solutions": [], "summary": response[:500]} + + # Lösungen als Vorschläge speichern + await db.executescript(""" + CREATE TABLE IF NOT EXISTS source_suggestions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + suggestion_type TEXT NOT NULL, + title TEXT NOT NULL, + description TEXT, + source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, + suggested_data TEXT, + priority TEXT DEFAULT 'medium', + status TEXT DEFAULT 'pending', + reviewed_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + """) + + for sol in result.get("solutions", []): + sol_type = sol.get("type", "add_feed") + suggestion_type = { + "replace_url": "fix_url", + "add_feed": "add_source", + "deactivate": "deactivate_source", + }.get(sol_type, "add_source") + + title = f"{source['name']}: {sol.get('description', sol_type)[:120]}" + + # Duplikat-Check + cursor = await db.execute( + "SELECT id FROM source_suggestions WHERE title = ? AND status = 'pending'", + (title,), + ) + if await cursor.fetchone(): + continue + + data = _json.dumps({ + "name": sol.get("name", source["name"]), + "url": sol.get("url", ""), + "domain": source["domain"] or "", + "category": source["category"], + }, ensure_ascii=False) + + await db.execute( + "INSERT INTO source_suggestions " + "(suggestion_type, title, description, source_id, suggested_data, priority, status) " + "VALUES (?, ?, ?, ?, ?, 'high', 'pending')", + (suggestion_type, title, sol.get("description", ""), source_id, data), + ) + + await db.commit() + + result["cost_usd"] = usage.cost_usd + result["tokens"] = {"input": usage.input_tokens, "output": usage.output_tokens} + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}")