diff --git a/scripts/migrate_sources_classification.py b/scripts/migrate_sources_classification.py new file mode 100644 index 0000000..3fab3fe --- /dev/null +++ b/scripts/migrate_sources_classification.py @@ -0,0 +1,64 @@ +"""Einmalige LLM-Klassifikation aller noch unklassifizierten Quellen. + +Verwendung: + python3 scripts/migrate_sources_classification.py --limit 50 + python3 scripts/migrate_sources_classification.py --limit 500 # Alle + python3 scripts/migrate_sources_classification.py --recheck-pending # bereits Pending neu + +Schreibt Vorschlaege in proposed_*-Spalten. Approval erfolgt anschliessend +ueber das Verwaltungs-UI / API (POST /api/sources/{id}/classification/approve). +""" +import argparse +import asyncio +import logging +import sys +from pathlib import Path + +# src/ in PYTHONPATH aufnehmen, wenn Skript direkt aufgerufen wird +HERE = Path(__file__).resolve().parent +SRC = HERE.parent / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from database import get_db # noqa: E402 +from services.source_classifier import bulk_classify # noqa: E402 + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", +) +logger = logging.getLogger("migrate_sources") + + +async def main(): + parser = argparse.ArgumentParser(description="LLM-Klassifikation aller Quellen.") + parser.add_argument("--limit", type=int, default=50, help="Max. Quellen pro Lauf") + parser.add_argument( + "--recheck-pending", + action="store_true", + help="Auch Quellen mit classification_source='llm_pending' neu klassifizieren", + ) + args = parser.parse_args() + + db = await get_db() + try: + result = await bulk_classify( + db, + limit=args.limit, + only_unclassified=not args.recheck_pending, + ) + finally: + await db.close() + + print(f"Verarbeitet: {result['processed']}") + print(f"Erfolgreich: {result['success']}") + print(f"Fehler: {len(result['errors'])}") + print(f"Kosten: ${result['total_cost_usd']:.4f}") + if result["errors"]: + print("\nFehler-Details:") + for e in result["errors"][:10]: + print(f" source_id={e['source_id']}: {e['error']}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/routers/sources.py b/src/routers/sources.py index 9adade2..9907e8d 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -1,10 +1,12 @@ """Sources-Router: Quellenverwaltung (Multi-Tenant).""" +import json import logging from collections import defaultdict -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status from models import SourceCreate, SourceUpdate, SourceResponse, DiscoverRequest, DiscoverResponse, DiscoverMultiResponse, DomainActionRequest from auth import get_current_user -from database import db_dependency, refresh_source_counts +from database import db_dependency, get_db, refresh_source_counts +from services.source_classifier import bulk_classify, classify_source from source_rules import discover_source, discover_all_feeds, evaluate_feeds_with_claude, _extract_domain, _detect_category, domain_to_display_name, _DOMAIN_ALIASES import aiosqlite @@ -700,3 +702,238 @@ async def trigger_refresh_counts( """Artikelzaehler fuer alle Quellen neu berechnen.""" await refresh_source_counts(db) return {"status": "ok"} + + +# === Klassifikations-Review (LLM-Vorschlaege approve/reject/reclassify) === + +def _require_admin_for_global(row: dict, current_user: dict): + """Globale Quellen (tenant_id IS NULL) duerfen nur org_admins 
approve-en/reclassify-en.""" + if row.get("tenant_id") is None and current_user.get("role") != "org_admin": + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Globale Quellen koennen nur von Admins klassifiziert werden", + ) + + +@router.get("/classification/stats") +async def classification_stats( + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Counts pro classification_source-Wert (global + eigene Org).""" + tenant_id = current_user.get("tenant_id") + cursor = await db.execute( + """SELECT classification_source, COUNT(*) as cnt + FROM sources + WHERE (tenant_id IS NULL OR tenant_id = ?) AND status = 'active' + GROUP BY classification_source""", + (tenant_id,), + ) + by_source = {row["classification_source"] or "legacy": row["cnt"] for row in await cursor.fetchall()} + cursor = await db.execute( + """SELECT COUNT(*) as cnt FROM sources + WHERE (tenant_id IS NULL OR tenant_id = ?) AND status = 'active' + AND proposed_political_orientation IS NOT NULL""", + (tenant_id,), + ) + pending = (await cursor.fetchone())["cnt"] + return { + "by_classification_source": by_source, + "pending_review": pending, + "total": sum(by_source.values()), + } + + +@router.get("/classification/queue") +async def classification_queue( + limit: int = 50, + min_confidence: float = 0.0, + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Liefert Quellen mit nicht-leeren proposed_*-Spalten (Review-Queue).""" + tenant_id = current_user.get("tenant_id") + cursor = await db.execute( + """SELECT s.* FROM sources s + WHERE (s.tenant_id IS NULL OR s.tenant_id = ?) + AND s.proposed_political_orientation IS NOT NULL + AND COALESCE(s.proposed_confidence, 0) >= ? 
+ ORDER BY s.proposed_confidence DESC, s.proposed_at DESC + LIMIT ?""", + (tenant_id, min_confidence, limit), + ) + rows = [dict(r) for r in await cursor.fetchall()] + alignments_map = await _load_alignments_for(db, [r["id"] for r in rows]) + out = [] + for d in rows: + try: + proposed_aligns = json.loads(d.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + out.append({ + "id": d["id"], + "name": d["name"], + "url": d.get("url"), + "domain": d.get("domain"), + "source_type": d.get("source_type"), + "category": d.get("category"), + "is_global": d.get("tenant_id") is None, + "current": { + "political_orientation": d.get("political_orientation"), + "media_type": d.get("media_type"), + "reliability": d.get("reliability"), + "state_affiliated": bool(d.get("state_affiliated")), + "country_code": d.get("country_code"), + "alignments": alignments_map.get(d["id"], []), + "classification_source": d.get("classification_source"), + }, + "proposed": { + "political_orientation": d.get("proposed_political_orientation"), + "media_type": d.get("proposed_media_type"), + "reliability": d.get("proposed_reliability"), + "state_affiliated": bool(d.get("proposed_state_affiliated")), + "country_code": d.get("proposed_country_code"), + "alignments": proposed_aligns, + "confidence": d.get("proposed_confidence"), + "reasoning": d.get("proposed_reasoning"), + "proposed_at": d.get("proposed_at"), + }, + }) + return out + + +async def _clear_proposed(db: aiosqlite.Connection, source_id: int): + """Loescht die proposed_*-Felder einer Quelle (ohne commit).""" + await db.execute( + """UPDATE sources SET + proposed_political_orientation = NULL, + proposed_media_type = NULL, + proposed_reliability = NULL, + proposed_state_affiliated = NULL, + proposed_country_code = NULL, + proposed_alignments_json = NULL, + proposed_confidence = NULL, + proposed_reasoning = NULL, + proposed_at = NULL + WHERE id = ?""", + (source_id,), + ) + + +@router.post("/{source_id}/classification/approve") +async def approve_classification( + source_id: int, + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Uebernimmt proposed_* in echte Felder, setzt classification_source='llm_approved'.""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + src = dict(row) + _require_admin_for_global(src, current_user) + + if src.get("proposed_political_orientation") is None: + raise HTTPException(status_code=400, detail="Keine LLM-Vorschlaege fuer diese Quelle vorhanden") + + try: + proposed_aligns = json.loads(src.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + + await db.execute( + """UPDATE sources SET + political_orientation = ?, + media_type = ?, + reliability = ?, + state_affiliated = ?, + country_code = ?, + classification_source = 'llm_approved', + classified_at = CURRENT_TIMESTAMP + WHERE id = ?""", + ( + src["proposed_political_orientation"], + src["proposed_media_type"], + src["proposed_reliability"], + 1 if src.get("proposed_state_affiliated") else 0, + src.get("proposed_country_code"), + source_id, + ), + ) + await _replace_alignments(db, source_id, [a for a in proposed_aligns if a in ALLOWED_ALIGNMENTS]) + await _clear_proposed(db, source_id) + await db.commit() + return {"source_id": source_id, "status": "approved"} + + 
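+# Note on the approve flow above: proposed_* values are copied into the live columns,
+# the alignment rows are replaced via _replace_alignments, and the proposal is cleared
+# via _clear_proposed before a single db.commit(), so (with the default non-autocommit
+# aiosqlite connection) nothing is persisted if an earlier step raises. The reject and
+# reclassify endpoints below reuse the same lookup and _require_admin_for_global check.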
+@router.post("/{source_id}/classification/reject") +async def reject_classification( + source_id: int, + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Verwirft die LLM-Vorschlaege ohne Uebernahme. classification_source bleibt unveraendert.""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + src = dict(row) + _require_admin_for_global(src, current_user) + + await _clear_proposed(db, source_id) + # Wenn classification_source noch 'llm_pending' war, zurueck auf 'legacy' + if src.get("classification_source") == "llm_pending": + await db.execute( + "UPDATE sources SET classification_source = 'legacy' WHERE id = ?", + (source_id,), + ) + await db.commit() + return {"source_id": source_id, "status": "rejected"} + + +@router.post("/{source_id}/classification/reclassify") +async def reclassify_source( + source_id: int, + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Triggert eine LLM-Klassifikation einer einzelnen Quelle (synchron, ~3-5s).""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + src = dict(row) + _require_admin_for_global(src, current_user) + + try: + result = await classify_source(db, source_id) + except Exception as e: + logger.error("Reclassify source_id=%s fehlgeschlagen: %s", source_id, e, exc_info=True) + raise HTTPException(status_code=500, detail=f"Klassifikation fehlgeschlagen: {e}") + return result + + +async def _bulk_classify_background(limit: int, only_unclassified: bool): + """Hintergrund-Task: oeffnet eigene DB-Connection.""" + db = await get_db() + try: + await bulk_classify(db, limit=limit, only_unclassified=only_unclassified) + finally: + await db.close() + + +@router.post("/classification/bulk-classify") +async def trigger_bulk_classify( + background_tasks: BackgroundTasks, + limit: int = 50, + only_unclassified: bool = True, + current_user: dict = Depends(get_current_user), +): + """Startet eine Bulk-Klassifikation im Hintergrund (nur Admins).""" + if current_user.get("role") != "org_admin": + raise HTTPException(status_code=403, detail="Nur Admins koennen Bulk-Klassifikation starten") + if limit < 1 or limit > 500: + raise HTTPException(status_code=400, detail="limit muss zwischen 1 und 500 liegen") + background_tasks.add_task(_bulk_classify_background, limit, only_unclassified) + return {"status": "started", "limit": limit, "only_unclassified": only_unclassified} diff --git a/src/services/source_classifier.py b/src/services/source_classifier.py new file mode 100644 index 0000000..c965958 --- /dev/null +++ b/src/services/source_classifier.py @@ -0,0 +1,295 @@ +"""Klassifiziert Quellen via Claude (Haiku) nach 4 Achsen + state_affiliated + country. + +Schreibt Vorschlaege in die proposed_*-Spalten von sources und setzt +classification_source='llm_pending'. Approval erfolgt ueber separate Endpoints, +die proposed_* in die echten Spalten kopieren. 
+""" +import asyncio +import json +import logging +import re + +import aiosqlite + +from agents.claude_client import call_claude +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.source_classifier") + +POLITICAL_VALUES = { + "links_extrem", "links", "mitte_links", "liberal", "mitte", + "konservativ", "mitte_rechts", "rechts", "rechts_extrem", "na", +} +MEDIA_TYPE_VALUES = { + "tageszeitung", "wochenzeitung", "magazin", "tv_sender", "radio", + "oeffentlich_rechtlich", "nachrichtenagentur", "online_only", "blog", + "telegram_kanal", "telegram_bot", "podcast", "social_media", "imageboard", + "think_tank", "ngo", "behoerde", "staatsmedium", "fachmedium", "sonstige", +} +RELIABILITY_VALUES = {"sehr_hoch", "hoch", "gemischt", "niedrig", "sehr_niedrig", "na"} +ALIGNMENT_VALUES = { + "prorussisch", "proiranisch", "prowestlich", "proukrainisch", + "prochinesisch", "projapanisch", "proisraelisch", "propalaestinensisch", + "protuerkisch", "panarabisch", "neutral", "sonstige", +} + + +def _build_prompt(src: dict, sample_articles: list[dict]) -> str: + sample_text = "" + if sample_articles: + lines = [] + for i, art in enumerate(sample_articles[:5], 1): + headline = (art.get("headline") or art.get("headline_de") or "").strip() + if headline: + lines.append(f"{i}. {headline[:200]}") + if lines: + sample_text = "\nLetzte Artikel/Headlines:\n" + "\n".join(lines) + + return f"""Du bist ein OSINT-Analyst und klassifizierst Nachrichten- und Medienquellen fuer ein Lagebild-Monitoring-System (DACH-Raum). + +QUELLE: +Name: {src.get('name')} +URL: {src.get('url') or '-'} +Domain: {src.get('domain') or '-'} +Quellentyp: {src.get('source_type')} +Bisherige Kategorie: {src.get('category')} +Sprache: {src.get('language') or 'unbekannt'} +Bisherige Notiz (Freitext): {src.get('bias') or '-'}{sample_text} + +AUFGABE: Klassifiziere die Quelle nach folgenden Achsen. + +1. political_orientation: + - links_extrem (z.B. linksunten.indymedia) + - links (klar links, z.B. junge Welt, taz) + - mitte_links (linksliberal/sozialdemokratisch, z.B. SZ, Spiegel) + - liberal (wirtschafts-/grünliberal, z.B. NZZ, Zeit) + - mitte (politisch neutral, Agentur, z.B. dpa, Reuters, tagesschau) + - konservativ (buergerlich-konservativ, z.B. FAZ, Welt) + - mitte_rechts (rechts-buergerlich, z.B. Tichys Einblick, Achgut) + - rechts (klar rechts, z.B. Junge Freiheit, EpochTimes) + - rechts_extrem (z.B. Compact, PI-News) + - na (nicht klassifizierbar: Behoerde, Fachmedium, Think Tank ohne klare politische Linie) + +2. media_type (genau einer): + tageszeitung, wochenzeitung, magazin, tv_sender, radio, oeffentlich_rechtlich, + nachrichtenagentur, online_only, blog, telegram_kanal, telegram_bot, podcast, + social_media, imageboard, think_tank, ngo, behoerde, staatsmedium, fachmedium, sonstige + +3. reliability: + - sehr_hoch (etablierte Qualitaet, Faktencheck: tagesschau, dpa, FAZ, Reuters) + - hoch (serioes mit gelegentlichen Schwaechen: taz, Welt, BILD bei harten News) + - gemischt (Mix Meinung/Einseitigkeit: Tichys Einblick, Achgut, Boulevard) + - niedrig (haeufig irrefuehrend, schwache Quellenarbeit: Junge Freiheit, EpochTimes) + - sehr_niedrig (bekannt fuer Desinformation/Verschwoerung: Compact, RT, Sputnik, PI-News) + - na (nicht bewertbar) + +4. alignments (Mehrfach, leeres Array wenn keine ausgepraegte Naehe): + prorussisch, proiranisch, prowestlich, proukrainisch, prochinesisch, projapanisch, + proisraelisch, propalaestinensisch, protuerkisch, panarabisch, neutral, sonstige + +5. 
state_affiliated (true/false): true wenn vom Staat finanziert/kontrolliert + (RT, Sputnik, CGTN, PressTV, Xinhua, TRT). Public Service Broadcaster + wie ARD/ZDF/BBC sind NICHT state_affiliated. + +6. country_code (ISO 3166-1 alpha-2): Heimatland (DE, AT, CH, RU, US, ...). null wenn unklar. + +7. confidence (0.0-1.0): 0.85+ fuer bekannte Outlets, 0.5-0.85 fuer mittelbekannt, <0.5 fuer unsicher. + +8. reasoning (1-2 Saetze): Kurze Begruendung der Hauptklassifikationen. + +WICHTIG: +- Antworte AUSSCHLIESSLICH mit einem JSON-Objekt, kein Text drumherum. +- Nutze ausschliesslich die genannten enum-Werte (snake_case). +- Bei Unklarheit lieber `na` und niedrige confidence. + +JSON-Schema: +{{ + "political_orientation": "...", + "media_type": "...", + "reliability": "...", + "alignments": ["..."], + "state_affiliated": false, + "country_code": "DE", + "confidence": 0.9, + "reasoning": "..." +}}""" + + +async def _load_sample_articles(db: aiosqlite.Connection, name: str, domain: str | None, limit: int = 5) -> list[dict]: + """Laedt die letzten Headlines einer Quelle (per name oder Domain-Match).""" + rows: list = [] + if name: + cursor = await db.execute( + "SELECT headline, headline_de FROM articles WHERE source = ? ORDER BY collected_at DESC LIMIT ?", + (name, limit), + ) + rows = await cursor.fetchall() + if not rows and domain: + cursor = await db.execute( + "SELECT headline, headline_de FROM articles WHERE source_url LIKE ? ORDER BY collected_at DESC LIMIT ?", + (f"%{domain}%", limit), + ) + rows = await cursor.fetchall() + return [dict(r) for r in rows] + + +def _validate(parsed: dict) -> dict: + """Validiert + normalisiert eine LLM-Antwort gegen die Enums.""" + pol = parsed.get("political_orientation", "na") + if pol not in POLITICAL_VALUES: + pol = "na" + mt = parsed.get("media_type", "sonstige") + if mt not in MEDIA_TYPE_VALUES: + mt = "sonstige" + rel = parsed.get("reliability", "na") + if rel not in RELIABILITY_VALUES: + rel = "na" + aligns_raw = parsed.get("alignments") or [] + if not isinstance(aligns_raw, list): + aligns_raw = [] + aligns = sorted({a for a in aligns_raw if isinstance(a, str) and a in ALIGNMENT_VALUES}) + sa = bool(parsed.get("state_affiliated", False)) + cc = parsed.get("country_code") + if isinstance(cc, str) and len(cc) == 2 and cc.isalpha(): + cc = cc.upper() + else: + cc = None + try: + confidence = float(parsed.get("confidence", 0.5)) + confidence = max(0.0, min(1.0, confidence)) + except (TypeError, ValueError): + confidence = 0.5 + reasoning = str(parsed.get("reasoning", ""))[:1000] + return { + "political_orientation": pol, + "media_type": mt, + "reliability": rel, + "alignments": aligns, + "state_affiliated": sa, + "country_code": cc, + "confidence": confidence, + "reasoning": reasoning, + } + + +async def classify_source( + db: aiosqlite.Connection, + source_id: int, + sample_limit: int = 5, + model: str = CLAUDE_MODEL_FAST, +) -> dict: + """Klassifiziert eine einzelne Quelle und schreibt die Vorschlaege in proposed_*-Spalten.""" + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, category, language, bias, " + "classification_source FROM sources WHERE id = ?", + (source_id,), + ) + row = await cursor.fetchone() + if not row: + raise ValueError(f"Quelle {source_id} nicht gefunden") + src = dict(row) + + sample = await _load_sample_articles(db, src["name"], src.get("domain"), sample_limit) + prompt = _build_prompt(src, sample) + response, usage = await call_claude(prompt, tools=None, model=model) + + json_match = re.search(r"\{.*\}", 
response, re.DOTALL) + if not json_match: + raise ValueError(f"Keine JSON-Antwort von Claude fuer source_id={source_id}: {response[:200]}") + parsed = json.loads(json_match.group(0)) + result = _validate(parsed) + + # Nur classification_source auf 'llm_pending' setzen, wenn nicht bereits manuell/approved + new_src = "CASE WHEN classification_source IN ('manual','llm_approved') THEN classification_source ELSE 'llm_pending' END" + await db.execute( + f"""UPDATE sources SET + proposed_political_orientation = ?, + proposed_media_type = ?, + proposed_reliability = ?, + proposed_state_affiliated = ?, + proposed_country_code = ?, + proposed_alignments_json = ?, + proposed_confidence = ?, + proposed_reasoning = ?, + proposed_at = CURRENT_TIMESTAMP, + classification_source = {new_src} + WHERE id = ?""", + ( + result["political_orientation"], + result["media_type"], + result["reliability"], + 1 if result["state_affiliated"] else 0, + result["country_code"], + json.dumps(result["alignments"], ensure_ascii=False), + result["confidence"], + result["reasoning"], + source_id, + ), + ) + await db.commit() + + logger.info( + "Klassifiziert source_id=%s '%s' -> %s/%s/%s conf=%.2f ($%.4f)", + source_id, src["name"], result["political_orientation"], + result["media_type"], result["reliability"], result["confidence"], + usage.cost_usd, + ) + + result["source_id"] = source_id + result["usage"] = { + "cost_usd": usage.cost_usd, + "input_tokens": usage.input_tokens, + "output_tokens": usage.output_tokens, + } + return result + + +async def bulk_classify( + db: aiosqlite.Connection, + limit: int = 50, + only_unclassified: bool = True, + model: str = CLAUDE_MODEL_FAST, +) -> dict: + """Klassifiziert noch unklassifizierte Quellen (sequenziell). + + Args: + limit: Maximale Anzahl Quellen pro Aufruf + only_unclassified: Wenn True, nur classification_source='legacy'. + Wenn False, auch 'llm_pending' neu klassifizieren. + """ + if only_unclassified: + where = "classification_source = 'legacy'" + else: + where = "classification_source IN ('legacy', 'llm_pending')" + cursor = await db.execute( + f"SELECT id FROM sources WHERE {where} AND status = 'active' " + f"AND source_type != 'excluded' ORDER BY id LIMIT ?", + (limit,), + ) + ids = [row["id"] for row in await cursor.fetchall()] + + total_cost = 0.0 + success = 0 + errors: list[dict] = [] + + for sid in ids: + try: + r = await classify_source(db, sid, model=model) + total_cost += r["usage"]["cost_usd"] + success += 1 + except asyncio.CancelledError: + raise + except Exception as e: + logger.error("Klassifikation source_id=%s fehlgeschlagen: %s", sid, e, exc_info=True) + errors.append({"source_id": sid, "error": str(e)}) + + logger.info( + "Bulk-Klassifikation fertig: %d/%d erfolgreich, $%.4f Kosten, %d Fehler", + success, len(ids), total_cost, len(errors), + ) + return { + "processed": len(ids), + "success": success, + "errors": errors, + "total_cost_usd": total_cost, + }
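
The diff adds the review endpoints but no end-to-end example of driving them. Below is a minimal smoke-test sketch of the intended cycle (stats -> queue -> approve), assuming the router is mounted under /api/sources (as the migration script's docstring suggests), a dev server on localhost:8000, bearer-token auth and an installed httpx client -- all of which are assumptions, not part of this patch:

    """Smoke-test sketch for the classification review endpoints (not part of the patch)."""
    import asyncio

    import httpx  # assumption: httpx (or any async HTTP client) is available in the dev environment

    BASE = "http://localhost:8000/api/sources"     # assumed host and mount point
    HEADERS = {"Authorization": "Bearer <token>"}  # placeholder; the real scheme depends on auth.get_current_user


    async def review_first_pending() -> None:
        """Fetch stats and the review queue, then approve the highest-confidence proposal."""
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            stats = (await client.get(f"{BASE}/classification/stats")).json()
            print("pending_review:", stats["pending_review"])

            # Queue is ordered by proposed_confidence DESC, so item 0 is the most confident proposal.
            queue = (await client.get(f"{BASE}/classification/queue", params={"limit": 1})).json()
            if not queue:
                return
            source_id = queue[0]["id"]
            print("proposal:", queue[0]["proposed"])

            # Approve copies proposed_* into the live columns and clears the proposal.
            resp = await client.post(f"{BASE}/{source_id}/classification/approve")
            print(resp.json())  # expected: {"source_id": ..., "status": "approved"}


    if __name__ == "__main__":
        asyncio.run(review_first_pending())

reject and reclassify are driven the same way via POST /{source_id}/classification/reject and .../reclassify; a bulk run can be kicked off with POST /classification/bulk-classify (admins only) or with scripts/migrate_sources_classification.py.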