"""Sources router: source management (multi-tenant).

Classification: read-only — maintenance happens in the administration
("Verwaltung").
"""
import json
import logging
from collections import defaultdict

from fastapi import APIRouter, Depends, HTTPException, status

from models import (
    SourceCreate,
    SourceUpdate,
    SourceResponse,
    DiscoverRequest,
    DiscoverResponse,
    DiscoverMultiResponse,
    DomainActionRequest,
)
from auth import get_current_user
from database import db_dependency, refresh_source_counts
from source_rules import (
    discover_source,
    discover_all_feeds,
    evaluate_feeds_with_claude,
    _extract_domain,
    _detect_category,
    domain_to_display_name,
    _DOMAIN_ALIASES,
)
import aiosqlite

logger = logging.getLogger("osint.sources")

router = APIRouter(prefix="/api/sources", tags=["sources"])

# Columns a client is allowed to change through PUT /api/sources/{id}.
SOURCE_UPDATE_COLUMNS = {
    "name", "url", "domain", "source_type", "category", "status", "notes",
    "language", "bias",
}

# Integer flag columns that are exposed as real booleans in API responses.
_BOOLEAN_SOURCE_FLAGS = ("state_affiliated", "ifcn_signatory", "eu_disinfo_listed")

# Upper bound for "?" placeholders per IN (...) query; kept well below
# SQLite's bound-parameter limit (999 on older builds).
_MAX_IN_CLAUSE_PARAMS = 500


async def _load_alignments_for(db: aiosqlite.Connection, source_ids: list[int]) -> dict[int, list[str]]:
    """Load the alignments of several sources (read-only, for display).

    Args:
        db: Open database connection.
        source_ids: IDs of the sources whose alignments are wanted.

    Returns:
        A mapping ``source_id -> alignments`` (sorted by alignment). Every
        requested ID is present, with an empty list when it has no
        alignments. An empty ``source_ids`` yields an empty dict.
    """
    if not source_ids:
        return {}
    out: dict[int, list[str]] = {sid: [] for sid in source_ids}
    # Query in chunks so that very long ID lists never exceed SQLite's limit
    # on bound parameters. Each source ID lives in exactly one chunk, so the
    # per-source ordering produced by ORDER BY is preserved.
    for start in range(0, len(source_ids), _MAX_IN_CLAUSE_PARAMS):
        chunk = source_ids[start:start + _MAX_IN_CLAUSE_PARAMS]
        placeholders = ",".join("?" for _ in chunk)
        cursor = await db.execute(
            f"SELECT source_id, alignment FROM source_alignments WHERE source_id IN ({placeholders}) ORDER BY alignment",
            chunk,
        )
        for row in await cursor.fetchall():
            out.setdefault(row["source_id"], []).append(row["alignment"])
    return out


def _present_source(source: dict, alignments: list[str]) -> dict:
    """Add the derived response fields to a raw ``sources`` row (in place).

    Sets ``is_global`` (row has no tenant), converts the integer flag
    columns to booleans and attaches the alignment list. Shared by every
    endpoint returning a source so that all responses have the same shape.

    Returns:
        The same ``source`` dict, for convenience.
    """
    source["is_global"] = source.get("tenant_id") is None
    for flag in _BOOLEAN_SOURCE_FLAGS:
        source[flag] = bool(source.get(flag))
    source["alignments"] = alignments
    return source


async def _fetch_visible_source(db: aiosqlite.Connection, source_id: int, tenant_id):
    """Fetch a source by ID if it is global or belongs to ``tenant_id``.

    Uses the same visibility predicate as the listing endpoint, so sources
    of other tenants are treated as non-existent.

    Returns:
        The matching row, or ``None`` when there is no visible source.
    """
    cursor = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND (tenant_id IS NULL OR tenant_id = ?)",
        (source_id, tenant_id),
    )
    return await cursor.fetchone()


async def _insert_active_source(
    db: aiosqlite.Connection,
    *,
    name,
    url,
    domain,
    source_type,
    category,
    added_by,
    tenant_id,
) -> int:
    """Insert one source with status ``'active'`` and return its row ID.

    The caller is responsible for committing (or rolling back).
    """
    cursor = await db.execute(
        """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
           VALUES (?, ?, ?, ?, ?, 'active', ?, ?)""",
        (name, url, domain, source_type, category, added_by, tenant_id),
    )
    return cursor.lastrowid


def _check_source_ownership(source: dict, username: str):
    """Check whether the user may edit or delete the source.

    Sources without a creator or created by ``"system"`` may be modified by
    anyone; all other sources only by the user who added them.

    Raises:
        HTTPException: 403 when the source was added by a different user.
    """
    added_by = source.get("added_by", "")
    if added_by and added_by != "system" and added_by != username:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Nur der Ersteller kann diese Quelle bearbeiten",
        )


@router.get("", response_model=list[SourceResponse])
async def list_sources(
    source_type: str = None,
    category: str = None,
    source_status: str = None,
    political_orientation: str = None,
    media_type: str = None,
    reliability: str = None,
    state_affiliated: bool = None,
    alignment: str = None,
    ifcn_signatory: bool = None,
    eu_disinfo_listed: bool = None,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all sources visible to the caller (global + own organisation).

    Every query parameter is an optional filter. String filters are applied
    when non-empty, boolean filters when they are not ``None``. The
    ``alignment`` filter is lower-cased before matching.
    """
    tenant_id = current_user.get("tenant_id")

    # Global sources (tenant_id IS NULL) plus those of the caller's tenant.
    query = "SELECT s.* FROM sources s WHERE (s.tenant_id IS NULL OR s.tenant_id = ?)"
    params: list = [tenant_id]

    equality_filters = (
        ("s.source_type", source_type),
        ("s.category", category),
        ("s.status", source_status),
        ("s.political_orientation", political_orientation),
        ("s.media_type", media_type),
        ("s.reliability", reliability),
    )
    for column, value in equality_filters:
        if value:
            query += f" AND {column} = ?"
            params.append(value)

    if state_affiliated is not None:
        query += " AND s.state_affiliated = ?"
        params.append(1 if state_affiliated else 0)
    if alignment:
        query += " AND EXISTS (SELECT 1 FROM source_alignments sa WHERE sa.source_id = s.id AND sa.alignment = ?)"
        params.append(alignment.lower())
    if ifcn_signatory is not None:
        query += " AND s.ifcn_signatory = ?"
        params.append(1 if ifcn_signatory else 0)
    if eu_disinfo_listed is not None:
        query += " AND s.eu_disinfo_listed = ?"
        params.append(1 if eu_disinfo_listed else 0)

    query += " ORDER BY s.source_type, s.category, s.name"
    cursor = await db.execute(query, params)
    results = [dict(row) for row in await cursor.fetchall()]

    alignments_map = await _load_alignments_for(db, [r["id"] for r in results])
    for source in results:
        _present_source(source, alignments_map.get(source["id"], []))
    return results


@router.get("/stats")
async def get_source_stats(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return aggregated statistics for active sources (global + own organisation).

    Returns:
        ``by_type`` (source and article counts per known source type),
        ``total_sources`` (sum over those types) and ``total_articles``
        (number of rows in ``articles`` for the caller's tenant).
    """
    tenant_id = current_user.get("tenant_id")

    cursor = await db.execute(
        """
        SELECT source_type, COUNT(*) as count, SUM(article_count) as total_articles
        FROM sources
        WHERE status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)
        GROUP BY source_type
        """,
        (tenant_id,),
    )
    rows = await cursor.fetchall()

    stats = {
        "rss_feed": {"count": 0, "articles": 0},
        "web_source": {"count": 0, "articles": 0},
        "telegram_channel": {"count": 0, "articles": 0},
        "excluded": {"count": 0, "articles": 0},
    }
    for row in rows:
        st = row["source_type"]
        # Source types outside the fixed set above are ignored.
        if st in stats:
            stats[st]["count"] = row["count"]
            # SUM() yields NULL when every article_count is NULL.
            stats[st]["articles"] = row["total_articles"] or 0

    cursor = await db.execute(
        "SELECT COUNT(*) as cnt FROM articles WHERE tenant_id = ?",
        (tenant_id,),
    )
    total_row = await cursor.fetchone()

    return {
        "by_type": stats,
        "total_sources": sum(s["count"] for s in stats.values()),
        "total_articles": total_row["cnt"],
    }


@router.post("/discover", response_model=DiscoverResponse)
async def discover_source_endpoint(
    data: DiscoverRequest,
    current_user: dict = Depends(get_current_user),
):
    """Auto-detect RSS feed, name, category and domain of a URL.

    Raises:
        HTTPException: 500 when discovery fails for any reason.
    """
    try:
        return await discover_source(data.url)
    except Exception as e:
        logger.error("Discovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen") from e


@router.post("/discover-multi", response_model=DiscoverMultiResponse)
async def discover_multi_endpoint(
    data: DiscoverRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Find all RSS feeds of a domain, evaluate them and add the relevant ones.

    When no feeds are found, falls back to single-source discovery and adds
    the discovered feed URL unless it already exists
    (``fallback_single=True`` in the response). Otherwise the feeds selected
    by ``evaluate_feeds_with_claude`` are inserted as ``rss_feed`` sources,
    plus one ``web_source`` entry for the domain if none exists yet.

    Raises:
        HTTPException: 500 when any step fails.
    """
    tenant_id = current_user.get("tenant_id")
    username = current_user["username"]

    try:
        multi = await discover_all_feeds(data.url)
        domain = multi["domain"]
        category = multi["category"]

        if not multi["feeds"]:
            # Fallback: no feed list available, try single-source discovery.
            single = await discover_source(data.url)
            rss_url = single.get("rss_url")
            sources = []
            if rss_url:
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ?", (rss_url,)
                )
                if not await cursor.fetchone():
                    new_id = await _insert_active_source(
                        db,
                        name=single["name"],
                        url=single["rss_url"],
                        domain=single["domain"],
                        source_type=single["source_type"],
                        category=single["category"],
                        added_by=username,
                        tenant_id=tenant_id,
                    )
                    await db.commit()
                    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (new_id,))
                    sources.append(dict(await cursor.fetchone()))

            return DiscoverMultiResponse(
                domain=single.get("domain", domain),
                category=single.get("category", category),
                added_count=len(sources),
                # A feed that was found but not added counts as skipped.
                skipped_count=1 if not sources and rss_url else 0,
                total_found=1 if rss_url else 0,
                sources=sources,
                fallback_single=True,
            )

        relevant_feeds = await evaluate_feeds_with_claude(domain, multi["feeds"])

        cursor = await db.execute("SELECT url FROM sources WHERE url IS NOT NULL")
        existing_urls = {row["url"] for row in await cursor.fetchall()}

        new_ids = []
        skipped = 0
        for feed in relevant_feeds:
            if feed["url"] in existing_urls:
                skipped += 1
                continue
            new_ids.append(
                await _insert_active_source(
                    db,
                    name=feed["name"],
                    url=feed["url"],
                    domain=domain,
                    source_type="rss_feed",
                    category=category,
                    added_by=username,
                    tenant_id=tenant_id,
                )
            )
            # Guards against the same URL appearing twice in relevant_feeds.
            existing_urls.add(feed["url"])

        # Make sure the domain also has a web_source entry.
        cursor = await db.execute(
            "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source'",
            (domain.lower(),),
        )
        if not await cursor.fetchone():
            new_ids.append(
                await _insert_active_source(
                    db,
                    name=domain_to_display_name(domain),
                    url=f"https://{domain}",
                    domain=domain,
                    source_type="web_source",
                    category=category,
                    added_by=username,
                    tenant_id=tenant_id,
                )
            )

        await db.commit()

        added_sources = []
        if new_ids:
            placeholders = ",".join("?" for _ in new_ids)
            cursor = await db.execute(
                f"SELECT * FROM sources WHERE id IN ({placeholders}) ORDER BY id",
                new_ids,
            )
            added_sources = [dict(row) for row in await cursor.fetchall()]

        return DiscoverMultiResponse(
            domain=domain,
            category=category,
            added_count=len(added_sources),
            skipped_count=skipped,
            total_found=len(multi["feeds"]),
            sources=added_sources,
            fallback_single=False,
        )
    except Exception as e:
        logger.error("Multi-Discovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Multi-Discovery fehlgeschlagen") from e


@router.post("/rediscover-existing")
async def rediscover_existing_endpoint(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """One-off migration: scan existing RSS sources for additional feeds.

    For every distinct domain of the caller's visible, active RSS sources,
    all feeds are discovered and evaluated; new relevant feeds are inserted.
    Each domain is committed on its own. A failing domain is rolled back,
    logged as a warning and skipped, so it does not affect other domains.

    Returns:
        Counters ``domains_processed``, ``feeds_added`` (committed inserts)
        and ``feeds_skipped`` (feeds whose URL already existed).

    Raises:
        HTTPException: 500 when the migration fails outside the per-domain
            handling.
    """
    tenant_id = current_user.get("tenant_id")
    username = current_user["username"]

    try:
        cursor = await db.execute(
            "SELECT * FROM sources WHERE source_type = 'rss_feed' AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)",
            (tenant_id,),
        )
        existing_sources = [dict(row) for row in await cursor.fetchall()]

        # Distinct non-empty domains, in order of first appearance.
        domains = dict.fromkeys(src["domain"] for src in existing_sources if src["domain"])

        cursor = await db.execute("SELECT url FROM sources WHERE url IS NOT NULL")
        existing_urls = {row["url"] for row in await cursor.fetchall()}

        domains_processed = 0
        feeds_added = 0
        feeds_skipped = 0

        for domain in domains:
            domains_processed += 1
            base_url = f"https://{domain}"
            # URLs inserted for this domain but not yet committed.
            pending_urls: list[str] = []
            try:
                multi = await discover_all_feeds(base_url)
                if not multi["feeds"]:
                    continue

                relevant_feeds = await evaluate_feeds_with_claude(domain, multi["feeds"])
                category = _detect_category(domain)

                for feed in relevant_feeds:
                    if feed["url"] in existing_urls:
                        feeds_skipped += 1
                        continue
                    await _insert_active_source(
                        db,
                        name=feed["name"],
                        url=feed["url"],
                        domain=domain,
                        source_type="rss_feed",
                        category=category,
                        added_by=username,
                        tenant_id=tenant_id,
                    )
                    existing_urls.add(feed["url"])
                    pending_urls.append(feed["url"])

                await db.commit()
                # Count only what has actually been committed.
                feeds_added += len(pending_urls)
            except Exception as e:
                # Discard this domain's partial inserts so that a later
                # commit cannot persist them, and forget their URLs again.
                await db.rollback()
                existing_urls.difference_update(pending_urls)
                logger.warning("Rediscovery fuer %s fehlgeschlagen: %s", domain, e)
                continue

        return {
            "domains_processed": domains_processed,
            "feeds_added": feeds_added,
            "feeds_skipped": feeds_skipped,
        }
    except Exception as e:
        logger.error("Rediscovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Rediscovery fehlgeschlagen") from e


@router.get("/my-exclusions")
async def get_my_exclusions(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the domains excluded by the current user, sorted by domain."""
    user_id = current_user["id"]
    cursor = await db.execute(
        "SELECT domain, notes, created_at FROM user_excluded_domains WHERE user_id = ? ORDER BY domain",
        (user_id,),
    )
    return [dict(row) for row in await cursor.fetchall()]


@router.post("/block-domain")
async def block_domain(
    data: DomainActionRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Exclude a domain for the current user (per user, not organisation-wide).

    If the domain is already excluded, only its notes are updated (when new
    notes were supplied) and the status ``"already_excluded"`` is returned.
    """
    user_id = current_user["id"]
    domain = data.domain.lower().strip()

    # Check whether the domain is already excluded.
    cursor = await db.execute(
        "SELECT id FROM user_excluded_domains WHERE user_id = ? AND domain = ?",
        (user_id, domain),
    )
    existing = await cursor.fetchone()
    if existing:
        if data.notes:
            await db.execute(
                "UPDATE user_excluded_domains SET notes = ? WHERE id = ?",
                (data.notes, existing["id"]),
            )
            await db.commit()
        return {"domain": domain, "status": "already_excluded"}

    await db.execute(
        "INSERT INTO user_excluded_domains (user_id, domain, notes) VALUES (?, ?, ?)",
        (user_id, domain, data.notes),
    )
    await db.commit()
    return {"domain": domain, "status": "excluded"}


@router.post("/unblock-domain")
async def unblock_domain(
    data: DomainActionRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Remove a domain exclusion of the current user.

    Returns:
        The normalised domain and ``removed`` (whether a row was deleted).
    """
    user_id = current_user["id"]
    domain = data.domain.lower().strip()

    cursor = await db.execute(
        "DELETE FROM user_excluded_domains WHERE user_id = ? AND domain = ?",
        (user_id, domain),
    )
    removed = cursor.rowcount
    await db.commit()
    return {"domain": domain, "removed": removed > 0}


@router.delete("/domain/{domain}")
async def delete_domain(
    domain: str,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete all sources of a domain (own organisation only, never global ones).

    The deletion is all-or-nothing: it is refused when the domain contains
    a ``"system"`` source or a source added by another user.

    Raises:
        HTTPException: 404 when the tenant has no sources for the domain,
            403 when a system source or another user's source is affected.
    """
    tenant_id = current_user.get("tenant_id")
    domain_lower = domain.lower().strip()

    cursor = await db.execute(
        "SELECT * FROM sources WHERE LOWER(domain) = ? AND tenant_id = ?",
        (domain_lower, tenant_id),
    )
    rows = await cursor.fetchall()
    if not rows:
        raise HTTPException(status_code=404, detail="Keine Quellen fuer diese Domain gefunden")

    username = current_user["username"]
    for row in rows:
        source = dict(row)
        if source["added_by"] == "system":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Domain enthaelt System-Quellen, die nicht geloescht werden koennen",
            )
        if source["added_by"] and source["added_by"] != username:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Domain enthaelt Quellen anderer Nutzer",
            )

    await db.execute(
        "DELETE FROM sources WHERE LOWER(domain) = ? AND tenant_id = ?",
        (domain_lower, tenant_id),
    )
    await db.commit()

    return {
        "domain": domain_lower,
        "deleted_count": len(rows),
    }


@router.post("", response_model=SourceResponse, status_code=status.HTTP_201_CREATED)
async def create_source(
    data: SourceCreate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Add a new source for the caller's organisation.

    Raises:
        HTTPException: 409 when the URL already exists as an active source,
            or when the domain already exists and the request is either a
            ``web_source`` or carries no URL.
    """
    tenant_id = current_user.get("tenant_id")

    # Normalise the domain: take it from the URL if missing, lower-case it
    # and resolve known aliases.
    domain = data.domain
    if not domain and data.url:
        domain = _extract_domain(data.url)
    if domain:
        domain = _DOMAIN_ALIASES.get(domain.lower(), domain.lower())

    # Duplicate check 1: same URL already present as an active source
    # (checked across all tenants).
    if data.url:
        cursor = await db.execute(
            "SELECT id, name FROM sources WHERE url = ? AND status = 'active'",
            (data.url,),
        )
        existing = await cursor.fetchone()
        if existing:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Feed-URL bereits vorhanden: {existing['name']} (ID {existing['id']})",
            )

    # Duplicate check 2: domain already present among the active sources
    # visible to this tenant (global + own organisation).
    if domain:
        cursor = await db.execute(
            "SELECT id, name, source_type FROM sources WHERE LOWER(domain) = ? AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?) LIMIT 1",
            (domain.lower(), tenant_id),
        )
        domain_existing = await cursor.fetchone()
        if domain_existing:
            if data.source_type == "web_source":
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Web-Quelle für '{domain}' bereits vorhanden: {domain_existing['name']}",
                )
            # Additional feeds for a known domain are fine, but only when
            # they come with their own URL.
            if not data.url:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Domain '{domain}' bereits als Quelle vorhanden: {domain_existing['name']}. Für einen neuen RSS-Feed bitte die Feed-URL angeben.",
                )

    # language/bias are read from the explicitly set fields only, so unset
    # values are stored as NULL.
    payload = data.model_dump(exclude_unset=True)
    cols = ["name", "url", "domain", "source_type", "category", "status", "notes", "language", "bias", "added_by", "tenant_id"]
    vals = [
        data.name, data.url, domain, data.source_type, data.category,
        data.status, data.notes,
        payload.get("language"), payload.get("bias"),
        current_user["username"], tenant_id,
    ]
    placeholders = ", ".join(["?"] * len(vals))
    cursor = await db.execute(
        f"INSERT INTO sources ({', '.join(cols)}) VALUES ({placeholders})",
        vals,
    )
    new_id = cursor.lastrowid
    await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (new_id,))
    result = dict(await cursor.fetchone())
    alignments_map = await _load_alignments_for(db, [new_id])
    return _present_source(result, alignments_map.get(new_id, []))


@router.put("/{source_id}", response_model=SourceResponse)
async def update_source(
    source_id: int,
    data: SourceUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Edit a source of the caller's organisation.

    Only explicitly set fields listed in ``SOURCE_UPDATE_COLUMNS`` are
    written; a new ``domain`` is lower-cased and alias-resolved first.

    Raises:
        HTTPException: 404 when the source does not exist or belongs to
            another tenant, 403 for global sources or when the caller is
            not allowed to edit the source.
    """
    # Tenant-scoped lookup: sources of other tenants must not be editable,
    # regardless of who added them.
    row = await _fetch_visible_source(db, source_id, current_user.get("tenant_id"))
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")

    if row["tenant_id"] is None:
        raise HTTPException(status_code=403, detail="Grundquellen koennen nur ueber die Verwaltung bearbeitet werden")

    _check_source_ownership(dict(row), current_user["username"])

    payload = data.model_dump(exclude_unset=True)
    updates = {}
    for field, value in payload.items():
        if field not in SOURCE_UPDATE_COLUMNS:
            continue
        if field == "domain" and value:
            value = _DOMAIN_ALIASES.get(value.lower(), value.lower())
        updates[field] = value

    if updates:
        # Column names come from the SOURCE_UPDATE_COLUMNS whitelist only;
        # all values are bound as parameters.
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        values = list(updates.values()) + [source_id]
        await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values)
        await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    result = dict(await cursor.fetchone())
    alignments_map = await _load_alignments_for(db, [source_id])
    return _present_source(result, alignments_map.get(source_id, []))


@router.delete("/{source_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_source(
    source_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete a source of the caller's organisation.

    Raises:
        HTTPException: 404 when the source does not exist or belongs to
            another tenant, 403 for global sources or when the caller is
            not allowed to delete the source.
    """
    # Tenant-scoped lookup: sources of other tenants must not be deletable,
    # regardless of who added them.
    row = await _fetch_visible_source(db, source_id, current_user.get("tenant_id"))
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")

    if row["tenant_id"] is None:
        raise HTTPException(status_code=403, detail="Grundquellen koennen nicht geloescht werden")

    _check_source_ownership(dict(row), current_user["username"])

    await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
    await db.commit()


@router.post("/telegram/validate")
async def validate_telegram_channel(
    data: dict,
    current_user: dict = Depends(get_current_user),
):
    """Check whether a Telegram channel is reachable and return its info.

    Expects a JSON object with a string ``channel_id``.

    Raises:
        HTTPException: 400 when ``channel_id`` is missing, empty or not a
            string; 404 when the channel could not be validated; 500 on
            any other failure.
    """
    raw_channel_id = data.get("channel_id")
    # A non-string value (null, number, ...) is a client error, not a crash.
    channel_id = raw_channel_id.strip() if isinstance(raw_channel_id, str) else ""
    if not channel_id:
        raise HTTPException(status_code=400, detail="channel_id ist erforderlich")

    try:
        # Imported lazily, as in the original code.
        from feeds.telegram_parser import TelegramParser

        parser = TelegramParser()
        result = await parser.validate_channel(channel_id)
        if result:
            return result
        raise HTTPException(status_code=404, detail="Kanal nicht erreichbar oder nicht gefunden")
    except HTTPException:
        # Pass the deliberate 404 through instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error("Telegram-Validierung fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Telegram-Validierung fehlgeschlagen") from e


@router.post("/refresh-counts")
async def trigger_refresh_counts(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Recalculate the article counters via ``refresh_source_counts``."""
    await refresh_source_counts(db)
    return {"status": "ok"}