"""Sources router: source management (multi-tenant).

Every endpoint is scoped to the caller's tenant. Global sources
(``tenant_id IS NULL``) are visible to everyone but cannot be edited or
deleted through this router; tenant-specific sources can only be read and
modified by members of the owning tenant.
"""
import logging
from collections import defaultdict

import aiosqlite
from fastapi import APIRouter, Depends, HTTPException, status

from models import (
    SourceCreate,
    SourceUpdate,
    SourceResponse,
    DiscoverRequest,
    DiscoverResponse,
    DiscoverMultiResponse,
    DomainActionRequest,
)
from auth import get_current_user
from database import db_dependency, refresh_source_counts
from source_rules import (
    discover_source,
    discover_all_feeds,
    evaluate_feeds_with_claude,
    _extract_domain,
    _detect_category,
    domain_to_display_name,
    _DOMAIN_ALIASES,
)

logger = logging.getLogger("osint.sources")

router = APIRouter(prefix="/api/sources", tags=["sources"])

# Columns that PUT /{source_id} may modify; any other payload field is ignored.
SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes"}


def _check_source_ownership(source: dict, username: str):
    """Raise 403 unless *username* may edit/delete *source*.

    Sources added by ``"system"`` (auto-discovered) or with an empty
    ``added_by`` may be edited by any caller that reaches this check;
    user-created sources only by their creator.

    Raises:
        HTTPException: 403 if the source was created by a different user.
    """
    added_by = source.get("added_by", "")
    if added_by and added_by != "system" and added_by != username:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Nur der Ersteller kann diese Quelle bearbeiten",
        )


@router.get("", response_model=list[SourceResponse])
async def list_sources(
    source_type: str = None,
    category: str = None,
    source_status: str = None,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all sources visible to the caller (global + own tenant).

    The optional filters ``source_type``, ``category`` and ``source_status``
    are combined with AND. Every result carries an extra ``is_global`` flag.
    """
    tenant_id = current_user.get("tenant_id")
    # Global sources (tenant_id NULL) plus those of the caller's own tenant.
    query = "SELECT * FROM sources WHERE (tenant_id IS NULL OR tenant_id = ?)"
    params = [tenant_id]
    if source_type:
        query += " AND source_type = ?"
        params.append(source_type)
    if category:
        query += " AND category = ?"
        params.append(category)
    if source_status:
        query += " AND status = ?"
        params.append(source_status)
    query += " ORDER BY source_type, category, name"

    cursor = await db.execute(query, params)
    rows = await cursor.fetchall()
    results = []
    for row in rows:
        d = dict(row)
        d["is_global"] = d.get("tenant_id") is None
        results.append(d)
    return results


@router.get("/stats")
async def get_source_stats(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return aggregated statistics of active sources (global + own tenant).

    ``by_type`` maps each known source type to its active-source count and the
    summed ``article_count``; ``total_articles`` counts the caller's tenant's
    rows in ``articles``.
    """
    tenant_id = current_user.get("tenant_id")
    cursor = await db.execute("""
        SELECT
            source_type,
            COUNT(*) as count,
            SUM(article_count) as total_articles
        FROM sources
        WHERE status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)
        GROUP BY source_type
    """, (tenant_id,))
    rows = await cursor.fetchall()

    stats = {
        "rss_feed": {"count": 0, "articles": 0},
        "web_source": {"count": 0, "articles": 0},
        "telegram_channel": {"count": 0, "articles": 0},
        "excluded": {"count": 0, "articles": 0},
    }
    for row in rows:
        st = row["source_type"]
        # Unknown source types are ignored rather than added to the result.
        if st in stats:
            stats[st]["count"] = row["count"]
            stats[st]["articles"] = row["total_articles"] or 0

    cursor = await db.execute(
        "SELECT COUNT(*) as cnt FROM articles WHERE tenant_id = ?",
        (tenant_id,),
    )
    total_row = await cursor.fetchone()

    return {
        "by_type": stats,
        "total_sources": sum(s["count"] for s in stats.values()),
        "total_articles": total_row["cnt"],
    }


@router.post("/discover", response_model=DiscoverResponse)
async def discover_source_endpoint(
    data: DiscoverRequest,
    current_user: dict = Depends(get_current_user),
):
    """Auto-detect RSS feed, name, category and domain of a URL (no DB writes).

    Raises:
        HTTPException: 500 if discovery fails for any reason.
    """
    try:
        result = await discover_source(data.url)
        return result
    except Exception as e:
        logger.error("Discovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen") from e


@router.post("/discover-multi", response_model=DiscoverMultiResponse)
async def discover_multi_endpoint(
    data: DiscoverRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Find ALL RSS feeds of a domain, rate them with Claude and add the relevant ones.

    If no feeds are found, falls back to single-source discovery
    (``fallback_single=True``). Feed URLs already present in ``sources`` (any
    tenant) are skipped. A ``web_source`` entry for the domain is created if
    none exists yet.

    Raises:
        HTTPException: 500 if discovery or any DB step fails.
    """
    tenant_id = current_user.get("tenant_id")
    try:
        multi = await discover_all_feeds(data.url)
        domain = multi["domain"]
        category = multi["category"]

        if not multi["feeds"]:
            # Fallback: no feed list found, try single-source discovery.
            single = await discover_source(data.url)
            sources = []
            if single.get("rss_url"):
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ?", (single["rss_url"],)
                )
                existing = await cursor.fetchone()
                if not existing:
                    cursor = await db.execute(
                        """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                           VALUES (?, ?, ?, ?, ?, 'active', ?, ?)""",
                        (
                            single["name"], single["rss_url"], single["domain"],
                            single["source_type"], single["category"],
                            current_user["username"], tenant_id,
                        ),
                    )
                    await db.commit()
                    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,))
                    row = await cursor.fetchone()
                    sources.append(dict(row))

            return DiscoverMultiResponse(
                domain=single.get("domain", domain),
                category=single.get("category", category),
                added_count=len(sources),
                skipped_count=1 if not sources and single.get("rss_url") else 0,
                total_found=1 if single.get("rss_url") else 0,
                sources=sources,
                fallback_single=True,
            )

        relevant_feeds = await evaluate_feeds_with_claude(domain, multi["feeds"])

        # URL uniqueness is checked across all tenants.
        cursor = await db.execute("SELECT url FROM sources WHERE url IS NOT NULL")
        existing_urls = {row["url"] for row in await cursor.fetchall()}

        new_ids = []
        skipped = 0
        for feed in relevant_feeds:
            if feed["url"] in existing_urls:
                skipped += 1
                continue
            cursor = await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                   VALUES (?, ?, ?, 'rss_feed', ?, 'active', ?, ?)""",
                (feed["name"], feed["url"], domain, category, current_user["username"], tenant_id),
            )
            new_ids.append(cursor.lastrowid)
            # Also guards against duplicates within this batch.
            existing_urls.add(feed["url"])

        # Ensure the domain also has a web_source entry.
        cursor = await db.execute(
            "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source'",
            (domain.lower(),),
        )
        if not await cursor.fetchone():
            cursor = await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                   VALUES (?, ?, ?, 'web_source', ?, 'active', ?, ?)""",
                (domain_to_display_name(domain), f"https://{domain}", domain, category,
                 current_user["username"], tenant_id),
            )
            new_ids.append(cursor.lastrowid)

        await db.commit()

        added_sources = []
        if new_ids:
            placeholders = ",".join("?" for _ in new_ids)
            cursor = await db.execute(
                f"SELECT * FROM sources WHERE id IN ({placeholders}) ORDER BY id",
                new_ids,
            )
            added_sources = [dict(row) for row in await cursor.fetchall()]

        return DiscoverMultiResponse(
            domain=domain,
            category=category,
            added_count=len(added_sources),
            skipped_count=skipped,
            total_found=len(multi["feeds"]),
            sources=added_sources,
            fallback_single=False,
        )
    except Exception as e:
        logger.error("Multi-Discovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Multi-Discovery fehlgeschlagen") from e


@router.post("/rediscover-existing")
async def rediscover_existing_endpoint(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """One-off migration: search the domains of existing RSS sources for more feeds.

    Each domain is processed in its own transaction. If discovery or an insert
    fails for one domain, that domain's uncommitted inserts are rolled back
    and processing continues with the next domain. ``feeds_added`` and
    ``feeds_skipped`` only count domains whose inserts were committed.

    Raises:
        HTTPException: 500 if the initial queries (or a rollback) fail.
    """
    tenant_id = current_user.get("tenant_id")
    try:
        cursor = await db.execute(
            "SELECT * FROM sources WHERE source_type = 'rss_feed' AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)",
            (tenant_id,),
        )
        existing_sources = [dict(row) for row in await cursor.fetchall()]

        # Group by domain so every domain is crawled only once.
        domains = defaultdict(list)
        for src in existing_sources:
            if src["domain"]:
                domains[src["domain"]].append(src)

        # URL uniqueness is checked across all tenants.
        cursor = await db.execute("SELECT url FROM sources WHERE url IS NOT NULL")
        existing_urls = {row["url"] for row in await cursor.fetchall()}

        domains_processed = 0
        feeds_added = 0
        feeds_skipped = 0

        for domain in domains:
            domains_processed += 1
            try:
                multi = await discover_all_feeds(f"https://{domain}")
                if not multi["feeds"]:
                    continue

                relevant_feeds = await evaluate_feeds_with_claude(domain, multi["feeds"])
                category = _detect_category(domain)

                added_urls = set()
                skipped_here = 0
                for feed in relevant_feeds:
                    if feed["url"] in existing_urls or feed["url"] in added_urls:
                        skipped_here += 1
                        continue
                    await db.execute(
                        """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                           VALUES (?, ?, ?, 'rss_feed', ?, 'active', ?, ?)""",
                        (feed["name"], feed["url"], domain, category, current_user["username"], tenant_id),
                    )
                    added_urls.add(feed["url"])

                await db.commit()
            except Exception as e:
                logger.warning("Rediscovery fuer %s fehlgeschlagen: %s", domain, e)
                # Discard this domain's pending inserts so a later commit does
                # not persist a half-processed domain.
                await db.rollback()
            else:
                # Only reached after a successful commit (not after `continue`).
                existing_urls |= added_urls
                feeds_added += len(added_urls)
                feeds_skipped += skipped_here

        return {
            "domains_processed": domains_processed,
            "feeds_added": feeds_added,
            "feeds_skipped": feeds_skipped,
        }
    except Exception as e:
        logger.error("Rediscovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Rediscovery fehlgeschlagen") from e


@router.get("/my-exclusions")
async def get_my_exclusions(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the domains excluded by the current user, ordered by domain."""
    user_id = current_user["id"]
    cursor = await db.execute(
        "SELECT domain, notes, created_at FROM user_excluded_domains WHERE user_id = ? ORDER BY domain",
        (user_id,),
    )
    rows = await cursor.fetchall()
    return [dict(row) for row in rows]


@router.post("/block-domain")
async def block_domain(
    data: DomainActionRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Exclude a domain for the current user (per user, not tenant-wide).

    If the domain is already excluded, only its notes are updated (when given).

    Raises:
        HTTPException: 400 if the domain is empty after normalization.
    """
    user_id = current_user["id"]
    domain = data.domain.lower().strip()
    if not domain:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Domain ist erforderlich")

    # Already excluded?
    cursor = await db.execute(
        "SELECT id FROM user_excluded_domains WHERE user_id = ? AND domain = ?",
        (user_id, domain),
    )
    existing = await cursor.fetchone()
    if existing:
        if data.notes:
            await db.execute(
                "UPDATE user_excluded_domains SET notes = ? WHERE id = ?",
                (data.notes, existing["id"]),
            )
            await db.commit()
        return {"domain": domain, "status": "already_excluded"}

    await db.execute(
        "INSERT INTO user_excluded_domains (user_id, domain, notes) VALUES (?, ?, ?)",
        (user_id, domain, data.notes),
    )
    await db.commit()
    return {"domain": domain, "status": "excluded"}


@router.post("/unblock-domain")
async def unblock_domain(
    data: DomainActionRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Lift the current user's exclusion of a domain.

    Returns ``removed=True`` if at least one exclusion row was deleted.

    Raises:
        HTTPException: 400 if the domain is empty after normalization.
    """
    user_id = current_user["id"]
    domain = data.domain.lower().strip()
    if not domain:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Domain ist erforderlich")

    cursor = await db.execute(
        "DELETE FROM user_excluded_domains WHERE user_id = ? AND domain = ?",
        (user_id, domain),
    )
    removed = cursor.rowcount
    await db.commit()
    return {"domain": domain, "removed": removed > 0}


@router.delete("/domain/{domain}")
async def delete_domain(
    domain: str,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete all sources of a domain (only the caller's tenant's, never global ones).

    The deletion is all-or-nothing: it is refused if any matching source is a
    system source or was created by another user.

    Raises:
        HTTPException: 404 if the tenant has no sources for the domain,
            403 if any of them may not be deleted by the caller.
    """
    tenant_id = current_user.get("tenant_id")
    domain_lower = domain.lower().strip()

    cursor = await db.execute(
        "SELECT * FROM sources WHERE LOWER(domain) = ? AND tenant_id = ?",
        (domain_lower, tenant_id),
    )
    rows = await cursor.fetchall()
    if not rows:
        raise HTTPException(status_code=404, detail="Keine Quellen fuer diese Domain gefunden")

    username = current_user["username"]
    for row in rows:
        source = dict(row)
        if source["added_by"] == "system":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Domain enthaelt System-Quellen, die nicht geloescht werden koennen",
            )
        if source["added_by"] and source["added_by"] != username:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Domain enthaelt Quellen anderer Nutzer",
            )

    await db.execute(
        "DELETE FROM sources WHERE LOWER(domain) = ? AND tenant_id = ?",
        (domain_lower, tenant_id),
    )
    await db.commit()

    return {
        "domain": domain_lower,
        "deleted_count": len(rows),
    }


@router.post("", response_model=SourceResponse, status_code=status.HTTP_201_CREATED)
async def create_source(
    data: SourceCreate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Add a new tenant-specific source.

    Raises:
        HTTPException: 409 if an active source with the same URL exists (any
            tenant), or if the domain already exists among global/own-tenant
            sources and the request is a web source or carries no feed URL.
    """
    tenant_id = current_user.get("tenant_id")

    # Normalize the domain: derive it from the URL if missing, resolve subdomain aliases.
    domain = data.domain
    if not domain and data.url:
        domain = _extract_domain(data.url)
    if domain:
        domain = _DOMAIN_ALIASES.get(domain.lower(), domain.lower())

    # Duplicate check 1: same URL already active? Checked across ALL tenants,
    # but name/ID are only disclosed for sources the caller is allowed to see.
    if data.url:
        cursor = await db.execute(
            "SELECT id, name, (tenant_id IS NULL OR tenant_id = ?) AS visible "
            "FROM sources WHERE url = ? AND status = 'active' ORDER BY visible DESC",
            (tenant_id, data.url),
        )
        existing = await cursor.fetchone()
        if existing:
            if existing["visible"]:
                detail = f"Feed-URL bereits vorhanden: {existing['name']} (ID {existing['id']})"
            else:
                detail = "Feed-URL bereits vorhanden"
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=detail)

    # Duplicate check 2: domain already present among global + own-tenant
    # sources (unlike check 1, NOT across tenants).
    if domain:
        cursor = await db.execute(
            "SELECT id, name, source_type FROM sources WHERE LOWER(domain) = ? AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?) LIMIT 1",
            (domain.lower(), tenant_id),
        )
        domain_existing = await cursor.fetchone()
        if domain_existing:
            if data.source_type == "web_source":
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Web-Quelle für '{domain}' bereits vorhanden: {domain_existing['name']}",
                )
            # An additional RSS feed for a known domain is allowed, but only with its own URL.
            if not data.url:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Domain '{domain}' bereits als Quelle vorhanden: {domain_existing['name']}. Für einen neuen RSS-Feed bitte die Feed-URL angeben.",
                )

    cursor = await db.execute(
        """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            data.name,
            data.url,
            domain,
            data.source_type,
            data.category,
            data.status,
            data.notes,
            current_user["username"],
            tenant_id,
        ),
    )
    await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,))
    row = await cursor.fetchone()
    return dict(row)


@router.put("/{source_id}", response_model=SourceResponse)
async def update_source(
    source_id: int,
    data: SourceUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Edit a tenant-specific source.

    Only non-None fields listed in ``SOURCE_UPDATE_COLUMNS`` are written.

    Raises:
        HTTPException: 404 if the source does not exist or belongs to another
            tenant, 403 for global sources or sources of another user.
    """
    tenant_id = current_user.get("tenant_id")
    # Restrict the lookup to rows the caller may see; sources of other
    # tenants are reported as missing instead of being editable.
    cursor = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND (tenant_id IS NULL OR tenant_id = ?)",
        (source_id, tenant_id),
    )
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    if row["tenant_id"] is None:
        raise HTTPException(status_code=403, detail="Grundquellen koennen nur ueber die Verwaltung bearbeitet werden")
    _check_source_ownership(dict(row), current_user["username"])

    updates = {}
    for field, value in data.model_dump(exclude_none=True).items():
        if field not in SOURCE_UPDATE_COLUMNS:
            continue
        # Normalize the domain the same way create_source does.
        if field == "domain" and value:
            value = _DOMAIN_ALIASES.get(value.lower(), value.lower())
        updates[field] = value

    if not updates:
        return dict(row)

    # Column names come from the SOURCE_UPDATE_COLUMNS allow-list, so the
    # f-string cannot inject SQL; values are bound as parameters.
    set_clause = ", ".join(f"{k} = ?" for k in updates)
    values = list(updates.values()) + [source_id]

    await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values)
    await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    row = await cursor.fetchone()
    return dict(row)


@router.delete("/{source_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_source(
    source_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete a tenant-specific source.

    Raises:
        HTTPException: 404 if the source does not exist or belongs to another
            tenant, 403 for global sources or sources of another user.
    """
    tenant_id = current_user.get("tenant_id")
    # Same visibility restriction as update_source: no cross-tenant deletes.
    cursor = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND (tenant_id IS NULL OR tenant_id = ?)",
        (source_id, tenant_id),
    )
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    if row["tenant_id"] is None:
        raise HTTPException(status_code=403, detail="Grundquellen koennen nicht geloescht werden")
    _check_source_ownership(dict(row), current_user["username"])

    await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
    await db.commit()


@router.post("/telegram/validate")
async def validate_telegram_channel(
    data: dict,
    current_user: dict = Depends(get_current_user),
):
    """Check whether a Telegram channel is reachable and return its channel info.

    Expects a JSON body with a ``channel_id`` (string or integer).

    Raises:
        HTTPException: 400 if ``channel_id`` is missing or empty, 404 if the
            channel cannot be reached, 500 on unexpected errors.
    """
    raw_channel_id = data.get("channel_id")
    # Non-string values (null, numeric ids) used to crash on .strip() with an
    # unhandled 500; accept numbers and treat anything else as missing.
    channel_id = str(raw_channel_id).strip() if isinstance(raw_channel_id, (str, int)) else ""
    if not channel_id:
        raise HTTPException(status_code=400, detail="channel_id ist erforderlich")

    try:
        # Imported lazily so the router loads even without the Telegram dependency.
        from feeds.telegram_parser import TelegramParser

        parser = TelegramParser()
        result = await parser.validate_channel(channel_id)
        if result:
            return result
        raise HTTPException(status_code=404, detail="Kanal nicht erreichbar oder nicht gefunden")
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Telegram-Validierung fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Telegram-Validierung fehlgeschlagen") from e


@router.post("/refresh-counts")
async def trigger_refresh_counts(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Recompute the article counters of all sources via ``refresh_source_counts``."""
    await refresh_source_counts(db)
    return {"status": "ok"}