diff --git a/src/routers/sources.py b/src/routers/sources.py index 01c447e..b238423 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -1,15 +1,16 @@ """Grundquellen-Verwaltung und Kundenquellen-Übersicht.""" +import json import logging import uuid -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from typing import Optional import aiosqlite from auth import get_current_admin -from database import db_dependency +from database import db_dependency, get_db from audit import log_action, get_client_ip from source_meta import get_meta from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S @@ -19,12 +20,84 @@ from shared.source_rules import ( evaluate_feeds_with_claude, domain_to_display_name, ) +from shared.services.source_classifier import ( + bulk_classify, + classify_source, + ALIGNMENT_VALUES, + POLITICAL_VALUES, + MEDIA_TYPE_VALUES, + RELIABILITY_VALUES, +) +from shared.services.external_reputation import ( + apply_reputation_overrides, + sync_all as sync_external_reputation, +) logger = logging.getLogger("verwaltung.sources") router = APIRouter(prefix="/api/sources", tags=["sources"]) -SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes", "language", "bias", "fetch_strategy"} +SOURCE_UPDATE_COLUMNS = { + "name", "url", "domain", "source_type", "category", "status", "notes", + "language", "bias", "fetch_strategy", + "political_orientation", "media_type", "reliability", + "state_affiliated", "country_code", +} +SOURCE_CLASSIFICATION_FIELDS = { + "political_orientation", "media_type", "reliability", + "state_affiliated", "country_code", +} + + +async def _load_alignments_for(db: aiosqlite.Connection, source_ids: list[int]) -> dict[int, list[str]]: + if not source_ids: + return {} + placeholders = ",".join("?" for _ in source_ids) + cursor = await db.execute( + f"SELECT source_id, alignment FROM source_alignments WHERE source_id IN ({placeholders}) ORDER BY alignment", + source_ids, + ) + out: dict[int, list[str]] = {sid: [] for sid in source_ids} + for row in await cursor.fetchall(): + out.setdefault(row["source_id"], []).append(row["alignment"]) + return out + + +async def _replace_alignments(db: aiosqlite.Connection, source_id: int, alignments: list[str]): + """Ersetzt die alignments-Liste einer Quelle (DELETE + INSERT) — Aufrufer muss commit() machen.""" + await db.execute("DELETE FROM source_alignments WHERE source_id = ?", (source_id,)) + seen: set[str] = set() + for raw in alignments: + a = (raw or "").strip().lower() + if not a or a in seen: + continue + if a not in ALIGNMENT_VALUES: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Ungueltiger alignment-Wert: '{a}'", + ) + seen.add(a) + await db.execute( + "INSERT INTO source_alignments (source_id, alignment) VALUES (?, ?)", + (source_id, a), + ) + + +async def _clear_proposed(db: aiosqlite.Connection, source_id: int): + await db.execute( + """UPDATE sources SET + proposed_political_orientation = NULL, + proposed_media_type = NULL, + proposed_reliability = NULL, + proposed_state_affiliated = NULL, + proposed_country_code = NULL, + proposed_alignments_json = NULL, + proposed_confidence = NULL, + proposed_reasoning = NULL, + proposed_at = NULL + WHERE id = ?""", + (source_id,), + ) @router.get("/meta") @@ -61,6 +134,12 @@ class GlobalSourceUpdate(BaseModel): notes: Optional[str] = None language: Optional[str] = Field(default=None, max_length=100) bias: Optional[str] = Field(default=None, max_length=500) + political_orientation: Optional[str] = None + media_type: Optional[str] = None + reliability: Optional[str] = None + state_affiliated: Optional[bool] = None + country_code: Optional[str] = Field(default=None, max_length=8) + alignments: Optional[list[str]] = None @router.get("/global") @@ -120,7 +199,11 @@ async def list_global_sources( WHERE s.tenant_id IS NULL ORDER BY s.category, s.source_type, s.name """) - return [dict(row) for row in await cursor.fetchall()] + rows = [dict(row) for row in await cursor.fetchall()] + alignments_map = await _load_alignments_for(db, [r["id"] for r in rows]) + for r in rows: + r["alignments"] = alignments_map.get(r["id"], []) + return rows @router.post("/global", status_code=201) @@ -170,7 +253,7 @@ async def update_global_source( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): - """Grundquelle bearbeiten.""" + """Grundquelle bearbeiten — inkl. Klassifikation + alignments.""" cursor = await db.execute( "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) ) @@ -178,21 +261,50 @@ async def update_global_source( if not row: raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") before = dict(row) + before_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) - updates = {} - for field, value in data.model_dump(exclude_none=True).items(): - updates[field] = value + payload = data.model_dump(exclude_none=True) + alignments = payload.pop("alignments", None) - if not updates: - return before + if "political_orientation" in payload and payload["political_orientation"] not in POLITICAL_VALUES: + raise HTTPException(status_code=422, detail=f"Ungueltige political_orientation: {payload['political_orientation']}") + if "media_type" in payload and payload["media_type"] not in MEDIA_TYPE_VALUES: + raise HTTPException(status_code=422, detail=f"Ungueltiger media_type: {payload['media_type']}") + if "reliability" in payload and payload["reliability"] not in RELIABILITY_VALUES: + raise HTTPException(status_code=422, detail=f"Ungueltige reliability: {payload['reliability']}") + + updates = {k: v for k, v in payload.items() if k in SOURCE_UPDATE_COLUMNS} + if "state_affiliated" in updates: + updates["state_affiliated"] = 1 if updates["state_affiliated"] else 0 + + classification_touched = any(k in updates for k in SOURCE_CLASSIFICATION_FIELDS) or alignments is not None + if classification_touched: + updates["classification_source"] = "manual" + updates["classified_at"] = None # CURRENT_TIMESTAMP via SQL — siehe unten + + if updates: + sets = [] + vals = [] + for k, v in updates.items(): + if k == "classified_at": + sets.append("classified_at = CURRENT_TIMESTAMP") + else: + sets.append(f"{k} = ?") + vals.append(v) + vals.append(source_id) + await db.execute(f"UPDATE sources SET {', '.join(sets)} WHERE id = ?", vals) + + if alignments is not None: + await _replace_alignments(db, source_id, alignments) - set_clause = ", ".join(f"{k} = ?" for k in updates) - values = list(updates.values()) + [source_id] - await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values) await db.commit() cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) after = dict(await cursor.fetchone()) + after_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) + if before_alignments != after_alignments: + before["alignments"] = before_alignments + after["alignments"] = after_alignments await log_action( db, admin, get_client_ip(request), action="update", resource_type="source", resource_id=source_id, @@ -1086,3 +1198,307 @@ Nur das JSON, kein anderer Text.""" except Exception as e: raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}") + + +# === Klassifikations-Review (LLM-Vorschlaege approve/reject/reclassify) === + + +@router.get("/classification/stats") +async def classification_stats( + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Counts pro classification_source-Wert + Anzahl Pending-Reviews (alle Quellen).""" + cursor = await db.execute( + """SELECT classification_source, COUNT(*) as cnt + FROM sources + WHERE status = 'active' + GROUP BY classification_source""" + ) + by_source = {row["classification_source"] or "legacy": row["cnt"] for row in await cursor.fetchall()} + cursor = await db.execute( + """SELECT COUNT(*) as cnt FROM sources + WHERE status = 'active' AND proposed_political_orientation IS NOT NULL""" + ) + pending = (await cursor.fetchone())["cnt"] + return { + "by_classification_source": by_source, + "pending_review": pending, + "total": sum(by_source.values()), + } + + +@router.get("/classification/queue") +async def classification_queue( + limit: int = 50, + min_confidence: float = 0.0, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Liefert Quellen mit nicht-leeren proposed_*-Spalten (Review-Queue).""" + cursor = await db.execute( + """SELECT s.* FROM sources s + WHERE s.proposed_political_orientation IS NOT NULL + AND COALESCE(s.proposed_confidence, 0) >= ? + ORDER BY s.proposed_confidence DESC, s.proposed_at DESC + LIMIT ?""", + (min_confidence, limit), + ) + rows = [dict(r) for r in await cursor.fetchall()] + alignments_map = await _load_alignments_for(db, [r["id"] for r in rows]) + out = [] + for d in rows: + try: + proposed_aligns = json.loads(d.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + out.append({ + "id": d["id"], + "name": d["name"], + "url": d.get("url"), + "domain": d.get("domain"), + "source_type": d.get("source_type"), + "category": d.get("category"), + "is_global": d.get("tenant_id") is None, + "current": { + "political_orientation": d.get("political_orientation"), + "media_type": d.get("media_type"), + "reliability": d.get("reliability"), + "state_affiliated": bool(d.get("state_affiliated")), + "country_code": d.get("country_code"), + "alignments": alignments_map.get(d["id"], []), + "classification_source": d.get("classification_source"), + }, + "proposed": { + "political_orientation": d.get("proposed_political_orientation"), + "media_type": d.get("proposed_media_type"), + "reliability": d.get("proposed_reliability"), + "state_affiliated": bool(d.get("proposed_state_affiliated")), + "country_code": d.get("proposed_country_code"), + "alignments": proposed_aligns, + "confidence": d.get("proposed_confidence"), + "reasoning": d.get("proposed_reasoning"), + "proposed_at": d.get("proposed_at"), + }, + }) + return out + + +@router.post("/{source_id}/classification/approve") +async def approve_classification( + source_id: int, + request: Request, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Uebernimmt proposed_* in echte Felder, setzt classification_source='llm_approved'.""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + src = dict(row) + before_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) + before = {**src, "alignments": before_alignments} + + if src.get("proposed_political_orientation") is None: + raise HTTPException(status_code=400, detail="Keine LLM-Vorschlaege fuer diese Quelle vorhanden") + + try: + proposed_aligns = json.loads(src.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + + await db.execute( + """UPDATE sources SET + political_orientation = ?, + media_type = ?, + reliability = ?, + state_affiliated = ?, + country_code = ?, + classification_source = 'llm_approved', + classified_at = CURRENT_TIMESTAMP + WHERE id = ?""", + ( + src["proposed_political_orientation"], + src["proposed_media_type"], + src["proposed_reliability"], + 1 if src.get("proposed_state_affiliated") else 0, + src.get("proposed_country_code"), + source_id, + ), + ) + await _replace_alignments(db, source_id, [a for a in proposed_aligns if a in ALIGNMENT_VALUES]) + await _clear_proposed(db, source_id) + await db.commit() + + try: + await apply_reputation_overrides(db, source_id) + except Exception as e: + logger.warning("Reputation-Override fuer source_id=%s fehlgeschlagen: %s", source_id, e) + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + after_row = dict(await cursor.fetchone()) + after_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) + after = {**after_row, "alignments": after_alignments} + await log_action( + db, admin, get_client_ip(request), + action="update", resource_type="source", resource_id=source_id, + before=before, after=after, + ) + return {"source_id": source_id, "status": "approved"} + + +@router.post("/{source_id}/classification/reject") +async def reject_classification( + source_id: int, + request: Request, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Verwirft die LLM-Vorschlaege ohne Uebernahme. classification_source: 'llm_pending' -> 'legacy'.""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + src = dict(row) + before = dict(src) + + await _clear_proposed(db, source_id) + if src.get("classification_source") == "llm_pending": + await db.execute( + "UPDATE sources SET classification_source = 'legacy' WHERE id = ?", + (source_id,), + ) + await db.commit() + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + after = dict(await cursor.fetchone()) + await log_action( + db, admin, get_client_ip(request), + action="update", resource_type="source", resource_id=source_id, + before=before, after=after, + ) + return {"source_id": source_id, "status": "rejected"} + + +@router.post("/{source_id}/classification/reclassify") +async def reclassify_source( + source_id: int, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Triggert eine LLM-Klassifikation einer einzelnen Quelle (synchron, ~3-5s).""" + cursor = await db.execute("SELECT id FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + try: + result = await classify_source(db, source_id) + except Exception as e: + logger.error("Reclassify source_id=%s fehlgeschlagen: %s", source_id, e, exc_info=True) + raise HTTPException(status_code=500, detail=f"Klassifikation fehlgeschlagen: {e}") + return result + + +async def _bulk_classify_background(limit: int, only_unclassified: bool): + """Hintergrund-Task: oeffnet eigene DB-Connection.""" + db = await get_db() + try: + await bulk_classify(db, limit=limit, only_unclassified=only_unclassified) + finally: + await db.close() + + +@router.post("/classification/bulk-classify") +async def trigger_bulk_classify( + background_tasks: BackgroundTasks, + limit: int = 50, + only_unclassified: bool = True, + admin: dict = Depends(get_current_admin), +): + """Startet eine Bulk-Klassifikation im Hintergrund.""" + if limit < 1 or limit > 500: + raise HTTPException(status_code=400, detail="limit muss zwischen 1 und 500 liegen") + background_tasks.add_task(_bulk_classify_background, limit, only_unclassified) + return {"status": "started", "limit": limit, "only_unclassified": only_unclassified} + + +@router.post("/external-reputation/sync") +async def trigger_external_reputation_sync( + background_tasks: BackgroundTasks, + admin: dict = Depends(get_current_admin), +): + """Startet Sync von IFCN- und EUvsDisinfo-Daten (Hintergrund).""" + async def _bg(): + db = await get_db() + try: + await sync_external_reputation(db) + finally: + await db.close() + + background_tasks.add_task(_bg) + return {"status": "started"} + + +@router.post("/classification/bulk-approve") +async def bulk_approve_classifications( + request: Request, + min_confidence: float = 0.85, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Genehmigt alle Pending-Vorschlaege ueber dem confidence-Schwellwert.""" + cursor = await db.execute( + """SELECT id, proposed_political_orientation, proposed_media_type, + proposed_reliability, proposed_state_affiliated, + proposed_country_code, proposed_alignments_json + FROM sources + WHERE proposed_political_orientation IS NOT NULL + AND COALESCE(proposed_confidence, 0) >= ?""", + (min_confidence,), + ) + rows = [dict(r) for r in await cursor.fetchall()] + approved_ids: list[int] = [] + for src in rows: + try: + proposed_aligns = json.loads(src.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + await db.execute( + """UPDATE sources SET + political_orientation = ?, + media_type = ?, + reliability = ?, + state_affiliated = ?, + country_code = ?, + classification_source = 'llm_approved', + classified_at = CURRENT_TIMESTAMP + WHERE id = ?""", + ( + src["proposed_political_orientation"], + src["proposed_media_type"], + src["proposed_reliability"], + 1 if src.get("proposed_state_affiliated") else 0, + src.get("proposed_country_code"), + src["id"], + ), + ) + await _replace_alignments( + db, src["id"], [a for a in proposed_aligns if a in ALIGNMENT_VALUES] + ) + await _clear_proposed(db, src["id"]) + approved_ids.append(src["id"]) + await db.commit() + + try: + for sid in approved_ids: + await apply_reputation_overrides(db, sid) + except Exception as e: + logger.warning("Bulk Reputation-Override fehlgeschlagen: %s", e) + + await log_action( + db, admin, get_client_ip(request), + action="update", resource_type="source", resource_id=None, + after={"bulk_approved_ids": approved_ids, "min_confidence": min_confidence}, + ) + return {"approved": len(approved_ids), "ids": approved_ids} diff --git a/src/shared/services/external_reputation.py b/src/shared/services/external_reputation.py new file mode 100644 index 0000000..de973b3 --- /dev/null +++ b/src/shared/services/external_reputation.py @@ -0,0 +1,282 @@ +"""Externe Reputations-Daten fuer Quellen. + +Synchronisiert Domain-Listen von oeffentlichen Reputations-/Faktencheck-Datenbanken +und schreibt die Treffer in die sources-Spalten: + +- IFCN-Signatories (anerkannte Faktenchecker) -> ifcn_signatory +- EUvsDisinfo (pro-Kreml-Desinformation, Zenodo-CSV) -> eu_disinfo_listed, + eu_disinfo_case_count, eu_disinfo_last_seen + +Anschliessend wendet apply_reputation_overrides() Override-Regeln auf die +reliability-Spalte an: +- ifcn_signatory=1 -> reliability='sehr_hoch' +- eu_disinfo_case_count >= 5 -> reliability='sehr_niedrig' +- eu_disinfo_case_count >= 1 -> reliability eine Stufe runter (max bis 'niedrig') +""" +import csv +import io +import logging +from collections import defaultdict +from urllib.parse import urlparse + +import aiosqlite +import httpx + +logger = logging.getLogger("osint.external_reputation") + +IFCN_LIST_URL = "https://raw.githubusercontent.com/IFCN/verified-signatories/main/list" +EU_DISINFO_CSV_URL = "https://zenodo.org/records/10514307/files/euvsdisinfo_base.csv?download=1" + +HTTP_TIMEOUT = httpx.Timeout(60.0, connect=10.0) + +# Generische Plattform-Domains, die NICHT als Quelle markiert werden duerfen +# (EUvsDisinfo aggregiert anonyme Telegram-/Twitter-Posts unter Plattform-Domains). +PLATFORM_DOMAINS = { + "t.me", "telegram.me", "telegram.org", + "twitter.com", "x.com", "mobile.twitter.com", + "youtube.com", "youtu.be", "m.youtube.com", + "facebook.com", "fb.com", "m.facebook.com", + "instagram.com", "tiktok.com", "vk.com", "ok.ru", + "rumble.com", "bitchute.com", "odysee.com", + "reddit.com", "old.reddit.com", + "wordpress.com", "blogspot.com", "medium.com", + "substack.com", "wixsite.com", +} + +# Reliability-Skala in Stufenfolge (schlecht -> gut) +RELIABILITY_ORDER = ["sehr_niedrig", "niedrig", "gemischt", "hoch", "sehr_hoch"] + + +def _normalize_domain(raw: str | None) -> str | None: + """Normalisiert eine Domain: lowercase, ohne www., ohne Schema/Pfad.""" + if not raw: + return None + raw = raw.strip().lower() + if not raw: + return None + # Falls eine vollstaendige URL uebergeben wurde + if "://" in raw: + try: + raw = urlparse(raw).netloc or raw + except ValueError: + pass + # Pfad/Query strippen + raw = raw.split("/")[0].split("?")[0].split("#")[0] + if raw.startswith("www."): + raw = raw[4:] + return raw or None + + +async def _fetch_text(url: str) -> str: + """Laedt Text von einer URL. Wirft HTTPException bei Fehler.""" + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT, follow_redirects=True) as client: + resp = await client.get(url) + resp.raise_for_status() + return resp.text + + +async def sync_ifcn_signatories(db: aiosqlite.Connection) -> dict: + """Laedt IFCN-Domain-Liste und matcht gegen sources.domain. + + Setzt ifcn_signatory=1 wo die Domain in der Liste vorkommt, sonst 0. + """ + text = await _fetch_text(IFCN_LIST_URL) + domains: set[str] = set() + for line in text.splitlines(): + d = _normalize_domain(line) + if d: + domains.add(d) + logger.info("IFCN-Liste geladen: %d Domains", len(domains)) + + # Aktuelle Quellen mit Domain laden + cursor = await db.execute( + "SELECT id, domain FROM sources WHERE domain IS NOT NULL AND domain != ''" + ) + sources = [dict(r) for r in await cursor.fetchall()] + + matched_ids: list[int] = [] + unmatched_ids: list[int] = [] + for s in sources: + nd = _normalize_domain(s["domain"]) + if nd and nd not in PLATFORM_DOMAINS and nd in domains: + matched_ids.append(s["id"]) + else: + unmatched_ids.append(s["id"]) + + # Bulk-Update in zwei Statements + if matched_ids: + placeholders = ",".join("?" for _ in matched_ids) + await db.execute( + f"UPDATE sources SET ifcn_signatory = 1 WHERE id IN ({placeholders})", + matched_ids, + ) + if unmatched_ids: + placeholders = ",".join("?" for _ in unmatched_ids) + await db.execute( + f"UPDATE sources SET ifcn_signatory = 0 WHERE id IN ({placeholders})", + unmatched_ids, + ) + await db.commit() + logger.info("IFCN-Sync: %d Quellen als Faktenchecker markiert (von %d)", + len(matched_ids), len(sources)) + return { + "list_size": len(domains), + "sources_checked": len(sources), + "matched": len(matched_ids), + } + + +async def sync_eu_disinfo(db: aiosqlite.Connection) -> dict: + """Laedt EUvsDisinfo-CSV von Zenodo, aggregiert pro Domain, schreibt sources. + + - eu_disinfo_listed: 1 wenn Domain mindestens 1x als 'disinformation' debunkt + - eu_disinfo_case_count: Anzahl Disinformation-Faelle + - eu_disinfo_last_seen: spaetestes debunk_date + """ + text = await _fetch_text(EU_DISINFO_CSV_URL) + reader = csv.DictReader(io.StringIO(text)) + + # Per-Domain aggregieren (nur class='disinformation') + counts: dict[str, int] = defaultdict(int) + last_seen: dict[str, str] = {} + total_rows = 0 + for row in reader: + total_rows += 1 + if (row.get("class") or "").strip().lower() != "disinformation": + continue + d = _normalize_domain(row.get("article_domain")) + if not d: + continue + counts[d] += 1 + debunk_date = (row.get("debunk_date") or "").strip() + if debunk_date: + prev = last_seen.get(d) + if not prev or debunk_date > prev: + last_seen[d] = debunk_date + logger.info("EUvsDisinfo-CSV: %d Zeilen, %d Domains mit Desinformation", + total_rows, len(counts)) + + # Quellen laden + matchen + cursor = await db.execute( + "SELECT id, domain FROM sources WHERE domain IS NOT NULL AND domain != ''" + ) + sources = [dict(r) for r in await cursor.fetchall()] + + matched = 0 + for s in sources: + nd = _normalize_domain(s["domain"]) + if nd and nd not in PLATFORM_DOMAINS and nd in counts: + await db.execute( + """UPDATE sources SET + eu_disinfo_listed = 1, + eu_disinfo_case_count = ?, + eu_disinfo_last_seen = ? + WHERE id = ?""", + (counts[nd], last_seen.get(nd), s["id"]), + ) + matched += 1 + else: + await db.execute( + """UPDATE sources SET + eu_disinfo_listed = 0, + eu_disinfo_case_count = 0, + eu_disinfo_last_seen = NULL + WHERE id = ?""", + (s["id"],), + ) + await db.commit() + logger.info("EUvsDisinfo-Sync: %d Quellen als Desinformations-Quelle markiert (von %d)", + matched, len(sources)) + return { + "rows_in_csv": total_rows, + "domains_with_disinfo_in_csv": len(counts), + "sources_checked": len(sources), + "matched": matched, + } + + +def _override_reliability(current: str | None, ifcn: bool, eu_count: int) -> str | None: + """Wendet Override-Regeln auf eine reliability-Stufe an. + + Rueckgabe: neue Stufe (oder None, wenn unveraendert). + """ + cur = current or "na" + + # IFCN gewinnt: zertifizierter Faktenchecker -> sehr_hoch (immer) + if ifcn: + return "sehr_hoch" if cur != "sehr_hoch" else None + + # EUvsDisinfo: Downgrade + if eu_count >= 5: + return "sehr_niedrig" if cur != "sehr_niedrig" else None + if eu_count >= 1: + # Eine Stufe runter, mindestens bis 'niedrig' + if cur == "na": + return "niedrig" + if cur in RELIABILITY_ORDER: + idx = RELIABILITY_ORDER.index(cur) + new_idx = max(0, idx - 1) + new = RELIABILITY_ORDER[new_idx] + # Mindeststufe 'niedrig' bei eu_count >= 1 + if RELIABILITY_ORDER.index(new) > RELIABILITY_ORDER.index("niedrig"): + new = "niedrig" + return new if new != cur else None + return None + + +async def apply_reputation_overrides(db: aiosqlite.Connection, source_id: int | None = None) -> dict: + """Wendet Reliability-Override-Regeln an. + + Wenn source_id angegeben ist, nur fuer diese Quelle. Sonst fuer alle Quellen. + """ + if source_id is not None: + cursor = await db.execute( + "SELECT id, reliability, ifcn_signatory, eu_disinfo_case_count " + "FROM sources WHERE id = ?", + (source_id,), + ) + else: + cursor = await db.execute( + "SELECT id, reliability, ifcn_signatory, eu_disinfo_case_count FROM sources" + ) + sources = [dict(r) for r in await cursor.fetchall()] + + changed = 0 + for s in sources: + new = _override_reliability( + s.get("reliability"), + bool(s.get("ifcn_signatory")), + int(s.get("eu_disinfo_case_count") or 0), + ) + if new is not None: + await db.execute( + "UPDATE sources SET reliability = ? WHERE id = ?", + (new, s["id"]), + ) + changed += 1 + await db.commit() + logger.info("Reliability-Override: %d Quellen angepasst (von %d gepruefte)", + changed, len(sources)) + return {"checked": len(sources), "changed": changed} + + +async def sync_all(db: aiosqlite.Connection) -> dict: + """Vollstaendiger Sync: IFCN + EUvsDisinfo + Reliability-Override. + + Setzt external_data_synced_at fuer alle Quellen. + """ + ifcn_result = await sync_ifcn_signatories(db) + eu_result = await sync_eu_disinfo(db) + override_result = await apply_reputation_overrides(db) + + await db.execute( + "UPDATE sources SET external_data_synced_at = CURRENT_TIMESTAMP " + "WHERE domain IS NOT NULL AND domain != ''" + ) + await db.commit() + + return { + "ifcn": ifcn_result, + "eu_disinfo": eu_result, + "override": override_result, + } diff --git a/src/shared/services/source_classifier.py b/src/shared/services/source_classifier.py new file mode 100644 index 0000000..15d6cd2 --- /dev/null +++ b/src/shared/services/source_classifier.py @@ -0,0 +1,295 @@ +"""Klassifiziert Quellen via Claude (Haiku) nach 4 Achsen + state_affiliated + country. + +Schreibt Vorschlaege in die proposed_*-Spalten von sources und setzt +classification_source='llm_pending'. Approval erfolgt ueber separate Endpoints, +die proposed_* in die echten Spalten kopieren. +""" +import asyncio +import json +import logging +import re + +import aiosqlite + +from shared.agents.claude_client import call_claude +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.source_classifier") + +POLITICAL_VALUES = { + "links_extrem", "links", "mitte_links", "liberal", "mitte", + "konservativ", "mitte_rechts", "rechts", "rechts_extrem", "na", +} +MEDIA_TYPE_VALUES = { + "tageszeitung", "wochenzeitung", "magazin", "tv_sender", "radio", + "oeffentlich_rechtlich", "nachrichtenagentur", "online_only", "blog", + "telegram_kanal", "telegram_bot", "podcast", "social_media", "imageboard", + "think_tank", "ngo", "behoerde", "staatsmedium", "fachmedium", "sonstige", +} +RELIABILITY_VALUES = {"sehr_hoch", "hoch", "gemischt", "niedrig", "sehr_niedrig", "na"} +ALIGNMENT_VALUES = { + "prorussisch", "proiranisch", "prowestlich", "proukrainisch", + "prochinesisch", "projapanisch", "proisraelisch", "propalaestinensisch", + "protuerkisch", "panarabisch", "neutral", "sonstige", +} + + +def _build_prompt(src: dict, sample_articles: list[dict]) -> str: + sample_text = "" + if sample_articles: + lines = [] + for i, art in enumerate(sample_articles[:5], 1): + headline = (art.get("headline") or art.get("headline_de") or "").strip() + if headline: + lines.append(f"{i}. {headline[:200]}") + if lines: + sample_text = "\nLetzte Artikel/Headlines:\n" + "\n".join(lines) + + return f"""Du bist ein OSINT-Analyst und klassifizierst Nachrichten- und Medienquellen fuer ein Lagebild-Monitoring-System (DACH-Raum). + +QUELLE: +Name: {src.get('name')} +URL: {src.get('url') or '-'} +Domain: {src.get('domain') or '-'} +Quellentyp: {src.get('source_type')} +Bisherige Kategorie: {src.get('category')} +Sprache: {src.get('language') or 'unbekannt'} +Bisherige Notiz (Freitext): {src.get('bias') or '-'}{sample_text} + +AUFGABE: Klassifiziere die Quelle nach folgenden Achsen. + +1. political_orientation: + - links_extrem (z.B. linksunten.indymedia) + - links (klar links, z.B. junge Welt, taz) + - mitte_links (linksliberal/sozialdemokratisch, z.B. SZ, Spiegel) + - liberal (wirtschafts-/grünliberal, z.B. NZZ, Zeit) + - mitte (politisch neutral, Agentur, z.B. dpa, Reuters, tagesschau) + - konservativ (buergerlich-konservativ, z.B. FAZ, Welt) + - mitte_rechts (rechts-buergerlich, z.B. Tichys Einblick, Achgut) + - rechts (klar rechts, z.B. Junge Freiheit, EpochTimes) + - rechts_extrem (z.B. Compact, PI-News) + - na (nicht klassifizierbar: Behoerde, Fachmedium, Think Tank ohne klare politische Linie) + +2. media_type (genau einer): + tageszeitung, wochenzeitung, magazin, tv_sender, radio, oeffentlich_rechtlich, + nachrichtenagentur, online_only, blog, telegram_kanal, telegram_bot, podcast, + social_media, imageboard, think_tank, ngo, behoerde, staatsmedium, fachmedium, sonstige + +3. reliability: + - sehr_hoch (etablierte Qualitaet, Faktencheck: tagesschau, dpa, FAZ, Reuters) + - hoch (serioes mit gelegentlichen Schwaechen: taz, Welt, BILD bei harten News) + - gemischt (Mix Meinung/Einseitigkeit: Tichys Einblick, Achgut, Boulevard) + - niedrig (haeufig irrefuehrend, schwache Quellenarbeit: Junge Freiheit, EpochTimes) + - sehr_niedrig (bekannt fuer Desinformation/Verschwoerung: Compact, RT, Sputnik, PI-News) + - na (nicht bewertbar) + +4. alignments (Mehrfach, leeres Array wenn keine ausgepraegte Naehe): + prorussisch, proiranisch, prowestlich, proukrainisch, prochinesisch, projapanisch, + proisraelisch, propalaestinensisch, protuerkisch, panarabisch, neutral, sonstige + +5. state_affiliated (true/false): true wenn vom Staat finanziert/kontrolliert + (RT, Sputnik, CGTN, PressTV, Xinhua, TRT). Public Service Broadcaster + wie ARD/ZDF/BBC sind NICHT state_affiliated. + +6. country_code (ISO 3166-1 alpha-2): Heimatland (DE, AT, CH, RU, US, ...). null wenn unklar. + +7. confidence (0.0-1.0): 0.85+ fuer bekannte Outlets, 0.5-0.85 fuer mittelbekannt, <0.5 fuer unsicher. + +8. reasoning (1-2 Saetze): Kurze Begruendung der Hauptklassifikationen. + +WICHTIG: +- Antworte AUSSCHLIESSLICH mit einem JSON-Objekt, kein Text drumherum. +- Nutze ausschliesslich die genannten enum-Werte (snake_case). +- Bei Unklarheit lieber `na` und niedrige confidence. + +JSON-Schema: +{{ + "political_orientation": "...", + "media_type": "...", + "reliability": "...", + "alignments": ["..."], + "state_affiliated": false, + "country_code": "DE", + "confidence": 0.9, + "reasoning": "..." +}}""" + + +async def _load_sample_articles(db: aiosqlite.Connection, name: str, domain: str | None, limit: int = 5) -> list[dict]: + """Laedt die letzten Headlines einer Quelle (per name oder Domain-Match).""" + rows: list = [] + if name: + cursor = await db.execute( + "SELECT headline, headline_de FROM articles WHERE source = ? ORDER BY collected_at DESC LIMIT ?", + (name, limit), + ) + rows = await cursor.fetchall() + if not rows and domain: + cursor = await db.execute( + "SELECT headline, headline_de FROM articles WHERE source_url LIKE ? ORDER BY collected_at DESC LIMIT ?", + (f"%{domain}%", limit), + ) + rows = await cursor.fetchall() + return [dict(r) for r in rows] + + +def _validate(parsed: dict) -> dict: + """Validiert + normalisiert eine LLM-Antwort gegen die Enums.""" + pol = parsed.get("political_orientation", "na") + if pol not in POLITICAL_VALUES: + pol = "na" + mt = parsed.get("media_type", "sonstige") + if mt not in MEDIA_TYPE_VALUES: + mt = "sonstige" + rel = parsed.get("reliability", "na") + if rel not in RELIABILITY_VALUES: + rel = "na" + aligns_raw = parsed.get("alignments") or [] + if not isinstance(aligns_raw, list): + aligns_raw = [] + aligns = sorted({a for a in aligns_raw if isinstance(a, str) and a in ALIGNMENT_VALUES}) + sa = bool(parsed.get("state_affiliated", False)) + cc = parsed.get("country_code") + if isinstance(cc, str) and len(cc) == 2 and cc.isalpha(): + cc = cc.upper() + else: + cc = None + try: + confidence = float(parsed.get("confidence", 0.5)) + confidence = max(0.0, min(1.0, confidence)) + except (TypeError, ValueError): + confidence = 0.5 + reasoning = str(parsed.get("reasoning", ""))[:1000] + return { + "political_orientation": pol, + "media_type": mt, + "reliability": rel, + "alignments": aligns, + "state_affiliated": sa, + "country_code": cc, + "confidence": confidence, + "reasoning": reasoning, + } + + +async def classify_source( + db: aiosqlite.Connection, + source_id: int, + sample_limit: int = 5, + model: str = CLAUDE_MODEL_FAST, +) -> dict: + """Klassifiziert eine einzelne Quelle und schreibt die Vorschlaege in proposed_*-Spalten.""" + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, category, language, bias, " + "classification_source FROM sources WHERE id = ?", + (source_id,), + ) + row = await cursor.fetchone() + if not row: + raise ValueError(f"Quelle {source_id} nicht gefunden") + src = dict(row) + + sample = await _load_sample_articles(db, src["name"], src.get("domain"), sample_limit) + prompt = _build_prompt(src, sample) + response, usage = await call_claude(prompt, tools=None, model=model) + + json_match = re.search(r"\{.*\}", response, re.DOTALL) + if not json_match: + raise ValueError(f"Keine JSON-Antwort von Claude fuer source_id={source_id}: {response[:200]}") + parsed = json.loads(json_match.group(0)) + result = _validate(parsed) + + # Nur classification_source auf 'llm_pending' setzen, wenn nicht bereits manuell/approved + new_src = "CASE WHEN classification_source IN ('manual','llm_approved') THEN classification_source ELSE 'llm_pending' END" + await db.execute( + f"""UPDATE sources SET + proposed_political_orientation = ?, + proposed_media_type = ?, + proposed_reliability = ?, + proposed_state_affiliated = ?, + proposed_country_code = ?, + proposed_alignments_json = ?, + proposed_confidence = ?, + proposed_reasoning = ?, + proposed_at = CURRENT_TIMESTAMP, + classification_source = {new_src} + WHERE id = ?""", + ( + result["political_orientation"], + result["media_type"], + result["reliability"], + 1 if result["state_affiliated"] else 0, + result["country_code"], + json.dumps(result["alignments"], ensure_ascii=False), + result["confidence"], + result["reasoning"], + source_id, + ), + ) + await db.commit() + + logger.info( + "Klassifiziert source_id=%s '%s' -> %s/%s/%s conf=%.2f ($%.4f)", + source_id, src["name"], result["political_orientation"], + result["media_type"], result["reliability"], result["confidence"], + usage.cost_usd, + ) + + result["source_id"] = source_id + result["usage"] = { + "cost_usd": usage.cost_usd, + "input_tokens": usage.input_tokens, + "output_tokens": usage.output_tokens, + } + return result + + +async def bulk_classify( + db: aiosqlite.Connection, + limit: int = 50, + only_unclassified: bool = True, + model: str = CLAUDE_MODEL_FAST, +) -> dict: + """Klassifiziert noch unklassifizierte Quellen (sequenziell). + + Args: + limit: Maximale Anzahl Quellen pro Aufruf + only_unclassified: Wenn True, nur classification_source='legacy'. + Wenn False, auch 'llm_pending' neu klassifizieren. + """ + if only_unclassified: + where = "classification_source = 'legacy'" + else: + where = "classification_source IN ('legacy', 'llm_pending')" + cursor = await db.execute( + f"SELECT id FROM sources WHERE {where} AND status = 'active' " + f"AND source_type != 'excluded' ORDER BY id LIMIT ?", + (limit,), + ) + ids = [row["id"] for row in await cursor.fetchall()] + + total_cost = 0.0 + success = 0 + errors: list[dict] = [] + + for sid in ids: + try: + r = await classify_source(db, sid, model=model) + total_cost += r["usage"]["cost_usd"] + success += 1 + except asyncio.CancelledError: + raise + except Exception as e: + logger.error("Klassifikation source_id=%s fehlgeschlagen: %s", sid, e, exc_info=True) + errors.append({"source_id": sid, "error": str(e)}) + + logger.info( + "Bulk-Klassifikation fertig: %d/%d erfolgreich, $%.4f Kosten, %d Fehler", + success, len(ids), total_cost, len(errors), + ) + return { + "processed": len(ids), + "success": success, + "errors": errors, + "total_cost_usd": total_cost, + } diff --git a/src/static/css/style.css b/src/static/css/style.css index fc102ba..ecf1ca6 100644 --- a/src/static/css/style.css +++ b/src/static/css/style.css @@ -962,3 +962,162 @@ input[type="date"].filter-select { padding: 6px 10px; } font-weight: 400; } +/* === Klassifikations-Review === */ +.sources-tab-badge { + display: inline-flex; + align-items: center; + justify-content: center; + min-width: 20px; + padding: 0 6px; + height: 18px; + border-radius: 9px; + background: var(--accent); + color: var(--bg-primary); + font-size: 10px; + font-weight: 700; +} + +.review-toolbar { + display: flex; + align-items: center; + justify-content: space-between; + padding: 10px 14px; + background: var(--bg-secondary); + border: 1px solid var(--border); + border-radius: var(--radius); + margin-bottom: 12px; + flex-wrap: wrap; + gap: 12px; +} +.review-toolbar-info { + display: flex; + align-items: center; + gap: 16px; + font-size: 13px; + color: var(--text-primary); +} +.review-conf-filter { + display: inline-flex; + align-items: center; + gap: 6px; + font-size: 12px; + color: var(--text-secondary); +} +.review-toolbar-actions { display: flex; gap: 6px; } + +.review-list { display: flex; flex-direction: column; gap: 8px; } +.review-card { + background: var(--bg-secondary); + border: 1px solid var(--border); + border-radius: var(--radius); + padding: 12px 14px; +} +.review-card-header { + display: flex; + justify-content: space-between; + align-items: flex-start; + gap: 12px; + margin-bottom: 10px; +} +.review-card-title { + display: flex; + flex-wrap: wrap; + align-items: center; + gap: 8px; +} +.review-card-name { font-weight: 600; font-size: 14px; color: var(--text-primary); } +.review-card-domain { font-size: 11px; color: var(--text-muted); } +.review-global-badge { + display: inline-flex; + align-items: center; + padding: 1px 6px; + border-radius: var(--radius); + background: #5e35b1; + color: #fff; + font-size: 9px; + font-weight: 600; + letter-spacing: 0.3px; + text-transform: uppercase; +} +.review-card-confidence { + display: inline-flex; + flex-direction: column; + align-items: center; + padding: 4px 10px; + border-radius: var(--radius); + min-width: 60px; +} +.review-card-confidence .conf-value { font-size: 14px; font-weight: 700; } +.review-card-confidence .conf-label { font-size: 9px; text-transform: uppercase; letter-spacing: 0.3px; opacity: 0.8; } +.review-card-confidence.conf-high { background: rgba(34,197,94,0.15); color: var(--success); } +.review-card-confidence.conf-medium { background: rgba(245,158,11,0.15); color: var(--warning); } +.review-card-confidence.conf-low { background: rgba(239,68,68,0.15); color: var(--danger); } + +.review-card-diff { + display: grid; + grid-template-columns: 1fr; + gap: 4px; + font-size: 12px; + margin-bottom: 10px; +} +.review-diff-row { + display: grid; + grid-template-columns: 130px 1fr 24px 1fr; + align-items: center; + gap: 8px; + padding: 3px 6px; + border-radius: 3px; +} +.review-diff-row.changed { background: rgba(245,158,11,0.10); } +.review-diff-label { color: var(--text-secondary); font-weight: 500; } +.review-diff-current { color: var(--text-muted); } +.review-diff-arrow { text-align: center; color: var(--text-muted); font-weight: 600; } +.review-diff-proposed { color: var(--text-primary); font-weight: 500; } +.review-diff-row.changed .review-diff-proposed { color: var(--warning); font-weight: 600; } + +.review-card-reasoning { + font-size: 12px; + color: var(--text-secondary); + background: var(--bg-tertiary); + padding: 8px 10px; + border-radius: var(--radius); + margin-bottom: 10px; + line-height: 1.5; +} +.review-card-actions { display: flex; gap: 6px; flex-wrap: wrap; } + +/* Edit-Form: Klassifikations-Sektion */ +.sources-classification-section { + margin-top: 14px; + padding-top: 14px; + border-top: 1px solid var(--border); +} +.sources-classification-header { + font-size: 12px; + font-weight: 600; + color: var(--text-secondary); + margin-bottom: 10px; + letter-spacing: 0.3px; + text-transform: uppercase; +} +.alignment-chips { display: flex; flex-wrap: wrap; gap: 6px; } +.alignment-chip { + display: inline-flex; + align-items: center; + padding: 4px 10px; + border-radius: 999px; + font-size: 11px; + font-weight: 500; + background: transparent; + color: var(--text-secondary); + border: 1px solid var(--border); + cursor: pointer; + transition: all 0.12s ease; +} +.alignment-chip:hover { background: var(--bg-tertiary); color: var(--text-primary); } +.alignment-chip.active { + background: var(--accent); + color: var(--bg-primary); + border-color: var(--accent); +} + diff --git a/src/static/dashboard.html b/src/static/dashboard.html index 4bd8138..34d5390 100644 --- a/src/static/dashboard.html +++ b/src/static/dashboard.html @@ -294,6 +294,7 @@ + @@ -406,6 +407,35 @@ + +
+
+
+ 0 Vorschläge ausstehend + +
+
+ + + +
+
+
+
+
Lade Review-Queue…
+
+
+
+ @@ -653,6 +683,96 @@ + +
+
Einordnung (Klassifikation)
+
+
+ + +
+
+ + +
+
+ + +
+
+
+
+ + +
+
+ +
+
+
+ +
+ + + + + + + + + + + + +
+
+
+