diff --git a/src/agents/entity_extractor.py b/src/agents/entity_extractor.py index 41aedb2..8b4a2c2 100644 --- a/src/agents/entity_extractor.py +++ b/src/agents/entity_extractor.py @@ -1,746 +1,937 @@ -"""Netzwerkanalyse: Entity-Extraktion (Haiku) + Beziehungsanalyse (Opus).""" -import asyncio -import hashlib -import json -import logging -import re -from datetime import datetime -from typing import Optional - -from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator -from config import CLAUDE_MODEL_FAST, TIMEZONE - -logger = logging.getLogger("osint.entity_extractor") - -# --------------------------------------------------------------------------- -# Prompts -# --------------------------------------------------------------------------- - -ENTITY_EXTRACTION_PROMPT = """Du bist ein OSINT-Analyst für ein Lagemonitoring-System. -AUFGABE: Extrahiere ALLE relevanten Entitäten aus den folgenden Nachrichtenartikeln. - -ARTIKEL: -{articles_text} - -REGELN: -- Extrahiere JEDE genannte Person, Organisation, Ort, Ereignis und militärische Einheit -- Normalisiere Namen: "Wladimir Putin", "Putin", "V. Putin" -> eine Entität -- Aliase erfassen: Alle Namensvarianten einer Entität als aliases[] -- mention_count: Wie oft wird die Entität insgesamt in allen Artikeln erwähnt? -- Beschreibung: Kurze Einordnung, wer/was die Entität ist (1 Satz) -- KEINE Duplikate: Gleiche Entitäten zusammenfassen - -ENTITY-TYPEN: -- "person": Individuelle Personen (Politiker, Militärs, Journalisten etc.) -- "organisation": Organisationen, Parteien, Behörden, Unternehmen, NGOs -- "location": Länder, Städte, Regionen, Gebiete -- "event": Konkrete Ereignisse (Wahlen, Anschläge, Konferenzen etc.) 
-- "military": Militärische Einheiten, Waffensysteme, Operationen - -AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: -{{ - "entities": [ - {{ - "name": "Vollständiger Name", - "name_normalized": "vollständiger name", - "type": "person|organisation|location|event|military", - "description": "Kurze Einordnung (1 Satz)", - "aliases": ["Alias1", "Alias2"], - "mention_count": 5 - }} - ] -}}""" - - -RELATIONSHIP_ANALYSIS_PROMPT = """Du bist ein Senior OSINT-Analyst für ein Lagemonitoring-System. -AUFGABE: Analysiere die Beziehungen zwischen den extrahierten Entitäten und korrigiere ggf. Fehler. - -EXTRAHIERTE ENTITÄTEN: -{entities_json} - -QUELLMATERIAL: -{source_texts} - -TEIL 1 — KORREKTUREN (optional): -Prüfe die extrahierten Entitäten auf Fehler: -- "name_fix": Name ist falsch geschrieben oder unvollständig -- "merge": Zwei Entitäten sind dieselbe -> zusammenführen -- "add": Wichtige Entität fehlt komplett - -TEIL 2 — BEZIEHUNGEN: -Identifiziere ALLE relevanten Beziehungen zwischen den Entitäten. 
- -BEZIEHUNGS-KATEGORIEN: -- "alliance": Bündnis, Kooperation, Unterstützung, Partnerschaft -- "conflict": Konflikt, Krieg, Feindschaft, Sanktionen, Opposition -- "diplomacy": Diplomatische Beziehungen, Verhandlungen, Abkommen -- "economic": Wirtschaftsbeziehungen, Handel, Investitionen -- "legal": Rechtliche Beziehungen, Klagen, Verurteilungen -- "neutral": Sonstige Beziehung, Erwähnung, Verbindung - -REGELN: -- weight: 1 (schwach/indirekt) bis 5 (stark/direkt) -- evidence[]: Stichpunkte aus dem Quellmaterial die die Beziehung belegen -- status: "active" (aktuell), "historical" (vergangen), "emerging" (sich entwickelnd) -- source und target: Exakt die Namen aus der Entitäten-Liste verwenden - -AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: -{{ - "corrections": [ - {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}}, - {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}}, - {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}} - ], - "relations": [ - {{ - "source": "Entität A", - "target": "Entität B", - "category": "alliance|conflict|diplomacy|economic|legal|neutral", - "label": "Kurzes Label (2-4 Wörter)", - "description": "Beschreibung (1-2 Sätze)", - "weight": 3, - "status": "active|historical|emerging", - "evidence": ["Beleg 1", "Beleg 2"] - }} - ] -}}""" - - -# --------------------------------------------------------------------------- -# Hilfsfunktionen -# --------------------------------------------------------------------------- - -async def _broadcast(ws_manager, msg_type: str, data: dict): - """Sendet eine WebSocket-Nachricht, falls ws_manager vorhanden.""" - if ws_manager: - try: - await ws_manager.broadcast({"type": msg_type, **data}) - except Exception: - pass - - -def _parse_json_response(text: str) -> Optional[dict]: - """Parst JSON aus Claude-Antwort. 
Handhabt Markdown-Fences.""" - if not text: - return None - # Direkt - try: - return json.loads(text) - except json.JSONDecodeError: - pass - # Markdown-Fences - fence_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', text, re.DOTALL) - if fence_match: - try: - return json.loads(fence_match.group(1)) - except json.JSONDecodeError: - pass - # Erstes JSON-Objekt - obj_match = re.search(r'\{.*\}', text, re.DOTALL) - if obj_match: - try: - return json.loads(obj_match.group()) - except json.JSONDecodeError: - pass - logger.warning("JSON-Parse fehlgeschlagen") - return None - - -async def _check_analysis_exists(db, analysis_id: int) -> bool: - """Prüft ob die Analyse noch existiert.""" - cursor = await db.execute( - "SELECT id FROM network_analyses WHERE id = ?", (analysis_id,) - ) - return await cursor.fetchone() is not None - - -def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) -> str: - """SHA256-Hash über sortierte IDs und Timestamps.""" - parts = [] - for aid, ats in sorted(zip(article_ids, article_ts)): - parts.append(f"a:{aid}:{ats}") - for fid, fts in sorted(zip(factcheck_ids, factcheck_ts)): - parts.append(f"f:{fid}:{fts}") - return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest() - - -# --------------------------------------------------------------------------- -# Phase 1: Entity-Extraktion (Haiku) -# --------------------------------------------------------------------------- - -async def _phase1_extract_entities( - db, analysis_id: int, tenant_id: int, - articles: list[dict], factchecks: list[dict], - usage_acc: UsageAccumulator, ws_manager=None, -) -> list[dict]: - """Extrahiert Entitäten aus Artikeln via Haiku in Batches.""" - logger.info(f"Phase 1: {len(articles)} Artikel, {len(factchecks)} Faktenchecks") - - all_texts = [] - for art in articles: - headline = art.get("headline_de") or art.get("headline") or "" - content = art.get("content_de") or art.get("content_original") or "" - source = art.get("source") or "" 
- # Inhalt kürzen falls sehr lang - if len(content) > 2000: - content = content[:2000] + "..." - all_texts.append(f"[{source}] {headline}\n{content}") - - for fc in factchecks: - claim = fc.get("claim") or "" - evidence = fc.get("evidence") or "" - status = fc.get("status") or "" - all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}") - - if not all_texts: - logger.warning(f"Analyse {analysis_id}: Keine Texte vorhanden") - return [] - - # Batching - batch_size = 30 - batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)] - logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches") - - entity_map: dict[tuple[str, str], dict] = {} - - for batch_idx, batch in enumerate(batches): - articles_text = "\n\n---\n\n".join(batch) - prompt = ENTITY_EXTRACTION_PROMPT.format(articles_text=articles_text) - - try: - result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) - usage_acc.add(usage) - except Exception as e: - logger.error(f"Haiku Batch {batch_idx + 1}/{len(batches)} fehlgeschlagen: {e}") - continue - - parsed = _parse_json_response(result_text) - if not parsed or "entities" not in parsed: - logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON") - continue - - entities = parsed["entities"] - if not isinstance(entities, list): - continue - - for ent in entities: - if not isinstance(ent, dict): - continue - name = (ent.get("name") or "").strip() - if not name: - continue - - name_normalized = (ent.get("name_normalized") or name.lower()).strip().lower() - entity_type = (ent.get("type") or "organisation").lower().strip() - valid_types = {"person", "organisation", "location", "event", "military"} - if entity_type not in valid_types: - entity_type = "organisation" - - key = (name_normalized, entity_type) - - if key in entity_map: - existing = entity_map[key] - aliases = set(existing.get("aliases", [])) - for alias in ent.get("aliases", []): - if alias and alias.strip(): - 
aliases.add(alias.strip()) - if name != existing["name"]: - aliases.add(name) - existing["aliases"] = list(aliases) - existing["mention_count"] = existing.get("mention_count", 1) + ent.get("mention_count", 1) - new_desc = ent.get("description", "") - if len(new_desc) > len(existing.get("description", "")): - existing["description"] = new_desc - else: - entity_map[key] = { - "name": name, - "name_normalized": name_normalized, - "type": entity_type, - "description": ent.get("description", ""), - "aliases": [a.strip() for a in ent.get("aliases", []) if a and a.strip()], - "mention_count": ent.get("mention_count", 1), - } - - logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {len(entity_map)} Entitäten gesamt") - - await _broadcast(ws_manager, "network_status", { - "analysis_id": analysis_id, - "phase": "entity_extraction", - "progress": int((batch_idx + 1) / len(batches) * 100), - }) - - # In DB speichern - all_entities = list(entity_map.values()) - - for ent in all_entities: - try: - cursor = await db.execute( - """INSERT OR IGNORE INTO network_entities - (network_analysis_id, name, name_normalized, entity_type, - description, aliases, mention_count, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - ( - analysis_id, ent["name"], ent["name_normalized"], ent["type"], - ent.get("description", ""), - json.dumps(ent.get("aliases", []), ensure_ascii=False), - ent.get("mention_count", 1), - tenant_id, - ), - ) - ent["db_id"] = cursor.lastrowid - except Exception as e: - logger.warning(f"Entity speichern fehlgeschlagen '{ent['name']}': {e}") - - await db.commit() - logger.info(f"Phase 1 abgeschlossen: {len(all_entities)} Entitäten gespeichert") - return all_entities - - -# --------------------------------------------------------------------------- -# Phase 2: Beziehungsanalyse (Opus) -# --------------------------------------------------------------------------- - -async def _phase2_analyze_relationships( - db, analysis_id: int, tenant_id: int, - entities: list[dict], 
articles: list[dict], factchecks: list[dict], - usage_acc: UsageAccumulator, ws_manager=None, -) -> list[dict]: - """Analysiert Beziehungen via Opus und wendet Korrekturen an.""" - if not entities: - return [] - - logger.info(f"Phase 2: {len(entities)} Entitäten, Beziehungsanalyse") - - await _broadcast(ws_manager, "network_status", { - "analysis_id": analysis_id, - "phase": "relationship_extraction", - "progress": 0, - }) - - # Entitäten für Prompt - entities_for_prompt = [ - {"name": e["name"], "type": e["type"], - "description": e.get("description", ""), "aliases": e.get("aliases", [])} - for e in entities - ] - entities_json = json.dumps(entities_for_prompt, ensure_ascii=False, indent=2) - - # Quelltexte - source_parts = [] - for art in articles: - headline = art.get("headline_de") or art.get("headline") or "" - content = art.get("content_de") or art.get("content_original") or "" - source = art.get("source") or "" - source_parts.append(f"[{source}] {headline}\n{content}") - - for fc in factchecks: - claim = fc.get("claim") or "" - evidence = fc.get("evidence") or "" - source_parts.append(f"[Faktencheck] {claim}\n{evidence}") - - source_texts = "\n\n---\n\n".join(source_parts) - - # Kürzen falls zu lang - if len(source_texts) > 150_000: - logger.info(f"Quelltexte zu lang ({len(source_texts)} Zeichen), kürze") - short_parts = [] - for art in articles: - headline = art.get("headline_de") or art.get("headline") or "" - content = art.get("content_de") or art.get("content_original") or "" - short = content[:500] + "..." 
if len(content) > 500 else content - short_parts.append(f"[{art.get('source', '')}] {headline}: {short}") - for fc in factchecks: - short_parts.append(f"[Faktencheck] {fc.get('claim', '')} (Status: {fc.get('status', '')})") - source_texts = "\n\n".join(short_parts) - - prompt = RELATIONSHIP_ANALYSIS_PROMPT.format( - entities_json=entities_json, source_texts=source_texts, - ) - - try: - result_text, usage = await call_claude(prompt, tools=None, model=None) - usage_acc.add(usage) - except Exception as e: - logger.error(f"Opus Beziehungsanalyse fehlgeschlagen: {e}") - return [] - - parsed = _parse_json_response(result_text) - if not parsed: - logger.warning("Kein gültiges JSON von Opus") - return [] - - # Korrekturen anwenden - corrections = parsed.get("corrections", []) - if corrections and isinstance(corrections, list): - await _broadcast(ws_manager, "network_status", { - "analysis_id": analysis_id, - "phase": "correction", - "progress": 0, - }) - await _apply_corrections(db, analysis_id, tenant_id, entities, corrections) - - # Beziehungen speichern - relations = parsed.get("relations", []) - if not isinstance(relations, list): - return [] - - name_to_id = _build_entity_name_map(entities) - valid_categories = {"alliance", "conflict", "diplomacy", "economic", "legal", "neutral"} - saved_relations = [] - - for rel in relations: - if not isinstance(rel, dict): - continue - - source_name = (rel.get("source") or "").strip() - target_name = (rel.get("target") or "").strip() - if not source_name or not target_name: - continue - - source_id = name_to_id.get(source_name.lower()) - target_id = name_to_id.get(target_name.lower()) - if not source_id or not target_id or source_id == target_id: - continue - - category = (rel.get("category") or "neutral").lower().strip() - if category not in valid_categories: - category = "neutral" - - weight = rel.get("weight", 3) - try: - weight = max(1, min(5, int(weight))) - except (ValueError, TypeError): - weight = 3 - - status = 
(rel.get("status") or "active").lower().strip() - if status not in {"active", "historical", "emerging"}: - status = "active" - - evidence = rel.get("evidence", []) - if not isinstance(evidence, list): - evidence = [] - - try: - cursor = await db.execute( - """INSERT INTO network_relations - (network_analysis_id, source_entity_id, target_entity_id, - category, label, description, weight, status, evidence, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - ( - analysis_id, source_id, target_id, category, - rel.get("label", ""), rel.get("description", ""), - weight, status, - json.dumps(evidence, ensure_ascii=False), - tenant_id, - ), - ) - saved_relations.append({"id": cursor.lastrowid}) - except Exception as e: - logger.warning(f"Beziehung speichern fehlgeschlagen: {e}") - - await db.commit() - logger.info(f"Phase 2 abgeschlossen: {len(corrections)} Korrekturen, {len(saved_relations)} Beziehungen") - - await _broadcast(ws_manager, "network_status", { - "analysis_id": analysis_id, - "phase": "relationship_extraction", - "progress": 100, - }) - - return saved_relations - - -def _build_entity_name_map(entities: list[dict]) -> dict[str, int]: - """Mapping: normalisierter Name/Alias -> DB-ID.""" - name_to_id: dict[str, int] = {} - for ent in entities: - db_id = ent.get("db_id") - if not db_id: - continue - name_to_id[ent["name"].lower()] = db_id - name_to_id[ent["name_normalized"]] = db_id - for alias in ent.get("aliases", []): - if alias and alias.strip(): - name_to_id[alias.strip().lower()] = db_id - return name_to_id - - -async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections): - """Wendet Opus-Korrekturen auf Entitäten an.""" - for corr in corrections: - if not isinstance(corr, dict): - continue - corr_type = corr.get("type", "") - - try: - if corr_type == "name_fix": - entity_name = (corr.get("entity_name") or "").strip() - corrected_name = (corr.get("corrected_name") or "").strip() - if not entity_name or not corrected_name: - continue 
- - for ent in entities: - if ent["name"].lower() == entity_name.lower() or \ - ent["name_normalized"] == entity_name.lower(): - old_name = ent["name"] - ent["name"] = corrected_name - ent["name_normalized"] = corrected_name.lower().strip() - ent.setdefault("aliases", []) - if old_name not in ent["aliases"]: - ent["aliases"].append(old_name) - - if ent.get("db_id"): - await db.execute( - """UPDATE network_entities - SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1 - WHERE id = ?""", - (corrected_name, corrected_name.lower().strip(), - json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]), - ) - break - - elif corr_type == "merge": - keep_name = (corr.get("entity_name") or "").strip() - merge_name = (corr.get("merge_with") or "").strip() - if not keep_name or not merge_name: - continue - - keep_ent = merge_ent = None - for ent in entities: - nl = ent["name"].lower() - nn = ent["name_normalized"] - if nl == keep_name.lower() or nn == keep_name.lower(): - keep_ent = ent - elif nl == merge_name.lower() or nn == merge_name.lower(): - merge_ent = ent - - if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"): - aliases = set(keep_ent.get("aliases", [])) - aliases.add(merge_ent["name"]) - for a in merge_ent.get("aliases", []): - if a and a.strip(): - aliases.add(a.strip()) - keep_ent["aliases"] = list(aliases) - keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0) - - # Mentions übertragen - await db.execute( - "UPDATE network_entity_mentions SET entity_id = ? 
WHERE entity_id = ?", - (keep_ent["db_id"], merge_ent["db_id"]), - ) - await db.execute( - """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1 - WHERE id = ?""", - (json.dumps(keep_ent["aliases"], ensure_ascii=False), - keep_ent["mention_count"], keep_ent["db_id"]), - ) - await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],)) - entities.remove(merge_ent) - - elif corr_type == "add": - entity_name = (corr.get("entity_name") or "").strip() - entity_type = (corr.get("entity_type") or "organisation").lower().strip() - description = corr.get("description", "") - if not entity_name: - continue - - valid_types = {"person", "organisation", "location", "event", "military"} - if entity_type not in valid_types: - entity_type = "organisation" - - name_norm = entity_name.lower().strip() - if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities): - continue - - cursor = await db.execute( - """INSERT OR IGNORE INTO network_entities - (network_analysis_id, name, name_normalized, entity_type, - description, aliases, mention_count, corrected_by_opus, tenant_id) - VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""", - (analysis_id, entity_name, name_norm, entity_type, description, tenant_id), - ) - if cursor.lastrowid: - entities.append({ - "name": entity_name, "name_normalized": name_norm, - "type": entity_type, "description": description, - "aliases": [], "mention_count": 1, "db_id": cursor.lastrowid, - }) - - except Exception as e: - logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}") - - await db.commit() - - -# --------------------------------------------------------------------------- -# Phase 3: Finalisierung -# --------------------------------------------------------------------------- - -async def _phase3_finalize( - db, analysis_id, tenant_id, entity_count, relation_count, - article_ids, factcheck_ids, article_ts, factcheck_ts, - usage_acc, ws_manager=None, -): - """Finalisiert: 
Zähler, Hash, Log, Status.""" - data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) - now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S") - - await db.execute( - """UPDATE network_analyses - SET entity_count = ?, relation_count = ?, status = 'ready', - last_generated_at = ?, data_hash = ? - WHERE id = ?""", - (entity_count, relation_count, now, data_hash, analysis_id), - ) - - await db.execute( - """INSERT INTO network_generation_log - (network_analysis_id, completed_at, status, input_tokens, output_tokens, - cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls, - entity_count, relation_count, tenant_id) - VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens, - usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens, - usage_acc.total_cost_usd, usage_acc.call_count, - entity_count, relation_count, tenant_id), - ) - - await db.commit() - - logger.info(f"Analyse {analysis_id} finalisiert: {entity_count} Entitäten, " - f"{relation_count} Beziehungen, ${usage_acc.total_cost_usd:.4f}") - - await _broadcast(ws_manager, "network_complete", { - "analysis_id": analysis_id, - "entity_count": entity_count, - "relation_count": relation_count, - "cost_usd": round(usage_acc.total_cost_usd, 4), - }) - - -# --------------------------------------------------------------------------- -# Hauptfunktion -# --------------------------------------------------------------------------- - -async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None): - """Hauptfunktion: Entity-Extraktion + Beziehungsanalyse. 
- - Phase 1: Haiku extrahiert Entitäten aus Artikeln - Phase 2: Opus analysiert Beziehungen und korrigiert Haiku-Fehler - Phase 3: Finalisierung (Zähler, Hash, Log) - """ - from database import get_db - - db = await get_db() - usage_acc = UsageAccumulator() - - try: - if not await _check_analysis_exists(db, analysis_id): - logger.warning(f"Analyse {analysis_id} existiert nicht") - return - - await db.execute( - "UPDATE network_analyses SET status = 'generating' WHERE id = ?", - (analysis_id,), - ) - await db.commit() - - # Incident-IDs laden - cursor = await db.execute( - "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?", - (analysis_id,), - ) - incident_ids = [row["incident_id"] for row in await cursor.fetchall()] - - if not incident_ids: - logger.warning(f"Analyse {analysis_id}: Keine Lagen verknüpft") - await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,)) - await db.commit() - await _broadcast(ws_manager, "network_error", { - "analysis_id": analysis_id, "error": "Keine Lagen verknüpft", - }) - return - - # Artikel laden - placeholders = ",".join("?" 
* len(incident_ids)) - cursor = await db.execute( - f"""SELECT id, incident_id, headline, headline_de, source, source_url, - content_original, content_de, collected_at - FROM articles WHERE incident_id IN ({placeholders})""", - incident_ids, - ) - article_rows = await cursor.fetchall() - articles = [] - article_ids = [] - article_ts = [] - for r in article_rows: - articles.append({ - "id": r["id"], "incident_id": r["incident_id"], - "headline": r["headline"], "headline_de": r["headline_de"], - "source": r["source"], "source_url": r["source_url"], - "content_original": r["content_original"], "content_de": r["content_de"], - }) - article_ids.append(r["id"]) - article_ts.append(r["collected_at"] or "") - - # Faktenchecks laden - cursor = await db.execute( - f"""SELECT id, incident_id, claim, status, evidence, checked_at - FROM fact_checks WHERE incident_id IN ({placeholders})""", - incident_ids, - ) - fc_rows = await cursor.fetchall() - factchecks = [] - factcheck_ids = [] - factcheck_ts = [] - for r in fc_rows: - factchecks.append({ - "id": r["id"], "incident_id": r["incident_id"], - "claim": r["claim"], "status": r["status"], "evidence": r["evidence"], - }) - factcheck_ids.append(r["id"]) - factcheck_ts.append(r["checked_at"] or "") - - logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, " - f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen") - - # Phase 1 - if not await _check_analysis_exists(db, analysis_id): - return - - entities = await _phase1_extract_entities( - db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager, - ) - - # Phase 2 - if not await _check_analysis_exists(db, analysis_id): - return - - relations = await _phase2_analyze_relationships( - db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager, - ) - - # Phase 3 - if not await _check_analysis_exists(db, analysis_id): - return - - await _phase3_finalize( - db, analysis_id, tenant_id, - entity_count=len(entities), 
relation_count=len(relations), - article_ids=article_ids, factcheck_ids=factcheck_ids, - article_ts=article_ts, factcheck_ts=factcheck_ts, - usage_acc=usage_acc, ws_manager=ws_manager, - ) - - except Exception as e: - logger.error(f"Entity-Extraktion fehlgeschlagen (Analyse {analysis_id}): {e}", exc_info=True) - try: - await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,)) - await db.commit() - except Exception: - pass - await _broadcast(ws_manager, "network_error", { - "analysis_id": analysis_id, "error": str(e), - }) - finally: - await db.close() +"""Netzwerkanalyse: Entity-Extraktion (Haiku) + Beziehungsanalyse (Batched).""" +import asyncio +import hashlib +import json +import logging +import re +from datetime import datetime +from typing import Optional + +from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator +from config import CLAUDE_MODEL_FAST, TIMEZONE + +logger = logging.getLogger("osint.entity_extractor") + +# --------------------------------------------------------------------------- +# Prompts +# --------------------------------------------------------------------------- + +ENTITY_EXTRACTION_PROMPT = """Du bist ein OSINT-Analyst für ein Lagemonitoring-System. +AUFGABE: Extrahiere ALLE relevanten Entitäten aus den folgenden Nachrichtenartikeln. + +ARTIKEL: +{articles_text} + +REGELN: +- Extrahiere JEDE genannte Person, Organisation, Ort, Ereignis und militärische Einheit +- Normalisiere Namen: "Wladimir Putin", "Putin", "V. Putin" -> eine Entität +- Aliase erfassen: Alle Namensvarianten einer Entität als aliases[] +- mention_count: Wie oft wird die Entität insgesamt in allen Artikeln erwähnt? +- Beschreibung: Kurze Einordnung, wer/was die Entität ist (1 Satz) +- KEINE Duplikate: Gleiche Entitäten zusammenfassen + +ENTITY-TYPEN: +- "person": Individuelle Personen (Politiker, Militärs, Journalisten etc.) 
+- "organisation": Organisationen, Parteien, Behörden, Unternehmen, NGOs +- "location": Länder, Städte, Regionen, Gebiete +- "event": Konkrete Ereignisse (Wahlen, Anschläge, Konferenzen etc.) +- "military": Militärische Einheiten, Waffensysteme, Operationen + +AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: +{{ + "entities": [ + {{ + "name": "Vollständiger Name", + "name_normalized": "vollständiger name", + "type": "person|organisation|location|event|military", + "description": "Kurze Einordnung (1 Satz)", + "aliases": ["Alias1", "Alias2"], + "mention_count": 5 + }} + ] +}}""" + + +RELATIONSHIP_BATCH_PROMPT = """Du bist ein Senior OSINT-Analyst für ein Lagemonitoring-System. +AUFGABE: Analysiere die Beziehungen zwischen den Entitäten basierend auf den Artikeln. + +BEKANNTE ENTITÄTEN (aus dem Gesamtdatensatz): +{entities_json} + +ARTIKEL: +{articles_text} + +AUFGABE: +Identifiziere ALLE Beziehungen zwischen den oben genannten Entitäten, die sich aus den Artikeln ergeben. +- Nur Beziehungen nennen, die im Artikeltext belegt sind +- source und target: Exakt die Namen aus der Entitäten-Liste verwenden +- Wenn eine Entität im Artikel vorkommt aber nicht in der Liste, verwende den Namen wie er in der Liste steht + +BEZIEHUNGS-KATEGORIEN: +- "alliance": Bündnis, Kooperation, Unterstützung, Partnerschaft +- "conflict": Konflikt, Krieg, Feindschaft, Sanktionen, Opposition +- "diplomacy": Diplomatische Beziehungen, Verhandlungen, Abkommen +- "economic": Wirtschaftsbeziehungen, Handel, Investitionen +- "legal": Rechtliche Beziehungen, Klagen, Verurteilungen +- "neutral": Sonstige Beziehung, Erwähnung, Verbindung + +REGELN: +- weight: 1 (schwach/indirekt) bis 5 (stark/direkt) +- evidence[]: 1-2 kurze Stichpunkte aus dem Artikeltext als Beleg +- status: "active" (aktuell), "historical" (vergangen), "emerging" (sich entwickelnd) + +AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: +{{ + "relations": [ + {{ + "source": "Entität A", + "target": "Entität B", + 
"category": "alliance|conflict|diplomacy|economic|legal|neutral", + "label": "Kurzes Label (2-4 Wörter)", + "description": "Beschreibung (1-2 Sätze)", + "weight": 3, + "status": "active|historical|emerging", + "evidence": ["Beleg 1", "Beleg 2"] + }} + ] +}}""" + + +CORRECTION_PROMPT = """Du bist ein Senior OSINT-Analyst. Prüfe die extrahierten Entitäten auf Fehler. + +ENTITÄTEN: +{entities_json} + +AUFGABE: Prüfe auf folgende Fehler und gib NUR Korrekturen zurück: +1. "name_fix": Name ist falsch geschrieben oder unvollständig +2. "merge": Zwei Entitäten bezeichnen dasselbe -> zusammenführen (z.B. "USA" als Organisation und "USA" als Location) +3. "add": Wichtige Entität fehlt komplett (nur bei offensichtlichen Lücken) + +WICHTIG: Nur echte Fehler korrigieren. Im Zweifel KEINE Korrektur. + +AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: +{{ + "corrections": [ + {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}}, + {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}}, + {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}} + ] +}}""" + + +# --------------------------------------------------------------------------- +# Hilfsfunktionen +# --------------------------------------------------------------------------- + +async def _broadcast(ws_manager, msg_type: str, data: dict): + """Sendet eine WebSocket-Nachricht, falls ws_manager vorhanden.""" + if ws_manager: + try: + await ws_manager.broadcast({"type": msg_type, **data}) + except Exception: + pass + + +def _parse_json_response(text: str) -> Optional[dict]: + """Parst JSON aus Claude-Antwort. 
async def _check_analysis_exists(db, analysis_id: int) -> bool:
    """Return True while a ``network_analyses`` row with this id exists.

    The pipeline calls this before each phase so that it can stop early
    when the row has disappeared in the meantime.
    """
    cursor = await db.execute(
        "SELECT id FROM network_analyses WHERE id = ?", (analysis_id,)
    )
    return await cursor.fetchone() is not None


def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) -> str:
    """Return a SHA-256 hex digest over sorted (id, timestamp) pairs.

    Articles contribute ``a:<id>:<ts>`` parts, fact checks ``f:<id>:<ts>``
    parts; each group is sorted, then all parts are joined with ``|``.
    Ids and timestamps are paired positionally with ``zip``, so surplus
    items of the longer sequence are ignored.
    """
    parts = [f"a:{aid}:{ats}" for aid, ats in sorted(zip(article_ids, article_ts))]
    parts += [f"f:{fid}:{fts}" for fid, fts in sorted(zip(factcheck_ids, factcheck_ts))]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()


# ---------------------------------------------------------------------------
# Phase 1: entity extraction (Haiku)
# ---------------------------------------------------------------------------

_VALID_ENTITY_TYPES = frozenset({"person", "organisation", "location", "event", "military"})


def _clean_alias_list(raw) -> list[str]:
    """Return the stripped, non-empty, de-duplicated string aliases in *raw*.

    Anything that is not a list (e.g. ``None`` or a bare string coming back
    from the model) yields an empty list instead of raising.
    """
    if not isinstance(raw, list):
        return []
    cleaned = (a.strip() for a in raw if isinstance(a, str))
    return list(dict.fromkeys(a for a in cleaned if a))


def _coerce_mention_count(raw) -> int:
    """Return *raw* as an int >= 1, falling back to 1 for unusable values."""
    try:
        count = int(raw)
    except (TypeError, ValueError, OverflowError):
        return 1
    return max(count, 1)


def _merge_extracted_entity(entity_map: dict[tuple[str, str], dict], ent) -> None:
    """Validate one model-produced entity and merge it into *entity_map*.

    Entities are keyed by ``(name_normalized, type)``. On a key collision the
    aliases are unioned, mention counts are summed and the longer description
    wins. Malformed fields are coerced to safe defaults because the input is
    unvalidated model output; entries without a usable name are dropped.
    """
    if not isinstance(ent, dict):
        return

    raw_name = ent.get("name")
    name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not name:
        return

    raw_norm = ent.get("name_normalized")
    name_normalized = raw_norm.strip().lower() if isinstance(raw_norm, str) else ""
    if not name_normalized:
        name_normalized = name.lower()

    raw_type = ent.get("type")
    entity_type = raw_type.lower().strip() if isinstance(raw_type, str) else ""
    if entity_type not in _VALID_ENTITY_TYPES:
        # Unknown or missing types are filed under the generic bucket.
        entity_type = "organisation"

    raw_desc = ent.get("description")
    description = raw_desc if isinstance(raw_desc, str) else ""
    aliases = _clean_alias_list(ent.get("aliases"))
    mention_count = _coerce_mention_count(ent.get("mention_count"))

    key = (name_normalized, entity_type)
    existing = entity_map.get(key)
    if existing is None:
        entity_map[key] = {
            "name": name,
            "name_normalized": name_normalized,
            "type": entity_type,
            "description": description,
            "aliases": aliases,
            "mention_count": mention_count,
        }
        return

    merged_aliases = set(existing.get("aliases", []))
    merged_aliases.update(aliases)
    if name != existing["name"]:
        # A differing spelling of the same entity is kept as an alias.
        merged_aliases.add(name)
    # Sorted so the JSON stored in the DB is stable across runs.
    existing["aliases"] = sorted(merged_aliases)
    existing["mention_count"] = existing.get("mention_count", 1) + mention_count
    if len(description) > len(existing.get("description", "")):
        existing["description"] = description


async def _phase1_extract_entities(
    db, analysis_id: int, tenant_id: int,
    articles: list[dict], factchecks: list[dict],
    usage_acc: "UsageAccumulator", ws_manager=None,
) -> list[dict]:
    """Extract entities from articles and fact checks in batches of 30 texts.

    Each batch is sent to the fast model; the returned entities are merged
    across batches, written to ``network_entities`` and returned. Every
    returned dict that could be stored carries its row id under ``db_id``.
    A failed or unparseable batch is logged and skipped.
    """
    logger.info(f"Phase 1: {len(articles)} Artikel, {len(factchecks)} Faktenchecks")

    all_texts = []
    for art in articles:
        headline = art.get("headline_de") or art.get("headline") or ""
        content = art.get("content_de") or art.get("content_original") or ""
        source = art.get("source") or ""
        if len(content) > 2000:
            # Cap each article so a batch of 30 stays within a sane prompt size.
            content = content[:2000] + "..."
        all_texts.append(f"[{source}] {headline}\n{content}")

    for fc in factchecks:
        claim = fc.get("claim") or ""
        evidence = fc.get("evidence") or ""
        status = fc.get("status") or ""
        all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}")

    if not all_texts:
        logger.warning(f"Analyse {analysis_id}: Keine Texte vorhanden")
        return []

    batch_size = 30
    batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
    logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")

    entity_map: dict[tuple[str, str], dict] = {}

    for batch_idx, batch in enumerate(batches):
        articles_text = "\n\n---\n\n".join(batch)
        prompt = ENTITY_EXTRACTION_PROMPT.format(articles_text=articles_text)

        try:
            result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
            usage_acc.add(usage)
        except Exception as e:
            logger.error(f"Haiku Batch {batch_idx + 1}/{len(batches)} fehlgeschlagen: {e}")
            continue

        parsed = _parse_json_response(result_text)
        if not isinstance(parsed, dict) or "entities" not in parsed:
            logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON")
            continue

        entities = parsed["entities"]
        if not isinstance(entities, list):
            continue

        for ent in entities:
            _merge_extracted_entity(entity_map, ent)

        logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {len(entity_map)} Entitäten gesamt")

        await _broadcast(ws_manager, "network_status", {
            "analysis_id": analysis_id,
            "phase": "entity_extraction",
            "progress": int((batch_idx + 1) / len(batches) * 100),
        })

    all_entities = list(entity_map.values())

    for ent in all_entities:
        try:
            cursor = await db.execute(
                """INSERT OR IGNORE INTO network_entities
                   (network_analysis_id, name, name_normalized, entity_type,
                    description, aliases, mention_count, tenant_id)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    analysis_id, ent["name"], ent["name_normalized"], ent["type"],
                    ent.get("description", ""),
                    json.dumps(ent.get("aliases", []), ensure_ascii=False),
                    ent.get("mention_count", 1),
                    tenant_id,
                ),
            )
            if cursor.rowcount == 0:
                # The insert was ignored, so lastrowid would be a stale id of
                # some earlier insert. Resolve the existing row instead.
                # NOTE(review): assumes the uniqueness that triggered IGNORE is
                # (network_analysis_id, name_normalized, entity_type) - confirm
                # against the schema.
                lookup = await db.execute(
                    """SELECT id FROM network_entities
                       WHERE network_analysis_id = ? AND name_normalized = ?
                         AND entity_type = ?""",
                    (analysis_id, ent["name_normalized"], ent["type"]),
                )
                row = await lookup.fetchone()
                ent["db_id"] = row["id"] if row else None
            else:
                ent["db_id"] = cursor.lastrowid
        except Exception as e:
            logger.warning(f"Entity speichern fehlgeschlagen '{ent['name']}': {e}")

    await db.commit()
    logger.info(f"Phase 1 abgeschlossen: {len(all_entities)} Entitäten gespeichert")
    return all_entities


# ---------------------------------------------------------------------------
# Phase 2: relationship analysis (batched, one call per article batch)
# ---------------------------------------------------------------------------

def _build_entity_name_map(entities: list[dict]) -> dict[str, int]:
    """Map every lower-cased name, normalized name and alias to its DB id.

    Entities without a truthy ``db_id`` are skipped. When two entities share
    a key, the one later in *entities* wins.
    """
    name_to_id: dict[str, int] = {}
    for ent in entities:
        db_id = ent.get("db_id")
        if not db_id:
            continue
        name_to_id[ent["name"].lower()] = db_id
        name_to_id[ent["name_normalized"]] = db_id
        for alias in ent.get("aliases", []):
            if alias and alias.strip():
                name_to_id[alias.strip().lower()] = db_id
    return name_to_id
def _find_relevant_entities(batch_texts: list[str], entities: list[dict]) -> list[dict]:
    """Return the entities whose name, normalized name or an alias occurs in the batch.

    Matching is a case-insensitive substring test against all batch texts
    joined by a space. The order of *entities* is preserved.
    """
    combined_text = " ".join(batch_texts).lower()

    def _is_mentioned(ent: dict) -> bool:
        if ent["name"].lower() in combined_text or ent["name_normalized"] in combined_text:
            return True
        return any(
            alias and alias.strip().lower() in combined_text
            for alias in ent.get("aliases", [])
        )

    return [ent for ent in entities if _is_mentioned(ent)]


_VALID_RELATION_CATEGORIES = frozenset(
    {"alliance", "conflict", "diplomacy", "economic", "legal", "neutral"}
)
_RELATION_STATUS_PRIORITY = {"active": 3, "emerging": 2, "historical": 1}
_MAX_EVIDENCE_ITEMS = 10


def _relation_text(value) -> str:
    """Return *value* stripped when it is a string, otherwise an empty string."""
    return value.strip() if isinstance(value, str) else ""


def _merge_raw_relations(raw_relations: list[dict], name_to_id: dict[str, int]) -> dict:
    """Merge raw model relations into one record per (low id, high id, category).

    Relations whose endpoints cannot be resolved through *name_to_id*, or that
    connect an entity with itself, are dropped. Direction is normalized so
    that A->B and B->A collapse into one record. On a collision the highest
    weight and the highest-priority status win, the longer description wins,
    the most recent non-empty label wins, and evidence strings are unioned up
    to ``_MAX_EVIDENCE_ITEMS``. Records seen at least three times get +1
    weight (capped at 5). Malformed model fields fall back to defaults
    instead of raising.
    """
    merged: dict[tuple[int, int, str], dict] = {}

    for rel in raw_relations:
        source_name = _relation_text(rel.get("source"))
        target_name = _relation_text(rel.get("target"))

        source_id = name_to_id.get(source_name.lower())
        target_id = name_to_id.get(target_name.lower())
        if not source_id or not target_id or source_id == target_id:
            continue

        # Normalize direction so that A->B and B->A merge.
        if source_id > target_id:
            source_id, target_id = target_id, source_id
            source_name, target_name = target_name, source_name

        category = _relation_text(rel.get("category")).lower()
        if category not in _VALID_RELATION_CATEGORIES:
            category = "neutral"

        try:
            weight = max(1, min(5, int(rel.get("weight", 3))))
        except (ValueError, TypeError, OverflowError):
            weight = 3

        status = _relation_text(rel.get("status")).lower()
        if status not in _RELATION_STATUS_PRIORITY:
            status = "active"

        raw_evidence = rel.get("evidence", [])
        if not isinstance(raw_evidence, list):
            raw_evidence = []
        evidence = list(dict.fromkeys(
            ev.strip() for ev in raw_evidence if isinstance(ev, str) and ev.strip()
        ))

        label = _relation_text(rel.get("label"))
        raw_desc = rel.get("description")
        description = raw_desc if isinstance(raw_desc, str) else ""

        key = (source_id, target_id, category)
        existing = merged.get(key)
        if existing is None:
            merged[key] = {
                "source_id": source_id,
                "target_id": target_id,
                "source_name": source_name,
                "target_name": target_name,
                "category": category,
                "label": label,
                "description": description,
                "weight": weight,
                "status": status,
                "evidence": evidence[:_MAX_EVIDENCE_ITEMS],
                "_count": 1,
            }
            continue

        existing["weight"] = max(existing["weight"], weight)
        seen = set(existing["evidence"])
        for ev in evidence:
            if len(existing["evidence"]) >= _MAX_EVIDENCE_ITEMS:
                break
            if ev not in seen:
                existing["evidence"].append(ev)
                seen.add(ev)
        if _RELATION_STATUS_PRIORITY[status] > _RELATION_STATUS_PRIORITY.get(existing["status"], 0):
            existing["status"] = status
        if len(description) > len(existing["description"]):
            existing["description"] = description
        if label:
            # Only a usable label may replace the one we already have.
            existing["label"] = label
        existing["_count"] += 1

    # Weight boost for relations supported by several batches.
    for m in merged.values():
        if m["_count"] >= 3 and m["weight"] < 5:
            m["weight"] = min(5, m["weight"] + 1)

    return merged


async def _phase2_analyze_relationships(
    db, analysis_id: int, tenant_id: int,
    entities: list[dict], articles: list[dict], factchecks: list[dict],
    usage_acc: "UsageAccumulator", ws_manager=None,
) -> list[dict]:
    """Extract relations batch by batch, merge them and store the result.

    Stage A asks the fast model for relations per batch of 30 texts, limited
    to the entities that occur in that batch. Stage B merges and de-duplicates
    the raw relations. The merged relations are inserted into
    ``network_relations``; the return value is a list of ``{"id": rowid}``
    dicts, one per stored relation.
    """
    if not entities:
        return []

    logger.info(f"Phase 2: {len(entities)} Entitäten, batched Beziehungsanalyse")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "relationship_extraction",
        "progress": 0,
    })

    # --- Prepare texts (same logic as phase 1) ---
    all_texts = []
    for art in articles:
        headline = art.get("headline_de") or art.get("headline") or ""
        content = art.get("content_de") or art.get("content_original") or ""
        source = art.get("source") or ""
        if len(content) > 2000:
            content = content[:2000] + "..."
        all_texts.append(f"[{source}] {headline}\n{content}")

    for fc in factchecks:
        claim = fc.get("claim") or ""
        evidence = fc.get("evidence") or ""
        status = fc.get("status") or ""
        all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}")

    if not all_texts:
        return []

    # --- Stage A: per-batch relation extraction ---
    batch_size = 30
    batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
    logger.info(f"Stufe A: {len(batches)} Batches für Beziehungsextraktion")

    all_raw_relations: list[dict] = []
    name_to_id = _build_entity_name_map(entities)

    for batch_idx, batch in enumerate(batches):
        relevant = _find_relevant_entities(batch, entities)
        if len(relevant) < 2:
            # A relation needs two endpoints; skip the model call entirely.
            logger.debug(f"Batch {batch_idx + 1}: Weniger als 2 Entitäten, überspringe")
            await _broadcast(ws_manager, "network_status", {
                "analysis_id": analysis_id,
                "phase": "relationship_extraction",
                "progress": int((batch_idx + 1) / len(batches) * 70),
            })
            continue

        entities_for_prompt = [
            {"name": e["name"], "type": e["type"]}
            for e in relevant
        ]
        entities_json = json.dumps(entities_for_prompt, ensure_ascii=False)
        articles_text = "\n\n---\n\n".join(batch)

        prompt = RELATIONSHIP_BATCH_PROMPT.format(
            entities_json=entities_json,
            articles_text=articles_text,
        )

        try:
            result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
            usage_acc.add(usage)
        except Exception as e:
            logger.error(f"Relationship Batch {batch_idx + 1}/{len(batches)} fehlgeschlagen: {e}")
            continue

        parsed = _parse_json_response(result_text)
        if not parsed or not isinstance(parsed, dict):
            logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON")
            continue

        relations = parsed.get("relations", [])
        if not isinstance(relations, list):
            continue

        batch_count = 0
        for rel in relations:
            if not isinstance(rel, dict):
                continue
            if not _relation_text(rel.get("source")) or not _relation_text(rel.get("target")):
                continue
            rel["_batch"] = batch_idx
            all_raw_relations.append(rel)
            batch_count += 1

        logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {batch_count} Beziehungen, {len(relevant)} Entitäten")

        await _broadcast(ws_manager, "network_status", {
            "analysis_id": analysis_id,
            "phase": "relationship_extraction",
            "progress": int((batch_idx + 1) / len(batches) * 70),
        })

    logger.info(f"Stufe A abgeschlossen: {len(all_raw_relations)} rohe Beziehungen aus {len(batches)} Batches")

    # --- Stage B: merge + de-duplication ---
    logger.info("Stufe B: Merge und Deduplizierung")
    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "relationship_extraction",
        "progress": 75,
    })

    merged = _merge_raw_relations(all_raw_relations, name_to_id)

    logger.info(f"Stufe B abgeschlossen: {len(all_raw_relations)} roh -> {len(merged)} gemerged")

    # --- Store in DB ---
    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "relationship_extraction",
        "progress": 85,
    })

    saved_relations = []
    for m in merged.values():
        try:
            cursor = await db.execute(
                """INSERT INTO network_relations
                   (network_analysis_id, source_entity_id, target_entity_id,
                    category, label, description, weight, status, evidence, tenant_id)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    analysis_id, m["source_id"], m["target_id"], m["category"],
                    m["label"], m["description"], m["weight"], m["status"],
                    json.dumps(m["evidence"], ensure_ascii=False),
                    tenant_id,
                ),
            )
            saved_relations.append({"id": cursor.lastrowid})
        except Exception as e:
            logger.warning(f"Beziehung speichern fehlgeschlagen: {e}")

    await db.commit()
    logger.info(f"Phase 2 abgeschlossen: {len(saved_relations)} Beziehungen gespeichert")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "relationship_extraction",
        "progress": 100,
    })

    return saved_relations


# ---------------------------------------------------------------------------
# Phase 2b: corrections (Opus)
# ---------------------------------------------------------------------------

async def _phase2b_corrections(
    db, analysis_id: int, tenant_id: int,
    entities: list[dict], relation_count: int,
    usage_acc: "UsageAccumulator", ws_manager=None,
) -> None:
    """Ask the default model to review the entities and apply its corrections.

    Does nothing for fewer than 10 entities. Entities are sent in batches of
    500; all returned corrections (``name_fix``, ``merge``, ``add``) are
    collected and handed to ``_apply_corrections``, which mutates *entities*
    in place. ``relation_count`` is accepted for interface compatibility and
    is not used here.
    """
    if len(entities) < 10:
        return

    logger.info(f"Phase 2b: Opus-Korrekturpass für {len(entities)} Entitäten")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "correction",
        "progress": 0,
    })

    entities_for_prompt = [
        {
            "name": e["name"],
            "type": e["type"],
            "description": e.get("description", ""),
            "aliases": e.get("aliases", []),
        }
        for e in entities
    ]

    batch_size = 500
    entity_batches = [entities_for_prompt[i:i + batch_size]
                      for i in range(0, len(entities_for_prompt), batch_size)]

    all_corrections = []
    for bi, eb in enumerate(entity_batches):
        entities_json = json.dumps(eb, ensure_ascii=False, indent=1)
        prompt = CORRECTION_PROMPT.format(entities_json=entities_json)

        try:
            result_text, usage = await call_claude(prompt, tools=None, model=None)
            usage_acc.add(usage)
        except Exception as e:
            logger.error(f"Opus Korrektur-Batch {bi + 1} fehlgeschlagen: {e}")
            continue

        parsed = _parse_json_response(result_text)
        if not parsed or not isinstance(parsed, dict):
            continue

        corrections = parsed.get("corrections", [])
        if not isinstance(corrections, list):
            # Treat a malformed payload as "no corrections" instead of crashing
            # on len() below.
            corrections = []
        all_corrections.extend(corrections)

        logger.info(f"Korrektur-Batch {bi + 1}/{len(entity_batches)}: {len(corrections)} Korrekturen")

    if all_corrections:
        await _apply_corrections(db, analysis_id, tenant_id, entities, all_corrections)
        logger.info(f"Phase 2b abgeschlossen: {len(all_corrections)} Korrekturen angewendet")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "correction",
        "progress": 100,
    })
async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
    """Apply model-proposed corrections to *entities* and to the database.

    Three correction types are handled; anything else is ignored:

    * ``name_fix`` - rename the first entity matching ``entity_name`` to
      ``corrected_name`` and keep the old name as an alias.
    * ``merge`` - fold the entity named ``merge_with`` into ``entity_name``:
      aliases and mention counts are combined, mentions and relations are
      re-pointed, and the merged row is deleted.
    * ``add`` - insert a missing entity.

    *entities* is mutated in place. A failing correction is logged and
    skipped; everything is committed once at the end.
    """
    for corr in corrections:
        if not isinstance(corr, dict):
            continue
        corr_type = corr.get("type", "")

        try:
            if corr_type == "name_fix":
                entity_name = (corr.get("entity_name") or "").strip()
                corrected_name = (corr.get("corrected_name") or "").strip()
                if not entity_name or not corrected_name:
                    continue

                for ent in entities:
                    if ent["name"].lower() == entity_name.lower() or \
                       ent["name_normalized"] == entity_name.lower():
                        old_name = ent["name"]
                        ent["name"] = corrected_name
                        ent["name_normalized"] = corrected_name.lower().strip()
                        ent.setdefault("aliases", [])
                        if old_name not in ent["aliases"]:
                            ent["aliases"].append(old_name)

                        if ent.get("db_id"):
                            await db.execute(
                                """UPDATE network_entities
                                   SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1
                                   WHERE id = ?""",
                                (corrected_name, corrected_name.lower().strip(),
                                 json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]),
                            )
                        # Only the first matching entity is renamed.
                        break

            elif corr_type == "merge":
                keep_name = (corr.get("entity_name") or "").strip()
                merge_name = (corr.get("merge_with") or "").strip()
                if not keep_name or not merge_name:
                    continue

                keep_ent = merge_ent = None
                for ent in entities:
                    nl = ent["name"].lower()
                    nn = ent["name_normalized"]
                    if nl == keep_name.lower() or nn == keep_name.lower():
                        keep_ent = ent
                    elif nl == merge_name.lower() or nn == merge_name.lower():
                        merge_ent = ent

                if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"):
                    aliases = set(keep_ent.get("aliases", []))
                    aliases.add(merge_ent["name"])
                    for a in merge_ent.get("aliases", []):
                        if a and a.strip():
                            aliases.add(a.strip())
                    # Sorted so the stored JSON is stable across runs.
                    keep_ent["aliases"] = sorted(aliases)
                    keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)

                    await db.execute(
                        "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
                        (keep_ent["db_id"], merge_ent["db_id"]),
                    )
                    # NOTE(review): a relation between the two merged entities
                    # becomes a self-loop after these updates - confirm whether
                    # consumers filter those out.
                    await db.execute(
                        "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
                        (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
                    )
                    await db.execute(
                        "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
                        (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
                    )
                    await db.execute(
                        """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1
                           WHERE id = ?""",
                        (json.dumps(keep_ent["aliases"], ensure_ascii=False),
                         keep_ent["mention_count"], keep_ent["db_id"]),
                    )
                    await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],))
                    entities.remove(merge_ent)

            elif corr_type == "add":
                entity_name = (corr.get("entity_name") or "").strip()
                entity_type = (corr.get("entity_type") or "organisation").lower().strip()
                description = corr.get("description") or ""
                if not entity_name:
                    continue

                valid_types = {"person", "organisation", "location", "event", "military"}
                if entity_type not in valid_types:
                    entity_type = "organisation"

                name_norm = entity_name.lower().strip()
                if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities):
                    continue

                cursor = await db.execute(
                    """INSERT OR IGNORE INTO network_entities
                       (network_analysis_id, name, name_normalized, entity_type,
                        description, aliases, mention_count, corrected_by_opus, tenant_id)
                       VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""",
                    (analysis_id, entity_name, name_norm, entity_type, description, tenant_id),
                )
                # After an ignored insert lastrowid still holds the id of an
                # earlier insert, so rowcount is what tells us a row was added.
                if cursor.rowcount != 0 and cursor.lastrowid:
                    entities.append({
                        "name": entity_name, "name_normalized": name_norm,
                        "type": entity_type, "description": description,
                        "aliases": [], "mention_count": 1, "db_id": cursor.lastrowid,
                    })

        except Exception as e:
            logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}")

    await db.commit()


# ---------------------------------------------------------------------------
# Phase 3: finalization
# ---------------------------------------------------------------------------

async def _phase3_finalize(
    db, analysis_id, tenant_id, entity_count, relation_count,
    article_ids, factcheck_ids, article_ts, factcheck_ts,
    usage_acc, ws_manager=None,
):
    """Mark the analysis as ready and record the generation run.

    Updates ``network_analyses`` (counts, status ``ready``, timestamp and the
    data hash over the input ids/timestamps), appends a row with token usage
    and cost to ``network_generation_log``, commits, and broadcasts a
    ``network_complete`` event.
    """
    data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
    now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")

    await db.execute(
        """UPDATE network_analyses
           SET entity_count = ?, relation_count = ?, status = 'ready',
               last_generated_at = ?, data_hash = ?
           WHERE id = ?""",
        (entity_count, relation_count, now, data_hash, analysis_id),
    )

    await db.execute(
        """INSERT INTO network_generation_log
           (network_analysis_id, completed_at, status, input_tokens, output_tokens,
            cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls,
            entity_count, relation_count, tenant_id)
           VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens,
         usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
         usage_acc.total_cost_usd, usage_acc.call_count,
         entity_count, relation_count, tenant_id),
    )

    await db.commit()

    logger.info(f"Analyse {analysis_id} finalisiert: {entity_count} Entitäten, "
                f"{relation_count} Beziehungen, ${usage_acc.total_cost_usd:.4f}")

    await _broadcast(ws_manager, "network_complete", {
        "analysis_id": analysis_id,
        "entity_count": entity_count,
        "relation_count": relation_count,
        "cost_usd": round(usage_acc.total_cost_usd, 4),
    })
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------

async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None):
    """Run the full pipeline: entity extraction plus relationship analysis.

    Phase 1:  Haiku extracts entities from the articles (in batches)
    Phase 2:  Haiku extracts relations per batch, then merge + de-duplication
    Phase 2b: Opus corrects the entities (name fix, merge, add)
    Phase 3:  Finalization (counters, hash, log)

    Before every phase the analysis row is re-checked and the run stops
    silently when it no longer exists. Any exception sets the analysis status
    to ``error`` and broadcasts a ``network_error`` event. The database
    connection is always closed.
    """
    from database import get_db

    db = await get_db()
    usage_acc = UsageAccumulator()

    try:
        if not await _check_analysis_exists(db, analysis_id):
            logger.warning(f"Analyse {analysis_id} existiert nicht")
            return

        await db.execute(
            "UPDATE network_analyses SET status = 'generating' WHERE id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Load incident ids
        cursor = await db.execute(
            "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        incident_ids = [row["incident_id"] for row in await cursor.fetchall()]

        if not incident_ids:
            logger.warning(f"Analyse {analysis_id}: Keine Lagen verknüpft")
            await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
            await db.commit()
            await _broadcast(ws_manager, "network_error", {
                "analysis_id": analysis_id, "error": "Keine Lagen verknüpft",
            })
            return

        # Load articles. Only "?" placeholders are interpolated into the SQL;
        # the ids themselves are bound as parameters.
        placeholders = ",".join("?" * len(incident_ids))
        cursor = await db.execute(
            f"""SELECT id, incident_id, headline, headline_de, source, source_url,
                       content_original, content_de, collected_at
                FROM articles WHERE incident_id IN ({placeholders})""",
            incident_ids,
        )
        article_rows = await cursor.fetchall()
        articles = []
        article_ids = []
        article_ts = []
        for r in article_rows:
            articles.append({
                "id": r["id"], "incident_id": r["incident_id"],
                "headline": r["headline"], "headline_de": r["headline_de"],
                "source": r["source"], "source_url": r["source_url"],
                "content_original": r["content_original"], "content_de": r["content_de"],
            })
            article_ids.append(r["id"])
            article_ts.append(r["collected_at"] or "")

        # Load fact checks
        cursor = await db.execute(
            f"""SELECT id, incident_id, claim, status, evidence, checked_at
                FROM fact_checks WHERE incident_id IN ({placeholders})""",
            incident_ids,
        )
        fc_rows = await cursor.fetchall()
        factchecks = []
        factcheck_ids = []
        factcheck_ts = []
        for r in fc_rows:
            factchecks.append({
                "id": r["id"], "incident_id": r["incident_id"],
                "claim": r["claim"], "status": r["status"], "evidence": r["evidence"],
            })
            factcheck_ids.append(r["id"])
            factcheck_ts.append(r["checked_at"] or "")

        logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, "
                    f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen")

        # Phase 1
        if not await _check_analysis_exists(db, analysis_id):
            return

        entities = await _phase1_extract_entities(
            db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
        )

        # Phase 2
        if not await _check_analysis_exists(db, analysis_id):
            return

        relations = await _phase2_analyze_relationships(
            db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
        )

        # Phase 2b: corrections
        if not await _check_analysis_exists(db, analysis_id):
            return

        await _phase2b_corrections(
            db, analysis_id, tenant_id, entities, len(relations), usage_acc, ws_manager,
        )

        # Corrections may add or delete rows, so re-count entities from the DB.
        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        row = await cursor.fetchone()
        final_entity_count = row["cnt"] if row else len(entities)

        # Phase 3
        if not await _check_analysis_exists(db, analysis_id):
            return

        await _phase3_finalize(
            db, analysis_id, tenant_id,
            entity_count=final_entity_count, relation_count=len(relations),
            article_ids=article_ids, factcheck_ids=factcheck_ids,
            article_ts=article_ts, factcheck_ts=factcheck_ts,
            usage_acc=usage_acc, ws_manager=ws_manager,
        )

    except Exception as e:
        logger.error(f"Entity-Extraktion fehlgeschlagen (Analyse {analysis_id}): {e}", exc_info=True)
        try:
            await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
            await db.commit()
        except Exception as status_err:
            # Best effort only: the original failure is already logged above,
            # but a stuck 'generating' status should still be visible in logs.
            logger.warning(
                f"Fehlerstatus für Analyse {analysis_id} konnte nicht gespeichert werden: {status_err}"
            )
        await _broadcast(ws_manager, "network_error", {
            "analysis_id": analysis_id, "error": str(e),
        })
    finally:
        await db.close()