diff --git a/requirements.txt b/requirements.txt index 9903104..10fa183 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,5 @@ python-multipart aiosmtplib httpx>=0.28 feedparser>=6.0 +# PDF-Upload-Validierung +pypdf>=5.0 diff --git a/src/models.py b/src/models.py index 17c3345..573afa2 100644 --- a/src/models.py +++ b/src/models.py @@ -25,11 +25,13 @@ class TokenResponse(BaseModel): class OrgCreate(BaseModel): name: str = Field(min_length=1, max_length=200) slug: str = Field(min_length=1, max_length=100, pattern="^[a-z0-9-]+$") + output_language: str = Field(default="de", pattern="^(de|en)$") class OrgUpdate(BaseModel): name: Optional[str] = Field(default=None, max_length=200) is_active: Optional[bool] = None + output_language: Optional[str] = Field(default=None, pattern="^(de|en)$") class OrgResponse(BaseModel): @@ -43,6 +45,7 @@ class OrgResponse(BaseModel): created_at: str globe_access: bool = False network_access: bool = False + output_language: str = "de" class LicenseCreate(BaseModel): diff --git a/src/routers/organizations.py b/src/routers/organizations.py index 58a2b56..a936665 100644 --- a/src/routers/organizations.py +++ b/src/routers/organizations.py @@ -25,6 +25,15 @@ async def _enrich_org(db: aiosqlite.Connection, row: aiosqlite.Row) -> dict: lic = await cursor.fetchone() org["license_status"] = lic["status"] if lic else "none" org["license_type"] = lic["license_type"] if lic else "" + + # output_language aus organization_settings (Default 'de') + cursor = await db.execute( + "SELECT value FROM organization_settings WHERE organization_id = ? AND key = 'output_language'", + (org["id"],), + ) + lang_row = await cursor.fetchone() + org["output_language"] = lang_row["value"] if lang_row else "de" + return org @@ -57,6 +66,10 @@ async def create_organization( org_id = cursor.lastrowid await db.commit() + # output_language als organization_settings-Eintrag persistieren + from shared.services.org_settings import set_org_setting + await set_org_setting(db, org_id, "output_language", data.output_language) + cursor = await db.execute("SELECT * FROM organizations WHERE id = ?", (org_id,)) new_row_obj = await cursor.fetchone() await log_action( @@ -105,6 +118,11 @@ async def update_organization( await db.execute(f"UPDATE organizations SET {set_clause} WHERE id = ?", values) await db.commit() + # output_language separat ueber organization_settings setzen + if data.output_language is not None: + from shared.services.org_settings import set_org_setting + await set_org_setting(db, org_id, "output_language", data.output_language) + after = await row_to_dict(db, "organizations", org_id) await log_action( db, admin, get_client_ip(request), diff --git a/src/routers/sources.py b/src/routers/sources.py index 01c447e..458893b 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -1,30 +1,106 @@ """Grundquellen-Verwaltung und Kundenquellen-Übersicht.""" +import json import logging +import hashlib +import os +import re import uuid -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, Request, UploadFile, status from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from typing import Optional import aiosqlite from auth import get_current_admin -from database import db_dependency +from database import db_dependency, get_db from audit import log_action, get_client_ip from source_meta import get_meta -from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S +from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S, DB_PATH from shared.source_rules import ( discover_source, discover_all_feeds, evaluate_feeds_with_claude, domain_to_display_name, ) +from shared.services.source_classifier import ( + bulk_classify, + classify_source, + ALIGNMENT_VALUES, + POLITICAL_VALUES, + MEDIA_TYPE_VALUES, + RELIABILITY_VALUES, +) +from shared.services.external_reputation import ( + apply_reputation_overrides, + sync_all as sync_external_reputation, +) logger = logging.getLogger("verwaltung.sources") router = APIRouter(prefix="/api/sources", tags=["sources"]) -SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes", "language", "bias", "fetch_strategy"} +SOURCE_UPDATE_COLUMNS = { + "name", "url", "domain", "source_type", "category", "status", "notes", + "language", "bias", "fetch_strategy", + "political_orientation", "media_type", "reliability", + "state_affiliated", "country_code", +} +SOURCE_CLASSIFICATION_FIELDS = { + "political_orientation", "media_type", "reliability", + "state_affiliated", "country_code", +} + + +async def _load_alignments_for(db: aiosqlite.Connection, source_ids: list[int]) -> dict[int, list[str]]: + if not source_ids: + return {} + placeholders = ",".join("?" for _ in source_ids) + cursor = await db.execute( + f"SELECT source_id, alignment FROM source_alignments WHERE source_id IN ({placeholders}) ORDER BY alignment", + source_ids, + ) + out: dict[int, list[str]] = {sid: [] for sid in source_ids} + for row in await cursor.fetchall(): + out.setdefault(row["source_id"], []).append(row["alignment"]) + return out + + +async def _replace_alignments(db: aiosqlite.Connection, source_id: int, alignments: list[str]): + """Ersetzt die alignments-Liste einer Quelle (DELETE + INSERT) — Aufrufer muss commit() machen.""" + await db.execute("DELETE FROM source_alignments WHERE source_id = ?", (source_id,)) + seen: set[str] = set() + for raw in alignments: + a = (raw or "").strip().lower() + if not a or a in seen: + continue + if a not in ALIGNMENT_VALUES: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Ungueltiger alignment-Wert: '{a}'", + ) + seen.add(a) + await db.execute( + "INSERT INTO source_alignments (source_id, alignment) VALUES (?, ?)", + (source_id, a), + ) + + +async def _clear_proposed(db: aiosqlite.Connection, source_id: int): + await db.execute( + """UPDATE sources SET + proposed_political_orientation = NULL, + proposed_media_type = NULL, + proposed_reliability = NULL, + proposed_state_affiliated = NULL, + proposed_country_code = NULL, + proposed_alignments_json = NULL, + proposed_confidence = NULL, + proposed_reasoning = NULL, + proposed_at = NULL + WHERE id = ?""", + (source_id,), + ) @router.get("/meta") @@ -42,7 +118,7 @@ class GlobalSourceCreate(BaseModel): name: str = Field(min_length=1, max_length=200) url: Optional[str] = None domain: Optional[str] = None - source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$") + source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed|pdf_document)$") category: str = Field(default="sonstige") status: str = Field(default="active", pattern="^(active|inactive)$") notes: Optional[str] = None @@ -55,12 +131,18 @@ class GlobalSourceUpdate(BaseModel): name: Optional[str] = Field(default=None, max_length=200) url: Optional[str] = None domain: Optional[str] = None - source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$") + source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed|pdf_document)$") category: Optional[str] = None status: Optional[str] = Field(default=None, pattern="^(active|inactive)$") notes: Optional[str] = None language: Optional[str] = Field(default=None, max_length=100) bias: Optional[str] = Field(default=None, max_length=500) + political_orientation: Optional[str] = None + media_type: Optional[str] = None + reliability: Optional[str] = None + state_affiliated: Optional[bool] = None + country_code: Optional[str] = Field(default=None, max_length=8) + alignments: Optional[list[str]] = None @router.get("/global") @@ -120,7 +202,11 @@ async def list_global_sources( WHERE s.tenant_id IS NULL ORDER BY s.category, s.source_type, s.name """) - return [dict(row) for row in await cursor.fetchall()] + rows = [dict(row) for row in await cursor.fetchall()] + alignments_map = await _load_alignments_for(db, [r["id"] for r in rows]) + for r in rows: + r["alignments"] = alignments_map.get(r["id"], []) + return rows @router.post("/global", status_code=201) @@ -170,7 +256,7 @@ async def update_global_source( admin: dict = Depends(get_current_admin), db: aiosqlite.Connection = Depends(db_dependency), ): - """Grundquelle bearbeiten.""" + """Grundquelle bearbeiten — inkl. Klassifikation + alignments.""" cursor = await db.execute( "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,) ) @@ -178,21 +264,50 @@ async def update_global_source( if not row: raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden") before = dict(row) + before_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) - updates = {} - for field, value in data.model_dump(exclude_none=True).items(): - updates[field] = value + payload = data.model_dump(exclude_none=True) + alignments = payload.pop("alignments", None) - if not updates: - return before + if "political_orientation" in payload and payload["political_orientation"] not in POLITICAL_VALUES: + raise HTTPException(status_code=422, detail=f"Ungueltige political_orientation: {payload['political_orientation']}") + if "media_type" in payload and payload["media_type"] not in MEDIA_TYPE_VALUES: + raise HTTPException(status_code=422, detail=f"Ungueltiger media_type: {payload['media_type']}") + if "reliability" in payload and payload["reliability"] not in RELIABILITY_VALUES: + raise HTTPException(status_code=422, detail=f"Ungueltige reliability: {payload['reliability']}") + + updates = {k: v for k, v in payload.items() if k in SOURCE_UPDATE_COLUMNS} + if "state_affiliated" in updates: + updates["state_affiliated"] = 1 if updates["state_affiliated"] else 0 + + classification_touched = any(k in updates for k in SOURCE_CLASSIFICATION_FIELDS) or alignments is not None + if classification_touched: + updates["classification_source"] = "manual" + updates["classified_at"] = None # CURRENT_TIMESTAMP via SQL — siehe unten + + if updates: + sets = [] + vals = [] + for k, v in updates.items(): + if k == "classified_at": + sets.append("classified_at = CURRENT_TIMESTAMP") + else: + sets.append(f"{k} = ?") + vals.append(v) + vals.append(source_id) + await db.execute(f"UPDATE sources SET {', '.join(sets)} WHERE id = ?", vals) + + if alignments is not None: + await _replace_alignments(db, source_id, alignments) - set_clause = ", ".join(f"{k} = ?" for k in updates) - values = list(updates.values()) + [source_id] - await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values) await db.commit() cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) after = dict(await cursor.fetchone()) + after_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) + if before_alignments != after_alignments: + before["alignments"] = before_alignments + after["alignments"] = after_alignments await log_action( db, admin, get_client_ip(request), action="update", resource_type="source", resource_id=source_id, @@ -1086,3 +1201,420 @@ Nur das JSON, kein anderer Text.""" except Exception as e: raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}") + + +# === Klassifikations-Review (LLM-Vorschlaege approve/reject/reclassify) === + + +@router.get("/classification/stats") +async def classification_stats( + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Counts pro classification_source-Wert + Anzahl Pending-Reviews (alle Quellen).""" + cursor = await db.execute( + """SELECT classification_source, COUNT(*) as cnt + FROM sources + WHERE status = 'active' + GROUP BY classification_source""" + ) + by_source = {row["classification_source"] or "legacy": row["cnt"] for row in await cursor.fetchall()} + cursor = await db.execute( + """SELECT COUNT(*) as cnt FROM sources + WHERE status = 'active' AND proposed_political_orientation IS NOT NULL""" + ) + pending = (await cursor.fetchone())["cnt"] + return { + "by_classification_source": by_source, + "pending_review": pending, + "total": sum(by_source.values()), + } + + +@router.get("/classification/queue") +async def classification_queue( + limit: int = 50, + min_confidence: float = 0.0, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Liefert Quellen mit nicht-leeren proposed_*-Spalten (Review-Queue).""" + cursor = await db.execute( + """SELECT s.* FROM sources s + WHERE s.proposed_political_orientation IS NOT NULL + AND COALESCE(s.proposed_confidence, 0) >= ? + ORDER BY s.proposed_confidence DESC, s.proposed_at DESC + LIMIT ?""", + (min_confidence, limit), + ) + rows = [dict(r) for r in await cursor.fetchall()] + alignments_map = await _load_alignments_for(db, [r["id"] for r in rows]) + out = [] + for d in rows: + try: + proposed_aligns = json.loads(d.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + out.append({ + "id": d["id"], + "name": d["name"], + "url": d.get("url"), + "domain": d.get("domain"), + "source_type": d.get("source_type"), + "category": d.get("category"), + "is_global": d.get("tenant_id") is None, + "current": { + "political_orientation": d.get("political_orientation"), + "media_type": d.get("media_type"), + "reliability": d.get("reliability"), + "state_affiliated": bool(d.get("state_affiliated")), + "country_code": d.get("country_code"), + "alignments": alignments_map.get(d["id"], []), + "classification_source": d.get("classification_source"), + }, + "proposed": { + "political_orientation": d.get("proposed_political_orientation"), + "media_type": d.get("proposed_media_type"), + "reliability": d.get("proposed_reliability"), + "state_affiliated": bool(d.get("proposed_state_affiliated")), + "country_code": d.get("proposed_country_code"), + "alignments": proposed_aligns, + "confidence": d.get("proposed_confidence"), + "reasoning": d.get("proposed_reasoning"), + "proposed_at": d.get("proposed_at"), + }, + }) + return out + + +@router.post("/{source_id}/classification/approve") +async def approve_classification( + source_id: int, + request: Request, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Uebernimmt proposed_* in echte Felder, setzt classification_source='llm_approved'.""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + src = dict(row) + before_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) + before = {**src, "alignments": before_alignments} + + if src.get("proposed_political_orientation") is None: + raise HTTPException(status_code=400, detail="Keine LLM-Vorschlaege fuer diese Quelle vorhanden") + + try: + proposed_aligns = json.loads(src.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + + await db.execute( + """UPDATE sources SET + political_orientation = ?, + media_type = ?, + reliability = ?, + state_affiliated = ?, + country_code = ?, + classification_source = 'llm_approved', + classified_at = CURRENT_TIMESTAMP + WHERE id = ?""", + ( + src["proposed_political_orientation"], + src["proposed_media_type"], + src["proposed_reliability"], + 1 if src.get("proposed_state_affiliated") else 0, + src.get("proposed_country_code"), + source_id, + ), + ) + await _replace_alignments(db, source_id, [a for a in proposed_aligns if a in ALIGNMENT_VALUES]) + await _clear_proposed(db, source_id) + await db.commit() + + try: + await apply_reputation_overrides(db, source_id) + except Exception as e: + logger.warning("Reputation-Override fuer source_id=%s fehlgeschlagen: %s", source_id, e) + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + after_row = dict(await cursor.fetchone()) + after_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, [])) + after = {**after_row, "alignments": after_alignments} + await log_action( + db, admin, get_client_ip(request), + action="update", resource_type="source", resource_id=source_id, + before=before, after=after, + ) + return {"source_id": source_id, "status": "approved"} + + +@router.post("/{source_id}/classification/reject") +async def reject_classification( + source_id: int, + request: Request, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Verwirft die LLM-Vorschlaege ohne Uebernahme. classification_source: 'llm_pending' -> 'legacy'.""" + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + src = dict(row) + before = dict(src) + + await _clear_proposed(db, source_id) + if src.get("classification_source") == "llm_pending": + await db.execute( + "UPDATE sources SET classification_source = 'legacy' WHERE id = ?", + (source_id,), + ) + await db.commit() + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) + after = dict(await cursor.fetchone()) + await log_action( + db, admin, get_client_ip(request), + action="update", resource_type="source", resource_id=source_id, + before=before, after=after, + ) + return {"source_id": source_id, "status": "rejected"} + + +@router.post("/{source_id}/classification/reclassify") +async def reclassify_source( + source_id: int, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Triggert eine LLM-Klassifikation einer einzelnen Quelle (synchron, ~3-5s).""" + cursor = await db.execute("SELECT id FROM sources WHERE id = ?", (source_id,)) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Quelle nicht gefunden") + try: + result = await classify_source(db, source_id) + except Exception as e: + logger.error("Reclassify source_id=%s fehlgeschlagen: %s", source_id, e, exc_info=True) + raise HTTPException(status_code=500, detail=f"Klassifikation fehlgeschlagen: {e}") + return result + + +async def _bulk_classify_background(limit: int, only_unclassified: bool): + """Hintergrund-Task: oeffnet eigene DB-Connection.""" + db = await get_db() + try: + await bulk_classify(db, limit=limit, only_unclassified=only_unclassified) + finally: + await db.close() + + +@router.post("/classification/bulk-classify") +async def trigger_bulk_classify( + background_tasks: BackgroundTasks, + limit: int = 50, + only_unclassified: bool = True, + admin: dict = Depends(get_current_admin), +): + """Startet eine Bulk-Klassifikation im Hintergrund.""" + if limit < 1 or limit > 500: + raise HTTPException(status_code=400, detail="limit muss zwischen 1 und 500 liegen") + background_tasks.add_task(_bulk_classify_background, limit, only_unclassified) + return {"status": "started", "limit": limit, "only_unclassified": only_unclassified} + + +@router.post("/external-reputation/sync") +async def trigger_external_reputation_sync( + background_tasks: BackgroundTasks, + admin: dict = Depends(get_current_admin), +): + """Startet Sync von IFCN- und EUvsDisinfo-Daten (Hintergrund).""" + async def _bg(): + db = await get_db() + try: + await sync_external_reputation(db) + finally: + await db.close() + + background_tasks.add_task(_bg) + return {"status": "started"} + + +@router.post("/classification/bulk-approve") +async def bulk_approve_classifications( + request: Request, + min_confidence: float = 0.85, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Genehmigt alle Pending-Vorschlaege ueber dem confidence-Schwellwert.""" + cursor = await db.execute( + """SELECT id, proposed_political_orientation, proposed_media_type, + proposed_reliability, proposed_state_affiliated, + proposed_country_code, proposed_alignments_json + FROM sources + WHERE proposed_political_orientation IS NOT NULL + AND COALESCE(proposed_confidence, 0) >= ?""", + (min_confidence,), + ) + rows = [dict(r) for r in await cursor.fetchall()] + approved_ids: list[int] = [] + for src in rows: + try: + proposed_aligns = json.loads(src.get("proposed_alignments_json") or "[]") + except (json.JSONDecodeError, TypeError): + proposed_aligns = [] + await db.execute( + """UPDATE sources SET + political_orientation = ?, + media_type = ?, + reliability = ?, + state_affiliated = ?, + country_code = ?, + classification_source = 'llm_approved', + classified_at = CURRENT_TIMESTAMP + WHERE id = ?""", + ( + src["proposed_political_orientation"], + src["proposed_media_type"], + src["proposed_reliability"], + 1 if src.get("proposed_state_affiliated") else 0, + src.get("proposed_country_code"), + src["id"], + ), + ) + await _replace_alignments( + db, src["id"], [a for a in proposed_aligns if a in ALIGNMENT_VALUES] + ) + await _clear_proposed(db, src["id"]) + approved_ids.append(src["id"]) + await db.commit() + + try: + for sid in approved_ids: + await apply_reputation_overrides(db, sid) + except Exception as e: + logger.warning("Bulk Reputation-Override fehlgeschlagen: %s", e) + + await log_action( + db, admin, get_client_ip(request), + action="update", resource_type="source", resource_id=None, + after={"bulk_approved_ids": approved_ids, "min_confidence": min_confidence}, + ) + return {"approved": len(approved_ids), "ids": approved_ids} + + +# --- PDF-Upload (Quelle vom Typ pdf_document) --- +# Speicherort relativ zur DB: /pdfs/{sha256}.pdf +# Der Monitor pollt pdf_document-Quellen mit processed_at IS NULL und +# extrahiert Text + Uebersetzungen (DE/EN). Dieser Endpoint legt nur die +# Datei + den Source-Eintrag an (kein LLM-Call hier). + +MAX_PDF_SIZE_BYTES = 50 * 1024 * 1024 # 50 MB +PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(DB_PATH)), "pdfs") + + +def _pdf_dir() -> str: + os.makedirs(PDF_DIR, exist_ok=True) + return PDF_DIR + + +@router.post("/global/upload-pdf", status_code=201) +async def upload_pdf_source( + request: Request, + admin: dict = Depends(get_current_admin), + db: aiosqlite.Connection = Depends(db_dependency), + file: UploadFile = File(...), + name: Optional[str] = Form(None), + category: str = Form("sonstige"), + language: Optional[str] = Form(None), + notes: Optional[str] = Form(None), +): + """PDF hochladen + als Grundquelle (source_type=pdf_document) registrieren. + + Idempotent ueber SHA256: bestehender Eintrag wird zurueckgegeben (409 mit + Detail), die Datei wird nicht erneut gespeichert. + """ + # Magic-Bytes-Check (PDF beginnt mit %PDF-) + head = await file.read(8) + if not head.startswith(b"%PDF-"): + raise HTTPException(status_code=415, detail="Datei ist kein gueltiges PDF (Magic-Bytes fehlen)") + + # Datei streaming in Temp lesen + sha256 berechnen + Groesse pruefen + sha = hashlib.sha256() + sha.update(head) + total = len(head) + tmp_path = os.path.join(_pdf_dir(), f".upload-{uuid.uuid4().hex}.tmp") + try: + with open(tmp_path, "wb") as out: + out.write(head) + while True: + chunk = await file.read(1024 * 1024) + if not chunk: + break + total += len(chunk) + if total > MAX_PDF_SIZE_BYTES: + raise HTTPException(status_code=413, detail=f"PDF ueberschreitet Maximum von {MAX_PDF_SIZE_BYTES // 1024 // 1024} MB") + sha.update(chunk) + out.write(chunk) + sha_hex = sha.hexdigest() + final_path = os.path.join(_pdf_dir(), f"{sha_hex}.pdf") + rel_path = os.path.join("pdfs", f"{sha_hex}.pdf") + + # Duplikat-Check ueber sha256 + cursor = await db.execute( + "SELECT id, name FROM sources WHERE pdf_sha256 = ? AND tenant_id IS NULL", + (sha_hex,), + ) + existing = await cursor.fetchone() + if existing: + # Datei wegwerfen, bestehende Quelle zurueckgeben + os.unlink(tmp_path) + raise HTTPException( + status_code=409, + detail=f"PDF bereits hochgeladen als Quelle '{existing['name']}' (id={existing['id']})", + ) + + # Atomar umbenennen + if not os.path.exists(final_path): + os.replace(tmp_path, final_path) + else: + # Datei mit gleichem sha existiert physisch, aber keine Source -> wiederverwenden + os.unlink(tmp_path) + except HTTPException: + if os.path.exists(tmp_path): + try: os.unlink(tmp_path) + except OSError: pass + raise + except Exception as e: + if os.path.exists(tmp_path): + try: os.unlink(tmp_path) + except OSError: pass + logger.exception("PDF-Upload fehlgeschlagen") + raise HTTPException(status_code=500, detail=f"PDF-Upload fehlgeschlagen: {e}") + + # Name herleiten falls nicht angegeben + display_name = (name or "").strip() or re.sub(r"\.pdf$", "", file.filename or "PDF", flags=re.I) + display_name = display_name[:200] + + cursor = await db.execute( + """INSERT INTO sources + (name, url, domain, source_type, category, status, notes, language, + pdf_path, pdf_sha256, added_by, tenant_id) + VALUES (?, NULL, NULL, 'pdf_document', ?, 'active', ?, ?, ?, ?, ?, NULL)""", + (display_name, category, notes, language, rel_path, sha_hex, admin.get("email") or "system"), + ) + src_id = cursor.lastrowid + await db.commit() + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (src_id,)) + new_src = dict(await cursor.fetchone()) + await log_action( + db, admin, get_client_ip(request), + action="upload_pdf", resource_type="source", resource_id=src_id, + after={"name": display_name, "pdf_sha256": sha_hex, "size_bytes": total}, + ) + return new_src diff --git a/src/shared/services/external_reputation.py b/src/shared/services/external_reputation.py new file mode 100644 index 0000000..de973b3 --- /dev/null +++ b/src/shared/services/external_reputation.py @@ -0,0 +1,282 @@ +"""Externe Reputations-Daten fuer Quellen. + +Synchronisiert Domain-Listen von oeffentlichen Reputations-/Faktencheck-Datenbanken +und schreibt die Treffer in die sources-Spalten: + +- IFCN-Signatories (anerkannte Faktenchecker) -> ifcn_signatory +- EUvsDisinfo (pro-Kreml-Desinformation, Zenodo-CSV) -> eu_disinfo_listed, + eu_disinfo_case_count, eu_disinfo_last_seen + +Anschliessend wendet apply_reputation_overrides() Override-Regeln auf die +reliability-Spalte an: +- ifcn_signatory=1 -> reliability='sehr_hoch' +- eu_disinfo_case_count >= 5 -> reliability='sehr_niedrig' +- eu_disinfo_case_count >= 1 -> reliability eine Stufe runter (max bis 'niedrig') +""" +import csv +import io +import logging +from collections import defaultdict +from urllib.parse import urlparse + +import aiosqlite +import httpx + +logger = logging.getLogger("osint.external_reputation") + +IFCN_LIST_URL = "https://raw.githubusercontent.com/IFCN/verified-signatories/main/list" +EU_DISINFO_CSV_URL = "https://zenodo.org/records/10514307/files/euvsdisinfo_base.csv?download=1" + +HTTP_TIMEOUT = httpx.Timeout(60.0, connect=10.0) + +# Generische Plattform-Domains, die NICHT als Quelle markiert werden duerfen +# (EUvsDisinfo aggregiert anonyme Telegram-/Twitter-Posts unter Plattform-Domains). +PLATFORM_DOMAINS = { + "t.me", "telegram.me", "telegram.org", + "twitter.com", "x.com", "mobile.twitter.com", + "youtube.com", "youtu.be", "m.youtube.com", + "facebook.com", "fb.com", "m.facebook.com", + "instagram.com", "tiktok.com", "vk.com", "ok.ru", + "rumble.com", "bitchute.com", "odysee.com", + "reddit.com", "old.reddit.com", + "wordpress.com", "blogspot.com", "medium.com", + "substack.com", "wixsite.com", +} + +# Reliability-Skala in Stufenfolge (schlecht -> gut) +RELIABILITY_ORDER = ["sehr_niedrig", "niedrig", "gemischt", "hoch", "sehr_hoch"] + + +def _normalize_domain(raw: str | None) -> str | None: + """Normalisiert eine Domain: lowercase, ohne www., ohne Schema/Pfad.""" + if not raw: + return None + raw = raw.strip().lower() + if not raw: + return None + # Falls eine vollstaendige URL uebergeben wurde + if "://" in raw: + try: + raw = urlparse(raw).netloc or raw + except ValueError: + pass + # Pfad/Query strippen + raw = raw.split("/")[0].split("?")[0].split("#")[0] + if raw.startswith("www."): + raw = raw[4:] + return raw or None + + +async def _fetch_text(url: str) -> str: + """Laedt Text von einer URL. Wirft HTTPException bei Fehler.""" + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT, follow_redirects=True) as client: + resp = await client.get(url) + resp.raise_for_status() + return resp.text + + +async def sync_ifcn_signatories(db: aiosqlite.Connection) -> dict: + """Laedt IFCN-Domain-Liste und matcht gegen sources.domain. + + Setzt ifcn_signatory=1 wo die Domain in der Liste vorkommt, sonst 0. + """ + text = await _fetch_text(IFCN_LIST_URL) + domains: set[str] = set() + for line in text.splitlines(): + d = _normalize_domain(line) + if d: + domains.add(d) + logger.info("IFCN-Liste geladen: %d Domains", len(domains)) + + # Aktuelle Quellen mit Domain laden + cursor = await db.execute( + "SELECT id, domain FROM sources WHERE domain IS NOT NULL AND domain != ''" + ) + sources = [dict(r) for r in await cursor.fetchall()] + + matched_ids: list[int] = [] + unmatched_ids: list[int] = [] + for s in sources: + nd = _normalize_domain(s["domain"]) + if nd and nd not in PLATFORM_DOMAINS and nd in domains: + matched_ids.append(s["id"]) + else: + unmatched_ids.append(s["id"]) + + # Bulk-Update in zwei Statements + if matched_ids: + placeholders = ",".join("?" for _ in matched_ids) + await db.execute( + f"UPDATE sources SET ifcn_signatory = 1 WHERE id IN ({placeholders})", + matched_ids, + ) + if unmatched_ids: + placeholders = ",".join("?" for _ in unmatched_ids) + await db.execute( + f"UPDATE sources SET ifcn_signatory = 0 WHERE id IN ({placeholders})", + unmatched_ids, + ) + await db.commit() + logger.info("IFCN-Sync: %d Quellen als Faktenchecker markiert (von %d)", + len(matched_ids), len(sources)) + return { + "list_size": len(domains), + "sources_checked": len(sources), + "matched": len(matched_ids), + } + + +async def sync_eu_disinfo(db: aiosqlite.Connection) -> dict: + """Laedt EUvsDisinfo-CSV von Zenodo, aggregiert pro Domain, schreibt sources. + + - eu_disinfo_listed: 1 wenn Domain mindestens 1x als 'disinformation' debunkt + - eu_disinfo_case_count: Anzahl Disinformation-Faelle + - eu_disinfo_last_seen: spaetestes debunk_date + """ + text = await _fetch_text(EU_DISINFO_CSV_URL) + reader = csv.DictReader(io.StringIO(text)) + + # Per-Domain aggregieren (nur class='disinformation') + counts: dict[str, int] = defaultdict(int) + last_seen: dict[str, str] = {} + total_rows = 0 + for row in reader: + total_rows += 1 + if (row.get("class") or "").strip().lower() != "disinformation": + continue + d = _normalize_domain(row.get("article_domain")) + if not d: + continue + counts[d] += 1 + debunk_date = (row.get("debunk_date") or "").strip() + if debunk_date: + prev = last_seen.get(d) + if not prev or debunk_date > prev: + last_seen[d] = debunk_date + logger.info("EUvsDisinfo-CSV: %d Zeilen, %d Domains mit Desinformation", + total_rows, len(counts)) + + # Quellen laden + matchen + cursor = await db.execute( + "SELECT id, domain FROM sources WHERE domain IS NOT NULL AND domain != ''" + ) + sources = [dict(r) for r in await cursor.fetchall()] + + matched = 0 + for s in sources: + nd = _normalize_domain(s["domain"]) + if nd and nd not in PLATFORM_DOMAINS and nd in counts: + await db.execute( + """UPDATE sources SET + eu_disinfo_listed = 1, + eu_disinfo_case_count = ?, + eu_disinfo_last_seen = ? + WHERE id = ?""", + (counts[nd], last_seen.get(nd), s["id"]), + ) + matched += 1 + else: + await db.execute( + """UPDATE sources SET + eu_disinfo_listed = 0, + eu_disinfo_case_count = 0, + eu_disinfo_last_seen = NULL + WHERE id = ?""", + (s["id"],), + ) + await db.commit() + logger.info("EUvsDisinfo-Sync: %d Quellen als Desinformations-Quelle markiert (von %d)", + matched, len(sources)) + return { + "rows_in_csv": total_rows, + "domains_with_disinfo_in_csv": len(counts), + "sources_checked": len(sources), + "matched": matched, + } + + +def _override_reliability(current: str | None, ifcn: bool, eu_count: int) -> str | None: + """Wendet Override-Regeln auf eine reliability-Stufe an. + + Rueckgabe: neue Stufe (oder None, wenn unveraendert). + """ + cur = current or "na" + + # IFCN gewinnt: zertifizierter Faktenchecker -> sehr_hoch (immer) + if ifcn: + return "sehr_hoch" if cur != "sehr_hoch" else None + + # EUvsDisinfo: Downgrade + if eu_count >= 5: + return "sehr_niedrig" if cur != "sehr_niedrig" else None + if eu_count >= 1: + # Eine Stufe runter, mindestens bis 'niedrig' + if cur == "na": + return "niedrig" + if cur in RELIABILITY_ORDER: + idx = RELIABILITY_ORDER.index(cur) + new_idx = max(0, idx - 1) + new = RELIABILITY_ORDER[new_idx] + # Mindeststufe 'niedrig' bei eu_count >= 1 + if RELIABILITY_ORDER.index(new) > RELIABILITY_ORDER.index("niedrig"): + new = "niedrig" + return new if new != cur else None + return None + + +async def apply_reputation_overrides(db: aiosqlite.Connection, source_id: int | None = None) -> dict: + """Wendet Reliability-Override-Regeln an. + + Wenn source_id angegeben ist, nur fuer diese Quelle. Sonst fuer alle Quellen. + """ + if source_id is not None: + cursor = await db.execute( + "SELECT id, reliability, ifcn_signatory, eu_disinfo_case_count " + "FROM sources WHERE id = ?", + (source_id,), + ) + else: + cursor = await db.execute( + "SELECT id, reliability, ifcn_signatory, eu_disinfo_case_count FROM sources" + ) + sources = [dict(r) for r in await cursor.fetchall()] + + changed = 0 + for s in sources: + new = _override_reliability( + s.get("reliability"), + bool(s.get("ifcn_signatory")), + int(s.get("eu_disinfo_case_count") or 0), + ) + if new is not None: + await db.execute( + "UPDATE sources SET reliability = ? WHERE id = ?", + (new, s["id"]), + ) + changed += 1 + await db.commit() + logger.info("Reliability-Override: %d Quellen angepasst (von %d gepruefte)", + changed, len(sources)) + return {"checked": len(sources), "changed": changed} + + +async def sync_all(db: aiosqlite.Connection) -> dict: + """Vollstaendiger Sync: IFCN + EUvsDisinfo + Reliability-Override. + + Setzt external_data_synced_at fuer alle Quellen. + """ + ifcn_result = await sync_ifcn_signatories(db) + eu_result = await sync_eu_disinfo(db) + override_result = await apply_reputation_overrides(db) + + await db.execute( + "UPDATE sources SET external_data_synced_at = CURRENT_TIMESTAMP " + "WHERE domain IS NOT NULL AND domain != ''" + ) + await db.commit() + + return { + "ifcn": ifcn_result, + "eu_disinfo": eu_result, + "override": override_result, + } diff --git a/src/shared/services/org_settings.py b/src/shared/services/org_settings.py new file mode 100644 index 0000000..d152b5d --- /dev/null +++ b/src/shared/services/org_settings.py @@ -0,0 +1,104 @@ +"""Organization-Settings-Helper. + +KV-Store pro Organisation. Aktuell genutzt fuer output_language ('de'|'en'). +Spaeter erweiterbar (Default-Modell, Telegram-Toggle, Theme, ...). + +Cache: TTL 60s in-memory pro (tenant_id, key). Wird bei set_org_setting() +invalidiert. +""" +import logging +import time +from typing import Optional + +import aiosqlite + +logger = logging.getLogger("osint.org_settings") + +_CACHE: dict[tuple[int, str], tuple[float, Optional[str]]] = {} +_TTL_SECONDS = 60.0 + + +def _cache_get(tenant_id: int, key: str) -> tuple[bool, Optional[str]]: + """(hit, value). hit=True heisst Cache traf; value kann auch None sein.""" + entry = _CACHE.get((tenant_id, key)) + if entry is None: + return (False, None) + expires_at, value = entry + if time.monotonic() > expires_at: + _CACHE.pop((tenant_id, key), None) + return (False, None) + return (True, value) + + +def _cache_put(tenant_id: int, key: str, value: Optional[str]) -> None: + _CACHE[(tenant_id, key)] = (time.monotonic() + _TTL_SECONDS, value) + + +def _cache_invalidate(tenant_id: int, key: str) -> None: + _CACHE.pop((tenant_id, key), None) + + +async def get_org_setting( + db: aiosqlite.Connection, + tenant_id: int, + key: str, + default: Optional[str] = None, +) -> Optional[str]: + """Liest ein Org-Setting. Fallback auf default.""" + if tenant_id is None: + return default + hit, cached = _cache_get(tenant_id, key) + if hit: + return cached if cached is not None else default + cursor = await db.execute( + "SELECT value FROM organization_settings WHERE organization_id = ? AND key = ?", + (tenant_id, key), + ) + row = await cursor.fetchone() + value = row["value"] if row else None + _cache_put(tenant_id, key, value) + return value if value is not None else default + + +async def set_org_setting( + db: aiosqlite.Connection, + tenant_id: int, + key: str, + value: str, +) -> None: + """Setzt ein Org-Setting (upsert).""" + await db.execute( + """INSERT INTO organization_settings (organization_id, key, value, updated_at) + VALUES (?, ?, ?, CURRENT_TIMESTAMP) + ON CONFLICT(organization_id, key) DO UPDATE SET + value = excluded.value, + updated_at = CURRENT_TIMESTAMP""", + (tenant_id, key, value), + ) + await db.commit() + _cache_invalidate(tenant_id, key) + logger.info("Org %s Setting %s='%s' gespeichert", tenant_id, key, value) + + +# Bekannte Sprachen + Anzeigenamen fuer Prompts +LANGUAGE_DISPLAY_NAMES = { + "de": "Deutsch", + "en": "English", +} + + +async def get_org_language( + db: aiosqlite.Connection, + tenant_id: int, +) -> str: + """Liefert ISO-2-Sprachcode der Org (default 'de').""" + value = await get_org_setting(db, tenant_id, "output_language", default="de") + if value not in LANGUAGE_DISPLAY_NAMES: + logger.warning("Unbekannte output_language '%s' fuer Org %s -- fallback 'de'", value, tenant_id) + return "de" + return value + + +def language_display(lang_iso: str) -> str: + """ISO-Code -> Anzeigename fuer Prompts ('de' -> 'Deutsch').""" + return LANGUAGE_DISPLAY_NAMES.get(lang_iso, lang_iso) diff --git a/src/shared/services/source_classifier.py b/src/shared/services/source_classifier.py new file mode 100644 index 0000000..15d6cd2 --- /dev/null +++ b/src/shared/services/source_classifier.py @@ -0,0 +1,295 @@ +"""Klassifiziert Quellen via Claude (Haiku) nach 4 Achsen + state_affiliated + country. + +Schreibt Vorschlaege in die proposed_*-Spalten von sources und setzt +classification_source='llm_pending'. Approval erfolgt ueber separate Endpoints, +die proposed_* in die echten Spalten kopieren. +""" +import asyncio +import json +import logging +import re + +import aiosqlite + +from shared.agents.claude_client import call_claude +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.source_classifier") + +POLITICAL_VALUES = { + "links_extrem", "links", "mitte_links", "liberal", "mitte", + "konservativ", "mitte_rechts", "rechts", "rechts_extrem", "na", +} +MEDIA_TYPE_VALUES = { + "tageszeitung", "wochenzeitung", "magazin", "tv_sender", "radio", + "oeffentlich_rechtlich", "nachrichtenagentur", "online_only", "blog", + "telegram_kanal", "telegram_bot", "podcast", "social_media", "imageboard", + "think_tank", "ngo", "behoerde", "staatsmedium", "fachmedium", "sonstige", +} +RELIABILITY_VALUES = {"sehr_hoch", "hoch", "gemischt", "niedrig", "sehr_niedrig", "na"} +ALIGNMENT_VALUES = { + "prorussisch", "proiranisch", "prowestlich", "proukrainisch", + "prochinesisch", "projapanisch", "proisraelisch", "propalaestinensisch", + "protuerkisch", "panarabisch", "neutral", "sonstige", +} + + +def _build_prompt(src: dict, sample_articles: list[dict]) -> str: + sample_text = "" + if sample_articles: + lines = [] + for i, art in enumerate(sample_articles[:5], 1): + headline = (art.get("headline") or art.get("headline_de") or "").strip() + if headline: + lines.append(f"{i}. {headline[:200]}") + if lines: + sample_text = "\nLetzte Artikel/Headlines:\n" + "\n".join(lines) + + return f"""Du bist ein OSINT-Analyst und klassifizierst Nachrichten- und Medienquellen fuer ein Lagebild-Monitoring-System (DACH-Raum). + +QUELLE: +Name: {src.get('name')} +URL: {src.get('url') or '-'} +Domain: {src.get('domain') or '-'} +Quellentyp: {src.get('source_type')} +Bisherige Kategorie: {src.get('category')} +Sprache: {src.get('language') or 'unbekannt'} +Bisherige Notiz (Freitext): {src.get('bias') or '-'}{sample_text} + +AUFGABE: Klassifiziere die Quelle nach folgenden Achsen. + +1. political_orientation: + - links_extrem (z.B. linksunten.indymedia) + - links (klar links, z.B. junge Welt, taz) + - mitte_links (linksliberal/sozialdemokratisch, z.B. SZ, Spiegel) + - liberal (wirtschafts-/grünliberal, z.B. NZZ, Zeit) + - mitte (politisch neutral, Agentur, z.B. dpa, Reuters, tagesschau) + - konservativ (buergerlich-konservativ, z.B. FAZ, Welt) + - mitte_rechts (rechts-buergerlich, z.B. Tichys Einblick, Achgut) + - rechts (klar rechts, z.B. Junge Freiheit, EpochTimes) + - rechts_extrem (z.B. Compact, PI-News) + - na (nicht klassifizierbar: Behoerde, Fachmedium, Think Tank ohne klare politische Linie) + +2. media_type (genau einer): + tageszeitung, wochenzeitung, magazin, tv_sender, radio, oeffentlich_rechtlich, + nachrichtenagentur, online_only, blog, telegram_kanal, telegram_bot, podcast, + social_media, imageboard, think_tank, ngo, behoerde, staatsmedium, fachmedium, sonstige + +3. reliability: + - sehr_hoch (etablierte Qualitaet, Faktencheck: tagesschau, dpa, FAZ, Reuters) + - hoch (serioes mit gelegentlichen Schwaechen: taz, Welt, BILD bei harten News) + - gemischt (Mix Meinung/Einseitigkeit: Tichys Einblick, Achgut, Boulevard) + - niedrig (haeufig irrefuehrend, schwache Quellenarbeit: Junge Freiheit, EpochTimes) + - sehr_niedrig (bekannt fuer Desinformation/Verschwoerung: Compact, RT, Sputnik, PI-News) + - na (nicht bewertbar) + +4. alignments (Mehrfach, leeres Array wenn keine ausgepraegte Naehe): + prorussisch, proiranisch, prowestlich, proukrainisch, prochinesisch, projapanisch, + proisraelisch, propalaestinensisch, protuerkisch, panarabisch, neutral, sonstige + +5. state_affiliated (true/false): true wenn vom Staat finanziert/kontrolliert + (RT, Sputnik, CGTN, PressTV, Xinhua, TRT). Public Service Broadcaster + wie ARD/ZDF/BBC sind NICHT state_affiliated. + +6. country_code (ISO 3166-1 alpha-2): Heimatland (DE, AT, CH, RU, US, ...). null wenn unklar. + +7. confidence (0.0-1.0): 0.85+ fuer bekannte Outlets, 0.5-0.85 fuer mittelbekannt, <0.5 fuer unsicher. + +8. reasoning (1-2 Saetze): Kurze Begruendung der Hauptklassifikationen. + +WICHTIG: +- Antworte AUSSCHLIESSLICH mit einem JSON-Objekt, kein Text drumherum. +- Nutze ausschliesslich die genannten enum-Werte (snake_case). +- Bei Unklarheit lieber `na` und niedrige confidence. + +JSON-Schema: +{{ + "political_orientation": "...", + "media_type": "...", + "reliability": "...", + "alignments": ["..."], + "state_affiliated": false, + "country_code": "DE", + "confidence": 0.9, + "reasoning": "..." +}}""" + + +async def _load_sample_articles(db: aiosqlite.Connection, name: str, domain: str | None, limit: int = 5) -> list[dict]: + """Laedt die letzten Headlines einer Quelle (per name oder Domain-Match).""" + rows: list = [] + if name: + cursor = await db.execute( + "SELECT headline, headline_de FROM articles WHERE source = ? ORDER BY collected_at DESC LIMIT ?", + (name, limit), + ) + rows = await cursor.fetchall() + if not rows and domain: + cursor = await db.execute( + "SELECT headline, headline_de FROM articles WHERE source_url LIKE ? ORDER BY collected_at DESC LIMIT ?", + (f"%{domain}%", limit), + ) + rows = await cursor.fetchall() + return [dict(r) for r in rows] + + +def _validate(parsed: dict) -> dict: + """Validiert + normalisiert eine LLM-Antwort gegen die Enums.""" + pol = parsed.get("political_orientation", "na") + if pol not in POLITICAL_VALUES: + pol = "na" + mt = parsed.get("media_type", "sonstige") + if mt not in MEDIA_TYPE_VALUES: + mt = "sonstige" + rel = parsed.get("reliability", "na") + if rel not in RELIABILITY_VALUES: + rel = "na" + aligns_raw = parsed.get("alignments") or [] + if not isinstance(aligns_raw, list): + aligns_raw = [] + aligns = sorted({a for a in aligns_raw if isinstance(a, str) and a in ALIGNMENT_VALUES}) + sa = bool(parsed.get("state_affiliated", False)) + cc = parsed.get("country_code") + if isinstance(cc, str) and len(cc) == 2 and cc.isalpha(): + cc = cc.upper() + else: + cc = None + try: + confidence = float(parsed.get("confidence", 0.5)) + confidence = max(0.0, min(1.0, confidence)) + except (TypeError, ValueError): + confidence = 0.5 + reasoning = str(parsed.get("reasoning", ""))[:1000] + return { + "political_orientation": pol, + "media_type": mt, + "reliability": rel, + "alignments": aligns, + "state_affiliated": sa, + "country_code": cc, + "confidence": confidence, + "reasoning": reasoning, + } + + +async def classify_source( + db: aiosqlite.Connection, + source_id: int, + sample_limit: int = 5, + model: str = CLAUDE_MODEL_FAST, +) -> dict: + """Klassifiziert eine einzelne Quelle und schreibt die Vorschlaege in proposed_*-Spalten.""" + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, category, language, bias, " + "classification_source FROM sources WHERE id = ?", + (source_id,), + ) + row = await cursor.fetchone() + if not row: + raise ValueError(f"Quelle {source_id} nicht gefunden") + src = dict(row) + + sample = await _load_sample_articles(db, src["name"], src.get("domain"), sample_limit) + prompt = _build_prompt(src, sample) + response, usage = await call_claude(prompt, tools=None, model=model) + + json_match = re.search(r"\{.*\}", response, re.DOTALL) + if not json_match: + raise ValueError(f"Keine JSON-Antwort von Claude fuer source_id={source_id}: {response[:200]}") + parsed = json.loads(json_match.group(0)) + result = _validate(parsed) + + # Nur classification_source auf 'llm_pending' setzen, wenn nicht bereits manuell/approved + new_src = "CASE WHEN classification_source IN ('manual','llm_approved') THEN classification_source ELSE 'llm_pending' END" + await db.execute( + f"""UPDATE sources SET + proposed_political_orientation = ?, + proposed_media_type = ?, + proposed_reliability = ?, + proposed_state_affiliated = ?, + proposed_country_code = ?, + proposed_alignments_json = ?, + proposed_confidence = ?, + proposed_reasoning = ?, + proposed_at = CURRENT_TIMESTAMP, + classification_source = {new_src} + WHERE id = ?""", + ( + result["political_orientation"], + result["media_type"], + result["reliability"], + 1 if result["state_affiliated"] else 0, + result["country_code"], + json.dumps(result["alignments"], ensure_ascii=False), + result["confidence"], + result["reasoning"], + source_id, + ), + ) + await db.commit() + + logger.info( + "Klassifiziert source_id=%s '%s' -> %s/%s/%s conf=%.2f ($%.4f)", + source_id, src["name"], result["political_orientation"], + result["media_type"], result["reliability"], result["confidence"], + usage.cost_usd, + ) + + result["source_id"] = source_id + result["usage"] = { + "cost_usd": usage.cost_usd, + "input_tokens": usage.input_tokens, + "output_tokens": usage.output_tokens, + } + return result + + +async def bulk_classify( + db: aiosqlite.Connection, + limit: int = 50, + only_unclassified: bool = True, + model: str = CLAUDE_MODEL_FAST, +) -> dict: + """Klassifiziert noch unklassifizierte Quellen (sequenziell). + + Args: + limit: Maximale Anzahl Quellen pro Aufruf + only_unclassified: Wenn True, nur classification_source='legacy'. + Wenn False, auch 'llm_pending' neu klassifizieren. + """ + if only_unclassified: + where = "classification_source = 'legacy'" + else: + where = "classification_source IN ('legacy', 'llm_pending')" + cursor = await db.execute( + f"SELECT id FROM sources WHERE {where} AND status = 'active' " + f"AND source_type != 'excluded' ORDER BY id LIMIT ?", + (limit,), + ) + ids = [row["id"] for row in await cursor.fetchall()] + + total_cost = 0.0 + success = 0 + errors: list[dict] = [] + + for sid in ids: + try: + r = await classify_source(db, sid, model=model) + total_cost += r["usage"]["cost_usd"] + success += 1 + except asyncio.CancelledError: + raise + except Exception as e: + logger.error("Klassifikation source_id=%s fehlgeschlagen: %s", sid, e, exc_info=True) + errors.append({"source_id": sid, "error": str(e)}) + + logger.info( + "Bulk-Klassifikation fertig: %d/%d erfolgreich, $%.4f Kosten, %d Fehler", + success, len(ids), total_cost, len(errors), + ) + return { + "processed": len(ids), + "success": success, + "errors": errors, + "total_cost_usd": total_cost, + } diff --git a/src/static/css/style.css b/src/static/css/style.css index fc102ba..ecf1ca6 100644 --- a/src/static/css/style.css +++ b/src/static/css/style.css @@ -962,3 +962,162 @@ input[type="date"].filter-select { padding: 6px 10px; } font-weight: 400; } +/* === Klassifikations-Review === */ +.sources-tab-badge { + display: inline-flex; + align-items: center; + justify-content: center; + min-width: 20px; + padding: 0 6px; + height: 18px; + border-radius: 9px; + background: var(--accent); + color: var(--bg-primary); + font-size: 10px; + font-weight: 700; +} + +.review-toolbar { + display: flex; + align-items: center; + justify-content: space-between; + padding: 10px 14px; + background: var(--bg-secondary); + border: 1px solid var(--border); + border-radius: var(--radius); + margin-bottom: 12px; + flex-wrap: wrap; + gap: 12px; +} +.review-toolbar-info { + display: flex; + align-items: center; + gap: 16px; + font-size: 13px; + color: var(--text-primary); +} +.review-conf-filter { + display: inline-flex; + align-items: center; + gap: 6px; + font-size: 12px; + color: var(--text-secondary); +} +.review-toolbar-actions { display: flex; gap: 6px; } + +.review-list { display: flex; flex-direction: column; gap: 8px; } +.review-card { + background: var(--bg-secondary); + border: 1px solid var(--border); + border-radius: var(--radius); + padding: 12px 14px; +} +.review-card-header { + display: flex; + justify-content: space-between; + align-items: flex-start; + gap: 12px; + margin-bottom: 10px; +} +.review-card-title { + display: flex; + flex-wrap: wrap; + align-items: center; + gap: 8px; +} +.review-card-name { font-weight: 600; font-size: 14px; color: var(--text-primary); } +.review-card-domain { font-size: 11px; color: var(--text-muted); } +.review-global-badge { + display: inline-flex; + align-items: center; + padding: 1px 6px; + border-radius: var(--radius); + background: #5e35b1; + color: #fff; + font-size: 9px; + font-weight: 600; + letter-spacing: 0.3px; + text-transform: uppercase; +} +.review-card-confidence { + display: inline-flex; + flex-direction: column; + align-items: center; + padding: 4px 10px; + border-radius: var(--radius); + min-width: 60px; +} +.review-card-confidence .conf-value { font-size: 14px; font-weight: 700; } +.review-card-confidence .conf-label { font-size: 9px; text-transform: uppercase; letter-spacing: 0.3px; opacity: 0.8; } +.review-card-confidence.conf-high { background: rgba(34,197,94,0.15); color: var(--success); } +.review-card-confidence.conf-medium { background: rgba(245,158,11,0.15); color: var(--warning); } +.review-card-confidence.conf-low { background: rgba(239,68,68,0.15); color: var(--danger); } + +.review-card-diff { + display: grid; + grid-template-columns: 1fr; + gap: 4px; + font-size: 12px; + margin-bottom: 10px; +} +.review-diff-row { + display: grid; + grid-template-columns: 130px 1fr 24px 1fr; + align-items: center; + gap: 8px; + padding: 3px 6px; + border-radius: 3px; +} +.review-diff-row.changed { background: rgba(245,158,11,0.10); } +.review-diff-label { color: var(--text-secondary); font-weight: 500; } +.review-diff-current { color: var(--text-muted); } +.review-diff-arrow { text-align: center; color: var(--text-muted); font-weight: 600; } +.review-diff-proposed { color: var(--text-primary); font-weight: 500; } +.review-diff-row.changed .review-diff-proposed { color: var(--warning); font-weight: 600; } + +.review-card-reasoning { + font-size: 12px; + color: var(--text-secondary); + background: var(--bg-tertiary); + padding: 8px 10px; + border-radius: var(--radius); + margin-bottom: 10px; + line-height: 1.5; +} +.review-card-actions { display: flex; gap: 6px; flex-wrap: wrap; } + +/* Edit-Form: Klassifikations-Sektion */ +.sources-classification-section { + margin-top: 14px; + padding-top: 14px; + border-top: 1px solid var(--border); +} +.sources-classification-header { + font-size: 12px; + font-weight: 600; + color: var(--text-secondary); + margin-bottom: 10px; + letter-spacing: 0.3px; + text-transform: uppercase; +} +.alignment-chips { display: flex; flex-wrap: wrap; gap: 6px; } +.alignment-chip { + display: inline-flex; + align-items: center; + padding: 4px 10px; + border-radius: 999px; + font-size: 11px; + font-weight: 500; + background: transparent; + color: var(--text-secondary); + border: 1px solid var(--border); + cursor: pointer; + transition: all 0.12s ease; +} +.alignment-chip:hover { background: var(--bg-tertiary); color: var(--text-primary); } +.alignment-chip.active { + background: var(--accent); + color: var(--bg-primary); + border-color: var(--accent); +} + diff --git a/src/static/dashboard.html b/src/static/dashboard.html index 4bd8138..f40effd 100644 --- a/src/static/dashboard.html +++ b/src/static/dashboard.html @@ -166,6 +166,14 @@ +
+ + + Bestimmt die Ausgabesprache der KI (Lagebild, Faktencheck, Recherche) und der sichtbarsten UI-Elemente fuer alle Nutzer dieser Organisation. +
@@ -294,6 +302,7 @@ +
@@ -319,6 +328,7 @@ +
@@ -406,6 +416,35 @@
+ +
+
+
+ 0 Vorschläge ausstehend + +
+
+ + + +
+
+
+
+
Lade Review-Queue…
+
+
+
+ @@ -469,6 +508,14 @@ +
+ + + Steuert die Ausgabesprache der KI-Pipeline (Lagebild, Faktencheck, Recherche) und die sichtbarsten UI-Strings im Monitor. +
@@ -653,6 +701,96 @@
+ +
+
Einordnung (Klassifikation)
+
+
+ + +
+
+ + +
+
+ + +
+
+
+
+ + +
+
+ +
+
+
+ +
+ + + + + + + + + + + + +
+
+
+ + + + - + diff --git a/src/static/js/app.js b/src/static/js/app.js index a597f3a..9b8cc46 100644 --- a/src/static/js/app.js +++ b/src/static/js/app.js @@ -26,6 +26,23 @@ const API = { post(path, body) { return this.request(path, { method: "POST", body: JSON.stringify(body) }); }, put(path, body) { return this.request(path, { method: "PUT", body: body ? JSON.stringify(body) : undefined }); }, del(path) { return this.request(path, { method: "DELETE" }); }, + + async upload(path, formData) { + const headers = {}; + if (this.token) headers["Authorization"] = `Bearer ${this.token}`; + const res = await fetch(path, { method: "POST", headers, body: formData }); + if (res.status === 401) { + localStorage.removeItem("token"); + localStorage.removeItem("username"); + window.location.href = "/"; + return; + } + if (!res.ok) { + const data = await res.json().catch(() => ({})); + throw new Error(data.detail || `Fehler ${res.status}`); + } + return res.json(); + }, }; // --- State --- @@ -213,6 +230,8 @@ async function openOrg(orgId) { document.getElementById("editOrgName").value = org.name; document.getElementById("editOrgActive").value = org.is_active ? "true" : "false"; + const langEl = document.getElementById("editOrgLanguage"); + if (langEl) langEl.value = org.output_language || "de"; loadOrgUsers(orgId); loadOrgLicenses(orgId); @@ -424,6 +443,7 @@ function setupForms() { await API.post("/api/orgs", { name: document.getElementById("newOrgName").value, slug: document.getElementById("newOrgSlug").value, + output_language: document.getElementById("newOrgLanguage").value || "de", }); closeModal("modalNewOrg"); document.getElementById("newOrgForm").reset(); @@ -518,6 +538,7 @@ function setupForms() { await API.put(`/api/orgs/${currentOrgId}`, { name: document.getElementById("editOrgName").value, is_active: document.getElementById("editOrgActive").value === "true", + output_language: document.getElementById("editOrgLanguage").value || "de", }); openOrg(currentOrgId); loadOrgs(); diff --git a/src/static/js/sources.js b/src/static/js/sources.js index c4e9ae0..7c51850 100644 --- a/src/static/js/sources.js +++ b/src/static/js/sources.js @@ -37,6 +37,7 @@ function setupSourceSubTabs() { if (subtab === "global-sources") loadGlobalSources(); else if (subtab === "tenant-sources") loadTenantSources(); else if (subtab === "source-health") loadHealthData(); + else if (subtab === "classification-review") loadClassificationQueue(); }); }); } @@ -280,6 +281,7 @@ function openNewGlobalSource() { editingSourceId = null; document.getElementById("sourceModalTitle").textContent = "Neue Grundquelle"; document.getElementById("sourceForm").reset(); + setAlignmentChips([]); openModal("modalSource"); } @@ -298,11 +300,19 @@ function editGlobalSource(id) { document.getElementById("sourceLanguage").value = s.language || ""; document.getElementById("sourceBias").value = s.bias || ""; document.getElementById("sourceFetchStrategy").value = s.fetch_strategy || "default"; + document.getElementById("sourcePolitical").value = s.political_orientation || ""; + document.getElementById("sourceMediaType").value = s.media_type || ""; + document.getElementById("sourceReliability").value = s.reliability || ""; + document.getElementById("sourceCountryCode").value = s.country_code || ""; + document.getElementById("sourceStateAffiliated").checked = !!s.state_affiliated; + setAlignmentChips(s.alignments || []); openModal("modalSource"); } function setupSourceForms() { document.getElementById("newGlobalSourceBtn").addEventListener("click", openNewGlobalSource); + document.getElementById("newPdfSourceBtn")?.addEventListener("click", openPdfUploadModal); + setupPdfUploadForm(); document.getElementById("discoverSourceBtn").addEventListener("click", () => { document.getElementById("discoverUrl").value = ""; document.getElementById("discoverStatus").style.display = "none"; @@ -328,6 +338,19 @@ function setupSourceForms() { fetch_strategy: document.getElementById("sourceFetchStrategy").value || "default", }; + const pol = document.getElementById("sourcePolitical")?.value; + if (pol) body.political_orientation = pol; + const mt = document.getElementById("sourceMediaType")?.value; + if (mt) body.media_type = mt; + const rel = document.getElementById("sourceReliability")?.value; + if (rel) body.reliability = rel; + const cc = (document.getElementById("sourceCountryCode")?.value || "").trim().toUpperCase(); + if (cc) body.country_code = cc; + if (editingSourceId) { + body.state_affiliated = !!document.getElementById("sourceStateAffiliated")?.checked; + body.alignments = getAlignmentChips(); + } + try { if (editingSourceId) { await API.put("/api/sources/global/" + editingSourceId, body); @@ -641,6 +664,213 @@ async function addDiscoveredFeeds() { } } + +// === Klassifikations-Review === + +const POLITICAL_LABELS = { + links_extrem: { short: "L+", full: "Links (extrem)" }, + links: { short: "L", full: "Links" }, + mitte_links: { short: "ML", full: "Mitte-Links" }, + liberal: { short: "LIB", full: "Liberal" }, + mitte: { short: "M", full: "Mitte" }, + konservativ: { short: "KON", full: "Konservativ" }, + mitte_rechts: { short: "MR", full: "Mitte-Rechts" }, + rechts: { short: "R", full: "Rechts" }, + rechts_extrem: { short: "R+", full: "Rechts (extrem)" }, + na: { short: "?", full: "Nicht eingeordnet" }, +}; +const RELIABILITY_LABELS = { + sehr_hoch: "Sehr hoch", hoch: "Hoch", gemischt: "Gemischt", + niedrig: "Niedrig", sehr_niedrig: "Sehr niedrig", na: "Nicht eingeordnet", +}; +const MEDIA_TYPE_LABELS = { + tageszeitung: "Tageszeitung", wochenzeitung: "Wochenzeitung", magazin: "Magazin", + tv_sender: "TV-Sender", radio: "Radio", oeffentlich_rechtlich: "Öffentlich-Rechtlich", + nachrichtenagentur: "Nachrichtenagentur", online_only: "Online-only", blog: "Blog", + telegram_kanal: "Telegram-Kanal", telegram_bot: "Telegram-Bot", podcast: "Podcast", + social_media: "Social Media", imageboard: "Imageboard", think_tank: "Think Tank", + ngo: "NGO", behoerde: "Behörde", staatsmedium: "Staatsmedium", + fachmedium: "Fachmedium", sonstige: "Sonstige", +}; +const ALIGNMENT_LABELS = { + prorussisch: "prorussisch", proiranisch: "proiranisch", prowestlich: "prowestlich", + proukrainisch: "proukrainisch", prochinesisch: "prochinesisch", projapanisch: "projapanisch", + proisraelisch: "proisraelisch", propalaestinensisch: "propalästinensisch", + protuerkisch: "protürkisch", panarabisch: "panarabisch", neutral: "neutral", sonstige: "sonstige", +}; + +function setAlignmentChips(active) { + const chips = document.querySelectorAll("#sourceAlignmentChips .alignment-chip"); + const set = new Set((active || []).map((a) => (a || "").toLowerCase())); + chips.forEach((chip) => { + if (set.has(chip.dataset.alignment)) chip.classList.add("active"); + else chip.classList.remove("active"); + }); +} + +function getAlignmentChips() { + return Array.from(document.querySelectorAll("#sourceAlignmentChips .alignment-chip.active")) + .map((chip) => chip.dataset.alignment); +} + +function handleAlignmentChipClick(e) { + const chip = e.target.closest(".alignment-chip"); + if (!chip) return; + e.preventDefault(); + chip.classList.toggle("active"); +} + +async function refreshClassificationStats() { + try { + const stats = await API.get("/api/sources/classification/stats"); + const badge = document.getElementById("classificationPendingBadge"); + if (badge) badge.textContent = String(stats.pending_review || 0); + } catch (_) { /* still ok */ } +} + +async function loadClassificationQueue() { + const list = document.getElementById("classificationReviewList"); + if (!list) return; + const minConf = parseFloat(document.getElementById("reviewMinConfidence")?.value || "0"); + list.innerHTML = '
Lade…
'; + try { + const items = await API.get(`/api/sources/classification/queue?limit=200&min_confidence=${minConf}`); + const countEl = document.getElementById("reviewPendingCount"); + if (countEl) countEl.textContent = String(items.length); + refreshClassificationStats(); + if (items.length === 0) { + list.innerHTML = '
Keine ausstehenden Vorschläge.
'; + return; + } + list.innerHTML = items.map((it) => renderClassificationQueueItem(it)).join(""); + } catch (err) { + list.innerHTML = `
Fehler: ${esc(err.message)}
`; + } +} + +function renderClassificationQueueItem(item) { + const cur = item.current || {}; + const prop = item.proposed || {}; + const conf = prop.confidence || 0; + const confPct = Math.round(conf * 100); + const confClass = conf >= 0.85 ? "high" : conf >= 0.7 ? "medium" : "low"; + + const polFmt = (v) => (v && v !== "na" ? POLITICAL_LABELS[v]?.full || v : "–"); + const mtFmt = (v) => (v ? MEDIA_TYPE_LABELS[v] || v : "–"); + const relFmt = (v) => (v && v !== "na" ? RELIABILITY_LABELS[v] || v : "–"); + const stateFmt = (v) => (v ? "ja" : "nein"); + const ccFmt = (v) => v || "–"; + const alignFmt = (v) => + Array.isArray(v) && v.length > 0 ? v.map((a) => ALIGNMENT_LABELS[a] || a).join(", ") : "–"; + + const row = (label, c, p, fmt) => { + const cs = fmt(c); + const ps = fmt(p); + const changed = cs !== ps; + return `
+ ${esc(label)} + ${esc(cs)} + + ${esc(ps)} +
`; + }; + + const reasoning = prop.reasoning ? esc(prop.reasoning) : ""; + + return `
+
+
+ ${esc(item.name)} + ${item.is_global ? 'Grundquelle' : ""} + ${esc(item.domain || "")} +
+
+ ${confPct}% + Konfidenz +
+
+
+ ${row("Politik", cur.political_orientation, prop.political_orientation, polFmt)} + ${row("Medientyp", cur.media_type, prop.media_type, mtFmt)} + ${row("Glaubwürdigkeit", cur.reliability, prop.reliability, relFmt)} + ${row("Staatsnah", cur.state_affiliated, prop.state_affiliated, stateFmt)} + ${row("Land", cur.country_code, prop.country_code, ccFmt)} + ${row("Geopol. Nähe", cur.alignments, prop.alignments, alignFmt)} +
+ ${reasoning ? `
Begründung: ${reasoning}
` : ""} +
+ + + +
+
`; +} + +async function approveClassification(id) { + try { + await API.post(`/api/sources/${id}/classification/approve`, {}); + showToast("Klassifikation übernommen.", "success"); + loadClassificationQueue(); + } catch (err) { + showToast("Approve fehlgeschlagen: " + err.message, "error"); + } +} + +async function rejectClassification(id) { + try { + await API.post(`/api/sources/${id}/classification/reject`, {}); + showToast("Vorschlag verworfen.", "success"); + loadClassificationQueue(); + } catch (err) { + showToast("Reject fehlgeschlagen: " + err.message, "error"); + } +} + +async function reclassifySource(id) { + const btn = document.querySelector(`[data-reclassify-id="${id}"]`); + if (btn) { btn.disabled = true; btn.textContent = "..."; } + try { + await API.post(`/api/sources/${id}/classification/reclassify`, {}); + showToast("Neu klassifiziert.", "success"); + loadClassificationQueue(); + } catch (err) { + showToast("Reclassify fehlgeschlagen: " + err.message, "error"); + } finally { + if (btn) { btn.disabled = false; btn.textContent = "Neu klassifizieren"; } + } +} + +async function triggerBulkClassify() { + if (!confirm("Bulk-Klassifikation aller noch nicht klassifizierten Quellen starten? Läuft im Hintergrund (~3-5 Sek pro Quelle, ~0.02 USD pro Quelle).")) return; + try { + const r = await API.post("/api/sources/classification/bulk-classify?limit=500&only_unclassified=true", {}); + showToast(`Bulk-Klassifikation gestartet (limit=${r.limit}). In ~10 min neu laden.`, "info"); + } catch (err) { + showToast("Start fehlgeschlagen: " + err.message, "error"); + } +} + +async function bulkApproveHighConfidence() { + if (!confirm("Alle Vorschläge mit Konfidenz ≥ 0.85 genehmigen?")) return; + try { + const r = await API.post("/api/sources/classification/bulk-approve?min_confidence=0.85", {}); + showToast(`${r.approved} Vorschläge übernommen.`, "success"); + loadClassificationQueue(); + } catch (err) { + showToast("Bulk-Approve fehlgeschlagen: " + err.message, "error"); + } +} + +async function triggerExternalReputationSync() { + if (!confirm("IFCN- und EUvsDisinfo-Datenbanken jetzt syncen? Läuft im Hintergrund (~30 Sek).")) return; + try { + await API.post("/api/sources/external-reputation/sync", {}); + showToast("Externer Sync gestartet. Quellenliste in ~30 Sek neu laden.", "info"); + } catch (err) { + showToast("Sync fehlgeschlagen: " + err.message, "error"); + } +} + function toggleSourceInfo(id) { const row = document.getElementById("notes-" + id); if (!row) return; @@ -652,3 +882,68 @@ function toggleSourceInfo(id) { if (btn) btn.classList.toggle("active", !isVisible); } } + +// --- PDF-Quellen-Upload --- +function openPdfUploadModal() { + const form = document.getElementById("pdfUploadForm"); + if (form) form.reset(); + const err = document.getElementById("pdfUploadError"); + if (err) { err.style.display = "none"; err.textContent = ""; } + const prog = document.getElementById("pdfUploadProgress"); + if (prog) prog.style.display = "none"; + openModal("modalPdfUpload"); +} + +function setupPdfUploadForm() { + const form = document.getElementById("pdfUploadForm"); + if (!form || form.dataset.bound === "1") return; + form.dataset.bound = "1"; + + form.addEventListener("submit", async (e) => { + e.preventDefault(); + const errEl = document.getElementById("pdfUploadError"); + const progEl = document.getElementById("pdfUploadProgress"); + const submitBtn = document.getElementById("pdfUploadSubmitBtn"); + errEl.style.display = "none"; + + const fileInput = document.getElementById("pdfFile"); + const f = fileInput?.files?.[0]; + if (!f) { + errEl.textContent = "Bitte eine PDF-Datei auswaehlen."; + errEl.style.display = "block"; + return; + } + if (f.size > 50 * 1024 * 1024) { + errEl.textContent = "Datei ueberschreitet 50 MB."; + errEl.style.display = "block"; + return; + } + + const fd = new FormData(); + fd.append("file", f); + const nm = document.getElementById("pdfName").value.trim(); + if (nm) fd.append("name", nm); + fd.append("category", document.getElementById("pdfCategory").value || "sonstige"); + const lng = document.getElementById("pdfLanguage").value.trim(); + if (lng) fd.append("language", lng); + const nt = document.getElementById("pdfNotes").value.trim(); + if (nt) fd.append("notes", nt); + + submitBtn.disabled = true; + progEl.style.display = "block"; + try { + await API.upload("/api/sources/global/upload-pdf", fd); + closeModal("modalPdfUpload"); + if (typeof showToast === "function") { + showToast("PDF hochgeladen -- Verarbeitung laeuft im Hintergrund", "success"); + } + loadGlobalSources(); + } catch (err) { + errEl.textContent = err.message || "Upload fehlgeschlagen"; + errEl.style.display = "block"; + } finally { + submitBtn.disabled = false; + progEl.style.display = "none"; + } + }); +}