feat(klassifikation): Quellen-Klassifikation aus Monitor in Verwaltung verschoben

Service-Module (source_classifier, external_reputation) liegen jetzt in shared/services/; die folgenden Endpoints liegen jetzt hier unter /api/sources/ statt im Monitor:
- classification/{stats,queue,bulk-classify,bulk-approve}
- {id}/classification/{approve,reject,reclassify}
- external-reputation/sync

modalSource erweitert um Klassifikations-Section (Politik, Medientyp, Reliability, state-affiliated, Land, 12 Alignment-Chips). Neuer Sub-Tab Klassifikation mit Review-Queue, Pending-Counter, Bulk-Actions. Auth via get_current_admin, Audit-Logging.

Begleit-Refactor: Monitor verliert die Klassifikations-UI/-Endpoints separat.
Dieser Commit ist enthalten in:
claude-dev
2026-05-09 21:27:55 +00:00
Ursprung 2f7d967ce2
Commit 015255237a
6 geänderte Dateien mit 1513 neuen und 13 gelöschten Zeilen

Datei anzeigen

@@ -1,15 +1,16 @@
"""Grundquellen-Verwaltung und Kundenquellen-Übersicht."""
import json
import logging
import uuid
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional
import aiosqlite
from auth import get_current_admin
from database import db_dependency
from database import db_dependency, get_db
from audit import log_action, get_client_ip
from source_meta import get_meta
from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S
@@ -19,12 +20,84 @@ from shared.source_rules import (
evaluate_feeds_with_claude,
domain_to_display_name,
)
from shared.services.source_classifier import (
bulk_classify,
classify_source,
ALIGNMENT_VALUES,
POLITICAL_VALUES,
MEDIA_TYPE_VALUES,
RELIABILITY_VALUES,
)
from shared.services.external_reputation import (
apply_reputation_overrides,
sync_all as sync_external_reputation,
)
# Module logger and the router that carries every /api/sources endpoint of this file.
logger = logging.getLogger("verwaltung.sources")
router = APIRouter(prefix="/api/sources", tags=["sources"])
# NOTE(review): this first assignment is overwritten by the one directly below;
# it looks like the pre-change line of the diff — confirm and drop it.
SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes", "language", "bias", "fetch_strategy"}
# Columns of `sources` that update_global_source copies from the request payload;
# payload keys outside this set are ignored there.
SOURCE_UPDATE_COLUMNS = {
    "name", "url", "domain", "source_type", "category", "status", "notes",
    "language", "bias", "fetch_strategy",
    "political_orientation", "media_type", "reliability",
    "state_affiliated", "country_code",
}
# Subset of the columns above that counts as "classification": update_global_source
# marks the source as manually classified when any of them is changed.
SOURCE_CLASSIFICATION_FIELDS = {
    "political_orientation", "media_type", "reliability",
    "state_affiliated", "country_code",
}
async def _load_alignments_for(db: aiosqlite.Connection, source_ids: list[int]) -> dict[int, list[str]]:
    """Load the stored alignment values for a batch of sources.

    Args:
        db: Open database connection whose rows support access by column name.
        source_ids: IDs of the sources to look up; duplicates are tolerated.

    Returns:
        A dict with one entry per requested ID (in first-seen order) mapping it
        to that source's alignment values in ascending order. Sources without
        stored alignments map to an empty list. An empty input yields ``{}``.
    """
    if not source_ids:
        return {}
    out: dict[int, list[str]] = {sid: [] for sid in source_ids}
    # The dict keys are the de-duplicated IDs; querying them in chunks keeps each
    # statement below SQLite's bound-parameter limit (999 before SQLite 3.32).
    unique_ids = list(out)
    chunk_size = 900
    for start in range(0, len(unique_ids), chunk_size):
        chunk = unique_ids[start:start + chunk_size]
        placeholders = ",".join("?" for _ in chunk)
        cursor = await db.execute(
            f"SELECT source_id, alignment FROM source_alignments WHERE source_id IN ({placeholders}) ORDER BY alignment",
            chunk,
        )
        # Every ID lives in exactly one chunk, so the per-source ordering produced
        # by ORDER BY is preserved.
        for row in await cursor.fetchall():
            out.setdefault(row["source_id"], []).append(row["alignment"])
    return out
async def _replace_alignments(db: aiosqlite.Connection, source_id: int, alignments: list[str]):
    """Replace the alignment list of a source (DELETE + INSERT).

    Each value is stripped and lower-cased; blank values and duplicates are
    dropped. All values are validated *before* any statement is executed, so an
    invalid value leaves the existing rows untouched. The caller is responsible
    for calling ``commit()``.

    Args:
        db: Open database connection.
        source_id: ID of the source whose alignments are replaced.
        alignments: New alignment values (raw, un-normalised).

    Raises:
        HTTPException: 422 if a normalised value is not in ``ALIGNMENT_VALUES``.
    """
    normalised: list[str] = []
    seen: set[str] = set()
    for raw in alignments:
        a = (raw or "").strip().lower()
        if not a or a in seen:
            continue
        if a not in ALIGNMENT_VALUES:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Ungueltiger alignment-Wert: '{a}'",
            )
        seen.add(a)
        normalised.append(a)

    # Only touch the table once the whole input is known to be valid.
    await db.execute("DELETE FROM source_alignments WHERE source_id = ?", (source_id,))
    for a in normalised:
        await db.execute(
            "INSERT INTO source_alignments (source_id, alignment) VALUES (?, ?)",
            (source_id, a),
        )
async def _clear_proposed(db: aiosqlite.Connection, source_id: int):
    """Set every ``proposed_*`` column of one source back to NULL.

    The statement is only executed here; no commit is issued by this helper.
    """
    reset_sql = """UPDATE sources SET
        proposed_political_orientation = NULL,
        proposed_media_type = NULL,
        proposed_reliability = NULL,
        proposed_state_affiliated = NULL,
        proposed_country_code = NULL,
        proposed_alignments_json = NULL,
        proposed_confidence = NULL,
        proposed_reasoning = NULL,
        proposed_at = NULL
        WHERE id = ?"""
    params = (source_id,)
    await db.execute(reset_sql, params)
@router.get("/meta")
@@ -61,6 +134,12 @@ class GlobalSourceUpdate(BaseModel):
notes: Optional[str] = None
language: Optional[str] = Field(default=None, max_length=100)
bias: Optional[str] = Field(default=None, max_length=500)
political_orientation: Optional[str] = None
media_type: Optional[str] = None
reliability: Optional[str] = None
state_affiliated: Optional[bool] = None
country_code: Optional[str] = Field(default=None, max_length=8)
alignments: Optional[list[str]] = None
@router.get("/global")
@@ -120,7 +199,11 @@ async def list_global_sources(
WHERE s.tenant_id IS NULL
ORDER BY s.category, s.source_type, s.name
""")
return [dict(row) for row in await cursor.fetchall()]
rows = [dict(row) for row in await cursor.fetchall()]
alignments_map = await _load_alignments_for(db, [r["id"] for r in rows])
for r in rows:
r["alignments"] = alignments_map.get(r["id"], [])
return rows
@router.post("/global", status_code=201)
@@ -170,7 +253,7 @@ async def update_global_source(
admin: dict = Depends(get_current_admin),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Grundquelle bearbeiten."""
"""Grundquelle bearbeiten — inkl. Klassifikation + alignments."""
cursor = await db.execute(
"SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,)
)
@@ -178,21 +261,50 @@ async def update_global_source(
if not row:
raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden")
before = dict(row)
before_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, []))
updates = {}
for field, value in data.model_dump(exclude_none=True).items():
updates[field] = value
payload = data.model_dump(exclude_none=True)
alignments = payload.pop("alignments", None)
if not updates:
return before
if "political_orientation" in payload and payload["political_orientation"] not in POLITICAL_VALUES:
raise HTTPException(status_code=422, detail=f"Ungueltige political_orientation: {payload['political_orientation']}")
if "media_type" in payload and payload["media_type"] not in MEDIA_TYPE_VALUES:
raise HTTPException(status_code=422, detail=f"Ungueltiger media_type: {payload['media_type']}")
if "reliability" in payload and payload["reliability"] not in RELIABILITY_VALUES:
raise HTTPException(status_code=422, detail=f"Ungueltige reliability: {payload['reliability']}")
updates = {k: v for k, v in payload.items() if k in SOURCE_UPDATE_COLUMNS}
if "state_affiliated" in updates:
updates["state_affiliated"] = 1 if updates["state_affiliated"] else 0
classification_touched = any(k in updates for k in SOURCE_CLASSIFICATION_FIELDS) or alignments is not None
if classification_touched:
updates["classification_source"] = "manual"
updates["classified_at"] = None # CURRENT_TIMESTAMP via SQL — siehe unten
if updates:
sets = []
vals = []
for k, v in updates.items():
if k == "classified_at":
sets.append("classified_at = CURRENT_TIMESTAMP")
else:
sets.append(f"{k} = ?")
vals.append(v)
vals.append(source_id)
await db.execute(f"UPDATE sources SET {', '.join(sets)} WHERE id = ?", vals)
if alignments is not None:
await _replace_alignments(db, source_id, alignments)
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [source_id]
await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values)
await db.commit()
cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
after = dict(await cursor.fetchone())
after_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, []))
if before_alignments != after_alignments:
before["alignments"] = before_alignments
after["alignments"] = after_alignments
await log_action(
db, admin, get_client_ip(request),
action="update", resource_type="source", resource_id=source_id,
@@ -1086,3 +1198,307 @@ Nur das JSON, kein anderer Text."""
except Exception as e:
raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}")
# === Klassifikations-Review (LLM-Vorschlaege approve/reject/reclassify) ===
@router.get("/classification/stats")
async def classification_stats(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return classification counters for all active sources.

    Returns:
        A dict with
        - ``by_classification_source``: number of active sources per
          ``classification_source`` value (NULL/empty counts as ``"legacy"``),
        - ``pending_review``: number of active sources that carry a proposal,
        - ``total``: sum over ``by_classification_source``.
    """
    cursor = await db.execute(
        """SELECT classification_source, COUNT(*) as cnt
           FROM sources
           WHERE status = 'active'
           GROUP BY classification_source"""
    )
    by_source: dict[str, int] = {}
    for row in await cursor.fetchall():
        # GROUP BY yields separate groups for NULL and an explicit 'legacy';
        # both land in the same bucket, so the counts must be added, not overwritten.
        key = row["classification_source"] or "legacy"
        by_source[key] = by_source.get(key, 0) + row["cnt"]

    cursor = await db.execute(
        """SELECT COUNT(*) as cnt FROM sources
           WHERE status = 'active' AND proposed_political_orientation IS NOT NULL"""
    )
    pending = (await cursor.fetchone())["cnt"]
    return {
        "by_classification_source": by_source,
        "pending_review": pending,
        "total": sum(by_source.values()),
    }
@router.get("/classification/queue")
async def classification_queue(
    limit: int = 50,
    min_confidence: float = 0.0,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the review queue: sources that carry a pending proposal.

    A source is in the queue when ``proposed_political_orientation`` is set and
    its ``proposed_confidence`` (NULL counts as 0) is at least ``min_confidence``.
    Entries are ordered by confidence, then by proposal time, both descending.

    Args:
        limit: Maximum number of entries to return; must be at least 1.
        min_confidence: Lower bound for the proposal confidence.

    Returns:
        A list of dicts, each with the source's identifying fields plus a
        ``current`` and a ``proposed`` classification sub-dict.

    Raises:
        HTTPException: 400 if ``limit`` is smaller than 1.
    """
    # SQLite treats a negative LIMIT as "no limit", which would dump the whole queue.
    if limit < 1:
        raise HTTPException(status_code=400, detail="limit muss mindestens 1 sein")

    cursor = await db.execute(
        """SELECT s.* FROM sources s
           WHERE s.proposed_political_orientation IS NOT NULL
             AND COALESCE(s.proposed_confidence, 0) >= ?
           ORDER BY s.proposed_confidence DESC, s.proposed_at DESC
           LIMIT ?""",
        (min_confidence, limit),
    )
    rows = [dict(r) for r in await cursor.fetchall()]
    alignments_map = await _load_alignments_for(db, [r["id"] for r in rows])

    out = []
    for d in rows:
        try:
            proposed_aligns = json.loads(d.get("proposed_alignments_json") or "[]")
        except (json.JSONDecodeError, TypeError):
            proposed_aligns = []
        # Valid JSON that is not a list (e.g. "null") is treated like a missing proposal list.
        if not isinstance(proposed_aligns, list):
            proposed_aligns = []
        out.append({
            "id": d["id"],
            "name": d["name"],
            "url": d.get("url"),
            "domain": d.get("domain"),
            "source_type": d.get("source_type"),
            "category": d.get("category"),
            "is_global": d.get("tenant_id") is None,
            "current": {
                "political_orientation": d.get("political_orientation"),
                "media_type": d.get("media_type"),
                "reliability": d.get("reliability"),
                "state_affiliated": bool(d.get("state_affiliated")),
                "country_code": d.get("country_code"),
                "alignments": alignments_map.get(d["id"], []),
                "classification_source": d.get("classification_source"),
            },
            "proposed": {
                "political_orientation": d.get("proposed_political_orientation"),
                "media_type": d.get("proposed_media_type"),
                "reliability": d.get("proposed_reliability"),
                "state_affiliated": bool(d.get("proposed_state_affiliated")),
                "country_code": d.get("proposed_country_code"),
                "alignments": proposed_aligns,
                "confidence": d.get("proposed_confidence"),
                "reasoning": d.get("proposed_reasoning"),
                "proposed_at": d.get("proposed_at"),
            },
        })
    return out
@router.post("/{source_id}/classification/approve")
async def approve_classification(
    source_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Copy the ``proposed_*`` values into the real fields of one source.

    Sets ``classification_source='llm_approved'``, replaces the alignments with
    the proposed ones (unknown values are dropped), clears the proposal columns,
    commits, then applies reputation overrides (best effort) and writes an
    audit entry with the before/after state.

    Returns:
        ``{"source_id": <id>, "status": "approved"}``

    Raises:
        HTTPException: 404 if the source does not exist, 400 if it carries no
            proposal.
    """
    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    src = dict(row)
    if src.get("proposed_political_orientation") is None:
        raise HTTPException(status_code=400, detail="Keine LLM-Vorschlaege fuer diese Quelle vorhanden")

    before_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, []))
    before = {**src, "alignments": before_alignments}

    try:
        proposed_aligns = json.loads(src.get("proposed_alignments_json") or "[]")
    except (json.JSONDecodeError, TypeError):
        proposed_aligns = []
    # Valid JSON that is not a list (e.g. "null") must not blow up the filter
    # below after the UPDATE has already been issued.
    if not isinstance(proposed_aligns, list):
        proposed_aligns = []
    # The isinstance check also keeps unhashable elements away from the membership test.
    accepted_aligns = [a for a in proposed_aligns if isinstance(a, str) and a in ALIGNMENT_VALUES]

    await db.execute(
        """UPDATE sources SET
            political_orientation = ?,
            media_type = ?,
            reliability = ?,
            state_affiliated = ?,
            country_code = ?,
            classification_source = 'llm_approved',
            classified_at = CURRENT_TIMESTAMP
           WHERE id = ?""",
        (
            src["proposed_political_orientation"],
            src["proposed_media_type"],
            src["proposed_reliability"],
            1 if src.get("proposed_state_affiliated") else 0,
            src.get("proposed_country_code"),
            source_id,
        ),
    )
    await _replace_alignments(db, source_id, accepted_aligns)
    await _clear_proposed(db, source_id)
    await db.commit()

    # Best effort: a failing override must not undo the approval.
    # NOTE(review): runs after commit() — presumably apply_reputation_overrides
    # persists its own changes; confirm.
    try:
        await apply_reputation_overrides(db, source_id)
    except Exception as e:
        logger.warning("Reputation-Override fuer source_id=%s fehlgeschlagen: %s", source_id, e)

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    after_row = dict(await cursor.fetchone())
    after_alignments = sorted((await _load_alignments_for(db, [source_id])).get(source_id, []))
    after = {**after_row, "alignments": after_alignments}

    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source", resource_id=source_id,
        before=before, after=after,
    )
    return {"source_id": source_id, "status": "approved"}
@router.post("/{source_id}/classification/reject")
async def reject_classification(
    source_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Discard the proposal of one source without applying it.

    Clears the ``proposed_*`` columns; a ``classification_source`` of
    ``'llm_pending'`` is reset to ``'legacy'``. The change is committed and
    recorded in the audit log.

    Raises:
        HTTPException: 404 if the source does not exist.
    """
    select_sql = "SELECT * FROM sources WHERE id = ?"

    lookup = await db.execute(select_sql, (source_id,))
    row = await lookup.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")

    before = dict(row)
    was_pending = before.get("classification_source") == "llm_pending"

    await _clear_proposed(db, source_id)
    if was_pending:
        await db.execute(
            "UPDATE sources SET classification_source = 'legacy' WHERE id = ?",
            (source_id,),
        )
    await db.commit()

    # Re-read the row so the audit entry reflects the stored state.
    lookup = await db.execute(select_sql, (source_id,))
    after = dict(await lookup.fetchone())

    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source", resource_id=source_id,
        before=before, after=after,
    )
    return {"source_id": source_id, "status": "rejected"}
@router.post("/{source_id}/classification/reclassify")
async def reclassify_source(
    source_id: int,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Run the classifier for a single source and return its result.

    The call to ``classify_source`` is awaited inside the request, so the
    response is only sent once classification has finished.

    Raises:
        HTTPException: 404 if the source does not exist, 500 if
            ``classify_source`` raises.
    """
    cursor = await db.execute("SELECT id FROM sources WHERE id = ?", (source_id,))
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    try:
        result = await classify_source(db, source_id)
    except Exception as e:
        logger.error("Reclassify source_id=%s fehlgeschlagen: %s", source_id, e, exc_info=True)
        # Chain the original error so the traceback shows the root cause.
        raise HTTPException(status_code=500, detail=f"Klassifikation fehlgeschlagen: {e}") from e
    return result
async def _bulk_classify_background(limit: int, only_unclassified: bool):
    """Background task: run ``bulk_classify`` on a connection of its own.

    The connection is obtained via ``get_db()`` and closed again even when the
    classification raises.
    """
    conn = await get_db()
    try:
        await bulk_classify(
            conn,
            limit=limit,
            only_unclassified=only_unclassified,
        )
    finally:
        await conn.close()
@router.post("/classification/bulk-classify")
async def trigger_bulk_classify(
    background_tasks: BackgroundTasks,
    limit: int = 50,
    only_unclassified: bool = True,
    admin: dict = Depends(get_current_admin),
):
    """Schedule a bulk classification as a background task.

    Raises:
        HTTPException: 400 if ``limit`` is outside 1..500.
    """
    limit_in_range = 1 <= limit <= 500
    if not limit_in_range:
        raise HTTPException(status_code=400, detail="limit muss zwischen 1 und 500 liegen")

    background_tasks.add_task(_bulk_classify_background, limit, only_unclassified)

    response = {"status": "started"}
    response["limit"] = limit
    response["only_unclassified"] = only_unclassified
    return response
@router.post("/external-reputation/sync")
async def trigger_external_reputation_sync(
    background_tasks: BackgroundTasks,
    admin: dict = Depends(get_current_admin),
):
    """Schedule the external-reputation sync as a background task.

    The task obtains a connection via ``get_db()``, runs
    ``sync_external_reputation`` on it and always closes it afterwards.
    """
    async def _run_sync():
        conn = await get_db()
        try:
            await sync_external_reputation(conn)
        finally:
            await conn.close()

    background_tasks.add_task(_run_sync)
    return {"status": "started"}
@router.post("/classification/bulk-approve")
async def bulk_approve_classifications(
    request: Request,
    min_confidence: float = 0.85,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Approve every pending proposal at or above a confidence threshold.

    For each matching source the ``proposed_*`` values are copied into the real
    fields, ``classification_source`` becomes ``'llm_approved'``, the alignments
    are replaced by the proposed ones (unknown values are dropped) and the
    proposal columns are cleared. All sources are committed together; afterwards
    reputation overrides are applied per source (best effort) and one audit
    entry listing the approved IDs is written.

    Args:
        min_confidence: Minimum ``proposed_confidence`` (NULL counts as 0).

    Returns:
        ``{"approved": <count>, "ids": [<source ids>]}``
    """
    cursor = await db.execute(
        """SELECT id, proposed_political_orientation, proposed_media_type,
                  proposed_reliability, proposed_state_affiliated,
                  proposed_country_code, proposed_alignments_json
           FROM sources
           WHERE proposed_political_orientation IS NOT NULL
             AND COALESCE(proposed_confidence, 0) >= ?""",
        (min_confidence,),
    )
    rows = [dict(r) for r in await cursor.fetchall()]

    approved_ids: list[int] = []
    for src in rows:
        try:
            proposed_aligns = json.loads(src.get("proposed_alignments_json") or "[]")
        except (json.JSONDecodeError, TypeError):
            proposed_aligns = []
        # Valid JSON that is not a list (e.g. "null") would otherwise abort the
        # whole batch with a TypeError in the filter below.
        if not isinstance(proposed_aligns, list):
            proposed_aligns = []
        await db.execute(
            """UPDATE sources SET
                political_orientation = ?,
                media_type = ?,
                reliability = ?,
                state_affiliated = ?,
                country_code = ?,
                classification_source = 'llm_approved',
                classified_at = CURRENT_TIMESTAMP
               WHERE id = ?""",
            (
                src["proposed_political_orientation"],
                src["proposed_media_type"],
                src["proposed_reliability"],
                1 if src.get("proposed_state_affiliated") else 0,
                src.get("proposed_country_code"),
                src["id"],
            ),
        )
        await _replace_alignments(
            db,
            src["id"],
            [a for a in proposed_aligns if isinstance(a, str) and a in ALIGNMENT_VALUES],
        )
        await _clear_proposed(db, src["id"])
        approved_ids.append(src["id"])
    await db.commit()

    # Best effort per source: one failing override must not skip the remaining ones.
    for sid in approved_ids:
        try:
            await apply_reputation_overrides(db, sid)
        except Exception as e:
            logger.warning("Reputation-Override fuer source_id=%s fehlgeschlagen: %s", sid, e)

    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source", resource_id=None,
        after={"bulk_approved_ids": approved_ids, "min_confidence": min_confidence},
    )
    return {"approved": len(approved_ids), "ids": approved_ids}