"""Klassifiziert Quellen via Claude (Haiku) nach 4 Achsen + state_affiliated + country. Schreibt Vorschlaege in die proposed_*-Spalten von sources und setzt classification_source='llm_pending'. Approval erfolgt ueber separate Endpoints, die proposed_* in die echten Spalten kopieren. """ import asyncio import json import logging import re import aiosqlite from agents.claude_client import call_claude from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.source_classifier") POLITICAL_VALUES = { "links_extrem", "links", "mitte_links", "liberal", "mitte", "konservativ", "mitte_rechts", "rechts", "rechts_extrem", "na", } MEDIA_TYPE_VALUES = { "tageszeitung", "wochenzeitung", "magazin", "tv_sender", "radio", "oeffentlich_rechtlich", "nachrichtenagentur", "online_only", "blog", "telegram_kanal", "telegram_bot", "podcast", "social_media", "imageboard", "think_tank", "ngo", "behoerde", "staatsmedium", "fachmedium", "sonstige", } RELIABILITY_VALUES = {"sehr_hoch", "hoch", "gemischt", "niedrig", "sehr_niedrig", "na"} ALIGNMENT_VALUES = { "prorussisch", "proiranisch", "prowestlich", "proukrainisch", "prochinesisch", "projapanisch", "proisraelisch", "propalaestinensisch", "protuerkisch", "panarabisch", "neutral", "sonstige", } def _build_prompt(src: dict, sample_articles: list[dict]) -> str: sample_text = "" if sample_articles: lines = [] for i, art in enumerate(sample_articles[:5], 1): headline = (art.get("headline") or art.get("headline_de") or "").strip() if headline: lines.append(f"{i}. {headline[:200]}") if lines: sample_text = "\nLetzte Artikel/Headlines:\n" + "\n".join(lines) return f"""Du bist ein OSINT-Analyst und klassifizierst Nachrichten- und Medienquellen fuer ein Lagebild-Monitoring-System (DACH-Raum). QUELLE: Name: {src.get('name')} URL: {src.get('url') or '-'} Domain: {src.get('domain') or '-'} Quellentyp: {src.get('source_type')} Bisherige Kategorie: {src.get('category')} Sprache: {src.get('language') or 'unbekannt'} Bisherige Notiz (Freitext): {src.get('bias') or '-'}{sample_text} AUFGABE: Klassifiziere die Quelle nach folgenden Achsen. 1. political_orientation: - links_extrem (z.B. linksunten.indymedia) - links (klar links, z.B. junge Welt, taz) - mitte_links (linksliberal/sozialdemokratisch, z.B. SZ, Spiegel) - liberal (wirtschafts-/grünliberal, z.B. NZZ, Zeit) - mitte (politisch neutral, Agentur, z.B. dpa, Reuters, tagesschau) - konservativ (buergerlich-konservativ, z.B. FAZ, Welt) - mitte_rechts (rechts-buergerlich, z.B. Tichys Einblick, Achgut) - rechts (klar rechts, z.B. Junge Freiheit, EpochTimes) - rechts_extrem (z.B. Compact, PI-News) - na (nicht klassifizierbar: Behoerde, Fachmedium, Think Tank ohne klare politische Linie) 2. media_type (genau einer): tageszeitung, wochenzeitung, magazin, tv_sender, radio, oeffentlich_rechtlich, nachrichtenagentur, online_only, blog, telegram_kanal, telegram_bot, podcast, social_media, imageboard, think_tank, ngo, behoerde, staatsmedium, fachmedium, sonstige 3. reliability: - sehr_hoch (etablierte Qualitaet, Faktencheck: tagesschau, dpa, FAZ, Reuters) - hoch (serioes mit gelegentlichen Schwaechen: taz, Welt, BILD bei harten News) - gemischt (Mix Meinung/Einseitigkeit: Tichys Einblick, Achgut, Boulevard) - niedrig (haeufig irrefuehrend, schwache Quellenarbeit: Junge Freiheit, EpochTimes) - sehr_niedrig (bekannt fuer Desinformation/Verschwoerung: Compact, RT, Sputnik, PI-News) - na (nicht bewertbar) 4. alignments (Mehrfach, leeres Array wenn keine ausgepraegte Naehe): prorussisch, proiranisch, prowestlich, proukrainisch, prochinesisch, projapanisch, proisraelisch, propalaestinensisch, protuerkisch, panarabisch, neutral, sonstige 5. state_affiliated (true/false): true wenn vom Staat finanziert/kontrolliert (RT, Sputnik, CGTN, PressTV, Xinhua, TRT). Public Service Broadcaster wie ARD/ZDF/BBC sind NICHT state_affiliated. 6. country_code (ISO 3166-1 alpha-2): Heimatland (DE, AT, CH, RU, US, ...). null wenn unklar. 7. confidence (0.0-1.0): 0.85+ fuer bekannte Outlets, 0.5-0.85 fuer mittelbekannt, <0.5 fuer unsicher. 8. reasoning (1-2 Saetze): Kurze Begruendung der Hauptklassifikationen. WICHTIG: - Antworte AUSSCHLIESSLICH mit einem JSON-Objekt, kein Text drumherum. - Nutze ausschliesslich die genannten enum-Werte (snake_case). - Bei Unklarheit lieber `na` und niedrige confidence. JSON-Schema: {{ "political_orientation": "...", "media_type": "...", "reliability": "...", "alignments": ["..."], "state_affiliated": false, "country_code": "DE", "confidence": 0.9, "reasoning": "..." }}""" async def _load_sample_articles(db: aiosqlite.Connection, name: str, domain: str | None, limit: int = 5) -> list[dict]: """Laedt die letzten Headlines einer Quelle (per name oder Domain-Match).""" rows: list = [] if name: cursor = await db.execute( "SELECT headline, headline_de FROM articles WHERE source = ? ORDER BY collected_at DESC LIMIT ?", (name, limit), ) rows = await cursor.fetchall() if not rows and domain: cursor = await db.execute( "SELECT headline, headline_de FROM articles WHERE source_url LIKE ? ORDER BY collected_at DESC LIMIT ?", (f"%{domain}%", limit), ) rows = await cursor.fetchall() return [dict(r) for r in rows] def _validate(parsed: dict) -> dict: """Validiert + normalisiert eine LLM-Antwort gegen die Enums.""" pol = parsed.get("political_orientation", "na") if pol not in POLITICAL_VALUES: pol = "na" mt = parsed.get("media_type", "sonstige") if mt not in MEDIA_TYPE_VALUES: mt = "sonstige" rel = parsed.get("reliability", "na") if rel not in RELIABILITY_VALUES: rel = "na" aligns_raw = parsed.get("alignments") or [] if not isinstance(aligns_raw, list): aligns_raw = [] aligns = sorted({a for a in aligns_raw if isinstance(a, str) and a in ALIGNMENT_VALUES}) sa = bool(parsed.get("state_affiliated", False)) cc = parsed.get("country_code") if isinstance(cc, str) and len(cc) == 2 and cc.isalpha(): cc = cc.upper() else: cc = None try: confidence = float(parsed.get("confidence", 0.5)) confidence = max(0.0, min(1.0, confidence)) except (TypeError, ValueError): confidence = 0.5 reasoning = str(parsed.get("reasoning", ""))[:1000] return { "political_orientation": pol, "media_type": mt, "reliability": rel, "alignments": aligns, "state_affiliated": sa, "country_code": cc, "confidence": confidence, "reasoning": reasoning, } async def classify_source( db: aiosqlite.Connection, source_id: int, sample_limit: int = 5, model: str = CLAUDE_MODEL_FAST, ) -> dict: """Klassifiziert eine einzelne Quelle und schreibt die Vorschlaege in proposed_*-Spalten.""" cursor = await db.execute( "SELECT id, name, url, domain, source_type, category, language, bias, " "classification_source FROM sources WHERE id = ?", (source_id,), ) row = await cursor.fetchone() if not row: raise ValueError(f"Quelle {source_id} nicht gefunden") src = dict(row) sample = await _load_sample_articles(db, src["name"], src.get("domain"), sample_limit) prompt = _build_prompt(src, sample) response, usage = await call_claude(prompt, tools=None, model=model) json_match = re.search(r"\{.*\}", response, re.DOTALL) if not json_match: raise ValueError(f"Keine JSON-Antwort von Claude fuer source_id={source_id}: {response[:200]}") parsed = json.loads(json_match.group(0)) result = _validate(parsed) # Nur classification_source auf 'llm_pending' setzen, wenn nicht bereits manuell/approved new_src = "CASE WHEN classification_source IN ('manual','llm_approved') THEN classification_source ELSE 'llm_pending' END" await db.execute( f"""UPDATE sources SET proposed_political_orientation = ?, proposed_media_type = ?, proposed_reliability = ?, proposed_state_affiliated = ?, proposed_country_code = ?, proposed_alignments_json = ?, proposed_confidence = ?, proposed_reasoning = ?, proposed_at = CURRENT_TIMESTAMP, classification_source = {new_src} WHERE id = ?""", ( result["political_orientation"], result["media_type"], result["reliability"], 1 if result["state_affiliated"] else 0, result["country_code"], json.dumps(result["alignments"], ensure_ascii=False), result["confidence"], result["reasoning"], source_id, ), ) await db.commit() logger.info( "Klassifiziert source_id=%s '%s' -> %s/%s/%s conf=%.2f ($%.4f)", source_id, src["name"], result["political_orientation"], result["media_type"], result["reliability"], result["confidence"], usage.cost_usd, ) result["source_id"] = source_id result["usage"] = { "cost_usd": usage.cost_usd, "input_tokens": usage.input_tokens, "output_tokens": usage.output_tokens, } return result async def bulk_classify( db: aiosqlite.Connection, limit: int = 50, only_unclassified: bool = True, model: str = CLAUDE_MODEL_FAST, ) -> dict: """Klassifiziert noch unklassifizierte Quellen (sequenziell). Args: limit: Maximale Anzahl Quellen pro Aufruf only_unclassified: Wenn True, nur classification_source='legacy'. Wenn False, auch 'llm_pending' neu klassifizieren. """ if only_unclassified: where = "classification_source = 'legacy'" else: where = "classification_source IN ('legacy', 'llm_pending')" cursor = await db.execute( f"SELECT id FROM sources WHERE {where} AND status = 'active' " f"AND source_type != 'excluded' ORDER BY id LIMIT ?", (limit,), ) ids = [row["id"] for row in await cursor.fetchall()] total_cost = 0.0 success = 0 errors: list[dict] = [] for sid in ids: try: r = await classify_source(db, sid, model=model) total_cost += r["usage"]["cost_usd"] success += 1 except asyncio.CancelledError: raise except Exception as e: logger.error("Klassifikation source_id=%s fehlgeschlagen: %s", sid, e, exc_info=True) errors.append({"source_id": sid, "error": str(e)}) logger.info( "Bulk-Klassifikation fertig: %d/%d erfolgreich, $%.4f Kosten, %d Fehler", success, len(ids), total_cost, len(errors), ) return { "processed": len(ids), "success": success, "errors": errors, "total_cost_usd": total_cost, }