import os """Grundquellen-Verwaltung und Kundenquellen-Übersicht.""" import sys import logging # Monitor-Source-Rules verfügbar machen sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field from typing import Optional from auth import get_current_admin from database import db_dependency import aiosqlite sys.path.insert(0, os.path.join('/home/claude-dev/AegisSight-Monitor/src')) from source_rules import ( discover_source, discover_all_feeds, evaluate_feeds_with_claude, _extract_domain, _detect_category, domain_to_display_name, ) logger = logging.getLogger("verwaltung.sources") router = APIRouter(prefix="/api/sources", tags=["sources"]) class GlobalSourceCreate(BaseModel): name: str = Field(min_length=1, max_length=200) url: Optional[str] = None domain: Optional[str] = None source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded)$") category: str = Field(default="sonstige") status: str = Field(default="active", pattern="^(active|inactive)$") notes: Optional[str] = None class GlobalSourceUpdate(BaseModel): name: Optional[str] = Field(default=None, max_length=200) url: Optional[str] = None domain: Optional[str] = None source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded)$") category: Optional[str] = None status: Optional[str] = Field(default=None, pattern="^(active|inactive)$") notes: Optional[str] = None @router.get("/global") async def list_global_sources( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Alle Grundquellen auflisten (tenant_id IS NULL).""" cursor = await db.execute( "SELECT * FROM sources WHERE tenant_id IS NULL ORDER BY category, source_type, name" ) return [dict(row) for row in await cursor.fetchall()] @router.post("/global", status_code=201) async def create_global_source( data: GlobalSourceCreate, admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Neue Grundquelle anlegen.""" if data.url: cursor = await db.execute( "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL", (data.url,), ) existing = await cursor.fetchone() if existing: raise HTTPException( status_code=409, detail=f"URL bereits vorhanden: {existing['name']}", ) cursor = await db.execute( """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, 'system', NULL)""", (data.name, data.url, data.domain, data.source_type, data.category, data.status, data.notes), ) await db.commit() cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,)) return dict(await cursor.fetchone()) @router.put("/global/{source_id}") async def update_global_source( source_id: int, data: GlobalSourceUpdate, admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Grundquelle bearbeiten.""" cursor = await db.execute( "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) ) row = await cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") updates = {} for field, value in data.model_dump(exclude_none=True).items(): updates[field] = value if not updates: return dict(row) set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [source_id] await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values) await db.commit() cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) return dict(await cursor.fetchone()) @router.delete("/global/{source_id}", status_code=204) async def delete_global_source( source_id: int, admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Grundquelle loeschen.""" cursor = await db.execute( "SELECT id FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) ) if not await cursor.fetchone(): raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") await db.execute("DELETE FROM sources WHERE id = ?", (source_id,)) await db.commit() @router.get("/tenant") async def list_tenant_sources( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Alle tenant-spezifischen Quellen mit Org-Name auflisten.""" cursor = await db.execute(""" SELECT s.*, o.name as org_name FROM sources s LEFT JOIN organizations o ON o.id = s.tenant_id WHERE s.tenant_id IS NOT NULL ORDER BY o.name, s.category, s.name """) return [dict(row) for row in await cursor.fetchall()] @router.post("/tenant/{source_id}/promote") async def promote_to_global( source_id: int, admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Tenant-Quelle zur Grundquelle befoerdern.""" cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) row = await cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="Quelle nicht gefunden") if row["tenant_id"] is None: raise HTTPException(status_code=400, detail="Bereits eine Grundquelle") if row["url"]: cursor = await db.execute( "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (row["url"],), ) if await cursor.fetchone(): raise HTTPException(status_code=409, detail="URL bereits als Grundquelle vorhanden") await db.execute( "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?", (source_id,), ) await db.commit() cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) return dict(await cursor.fetchone()) @router.post("/discover") async def discover_source_endpoint( url: str, admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """URL analysieren: Domain, Kategorie und RSS-Feeds automatisch erkennen. Findet alle Feeds einer Domain, bewertet sie mit Claude und gibt die relevanten zurueck. Prueft auf bereits vorhandene Grundquellen. """ try: multi = await discover_all_feeds(url) except Exception as e: logger.error(f"Discovery fehlgeschlagen: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen") domain = multi["domain"] category = multi["category"] feeds = multi.get("feeds", []) # Fallback auf Einzel-Discovery wenn keine Feeds gefunden if not feeds: try: single = await discover_source(url) if single.get("rss_url"): feeds = [{"name": single["name"], "url": single["rss_url"]}] domain = single.get("domain", domain) category = single.get("category", category) except Exception: pass if not feeds: return { "domain": domain, "category": category, "feeds": [], "existing": [], "message": "Keine RSS-Feeds gefunden", } # Mit Claude bewerten try: relevant_feeds = await evaluate_feeds_with_claude(domain, feeds) except Exception: relevant_feeds = feeds[:3] # Bereits vorhandene Grundquellen pruefen cursor = await db.execute( "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL" ) existing_urls = {row["url"] for row in await cursor.fetchall()} result_feeds = [] existing = [] for feed in relevant_feeds: info = { "name": feed.get("name", domain_to_display_name(domain)), "url": feed["url"], "domain": domain, "category": category, } if feed["url"] in existing_urls: existing.append(info) else: result_feeds.append(info) return { "domain": domain, "category": category, "feeds": result_feeds, "existing": existing, } @router.post("/discover/add") async def add_discovered_sources( feeds: list[dict], admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Erkannte Feeds als Grundquellen anlegen. Erwartet eine Liste von {name, url, domain, category}. Ueberspringt bereits vorhandene URLs. """ cursor = await db.execute( "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL" ) existing_urls = {row["url"] for row in await cursor.fetchall()} added = 0 skipped = 0 for feed in feeds: if not feed.get("url"): continue if feed["url"] in existing_urls: skipped += 1 continue domain = feed.get("domain", "") await db.execute( """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""", (feed["name"], feed["url"], domain, feed.get("category", "sonstige")), ) existing_urls.add(feed["url"]) added += 1 # Web-Source für die Domain anlegen wenn noch nicht vorhanden if feeds and feeds[0].get("domain"): domain = feeds[0]["domain"] cursor = await db.execute( "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source' AND tenant_id IS NULL", (domain.lower(),), ) if not await cursor.fetchone(): await db.execute( """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) VALUES (?, ?, ?, 'web_source', ?, 'active', 'system', NULL)""", (domain_to_display_name(domain), f"https://{domain}", domain, feeds[0].get("category", "sonstige")), ) added += 1 await db.commit() return {"added": added, "skipped": skipped} # --- Health-Check & Vorschläge --- @router.get("/health") async def get_health( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Health-Check-Ergebnisse abrufen.""" # Prüfen ob Tabelle existiert cursor = await db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'" ) if not await cursor.fetchone(): return {"last_check": None, "total_checks": 0, "errors": 0, "warnings": 0, "ok": 0, "checks": []} cursor = await db.execute(""" SELECT h.id, h.source_id, s.name, s.domain, s.url, s.source_type, h.check_type, h.status, h.message, h.details, h.checked_at FROM source_health_checks h JOIN sources s ON s.id = h.source_id ORDER BY CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, s.name """) checks = [dict(row) for row in await cursor.fetchall()] error_count = sum(1 for c in checks if c["status"] == "error") warning_count = sum(1 for c in checks if c["status"] == "warning") ok_count = sum(1 for c in checks if c["status"] == "ok") cursor = await db.execute("SELECT MAX(checked_at) as last_check FROM source_health_checks") row = await cursor.fetchone() last_check = row["last_check"] if row else None return { "last_check": last_check, "total_checks": len(checks), "errors": error_count, "warnings": warning_count, "ok": ok_count, "checks": checks, } @router.get("/suggestions") async def get_suggestions( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete).""" cursor = await db.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'" ) if not await cursor.fetchone(): return [] cursor = await db.execute(""" SELECT * FROM source_suggestions ORDER BY CASE status WHEN 'pending' THEN 0 ELSE 1 END, created_at DESC LIMIT 50 """) return [dict(row) for row in await cursor.fetchall()] class SuggestionAction(BaseModel): accept: bool @router.put("/suggestions/{suggestion_id}") async def update_suggestion( suggestion_id: int, action: SuggestionAction, admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Vorschlag annehmen oder ablehnen.""" import json as _json cursor = await db.execute( "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,) ) suggestion = await cursor.fetchone() if not suggestion: raise HTTPException(status_code=404, detail="Vorschlag nicht gefunden") suggestion = dict(suggestion) if suggestion["status"] != "pending": raise HTTPException(status_code=400, detail=f"Vorschlag bereits {suggestion['status']}") new_status = "accepted" if action.accept else "rejected" result_action = None if action.accept: stype = suggestion["suggestion_type"] data = _json.loads(suggestion["suggested_data"]) if suggestion["suggested_data"] else {} if stype == "add_source": name = data.get("name", "Unbenannt") url = data.get("url") domain = data.get("domain", "") category = data.get("category", "sonstige") source_type = "rss_feed" if url and any( x in (url or "").lower() for x in ("rss", "feed", "xml", "atom") ) else "web_source" if url: cursor = await db.execute( "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,) ) if await cursor.fetchone(): result_action = "übersprungen (URL bereits vorhanden)" new_status = "rejected" else: await db.execute( "INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) " "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", (name, url, domain, source_type, category), ) result_action = f"Quelle '{name}' angelegt" else: result_action = "übersprungen (keine URL)" new_status = "rejected" elif stype == "deactivate_source": source_id = suggestion["source_id"] if source_id: await db.execute("UPDATE sources SET status = 'inactive' WHERE id = ?", (source_id,)) result_action = "Quelle deaktiviert" elif stype == "remove_source": source_id = suggestion["source_id"] if source_id: await db.execute("DELETE FROM sources WHERE id = ?", (source_id,)) result_action = "Quelle gelöscht" elif stype == "fix_url": source_id = suggestion["source_id"] new_url = data.get("url") if source_id and new_url: await db.execute("UPDATE sources SET url = ? WHERE id = ?", (new_url, source_id)) result_action = f"URL aktualisiert" await db.execute( "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP WHERE id = ?", (new_status, suggestion_id), ) await db.commit() return {"status": new_status, "action": result_action} @router.post("/health/run") async def run_health_check_now( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): """Health-Check manuell starten.""" # Tabellen sicherstellen await db.executescript(""" CREATE TABLE IF NOT EXISTS source_health_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, check_type TEXT NOT NULL, status TEXT NOT NULL, message TEXT, details TEXT, checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS source_suggestions ( id INTEGER PRIMARY KEY AUTOINCREMENT, suggestion_type TEXT NOT NULL, title TEXT NOT NULL, description TEXT, source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, suggested_data TEXT, priority TEXT DEFAULT 'medium', status TEXT DEFAULT 'pending', reviewed_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) await db.commit() # source_health und source_suggester importieren sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") from services.source_health import run_health_checks from services.source_suggester import generate_suggestions result = await run_health_checks(db) suggestion_count = await generate_suggestions(db) return { "checked": result["checked"], "issues": result["issues"], "suggestions": suggestion_count, }