"""Netzwerkanalyse: Entity-Extraktion (Haiku) + Beziehungsanalyse (Batched).""" import asyncio import hashlib import json import logging import re from collections import defaultdict from datetime import datetime from typing import Optional from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator from config import CLAUDE_MODEL_FAST, TIMEZONE logger = logging.getLogger("osint.entity_extractor") # --------------------------------------------------------------------------- # Konstanten # --------------------------------------------------------------------------- TYPE_PRIORITY = {"location": 5, "organisation": 4, "military": 3, "event": 2, "person": 1} _STOP_WORDS = frozenset({ "the", "of", "and", "for", "in", "on", "at", "to", "by", "von", "der", "die", "das", "und", "für", "des", "den", "dem", "ein", "eine", "zur", "zum", "bei", "mit", "aus", "nach", }) # --------------------------------------------------------------------------- # Prompts # --------------------------------------------------------------------------- ENTITY_EXTRACTION_PROMPT = """Du bist ein OSINT-Analyst für ein Lagemonitoring-System. AUFGABE: Extrahiere ALLE relevanten Entitäten aus den folgenden Nachrichtenartikeln. ARTIKEL: {articles_text} REGELN: - Extrahiere JEDE genannte Person, Organisation, Ort, Ereignis und militärische Einheit - Normalisiere Namen: "Wladimir Putin", "Putin", "V. Putin" -> eine Entität - Aliase erfassen: Alle Namensvarianten einer Entität als aliases[] - mention_count: Wie oft wird die Entität insgesamt in allen Artikeln erwähnt? - Beschreibung: Kurze Einordnung, wer/was die Entität ist (1 Satz) - KEINE Duplikate: Gleiche Entitäten zusammenfassen ENTITY-TYPEN: - "person": Individuelle Personen (Politiker, Militärs, Journalisten etc.) - "organisation": Organisationen, Parteien, Behörden, Unternehmen, NGOs - "location": Länder, Städte, Regionen, Gebiete - "event": Konkrete Ereignisse (Wahlen, Anschläge, Konferenzen etc.) - "military": Militärische Einheiten, Waffensysteme, Operationen AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: {{ "entities": [ {{ "name": "Vollständiger Name", "name_normalized": "vollständiger name", "type": "person|organisation|location|event|military", "description": "Kurze Einordnung (1 Satz)", "aliases": ["Alias1", "Alias2"], "mention_count": 5 }} ] }}""" RELATIONSHIP_BATCH_PROMPT = """Du bist ein Senior OSINT-Analyst für ein Lagemonitoring-System. AUFGABE: Analysiere die Beziehungen zwischen den Entitäten basierend auf den Artikeln. BEKANNTE ENTITÄTEN (aus dem Gesamtdatensatz): {entities_json} ARTIKEL: {articles_text} AUFGABE: Identifiziere ALLE Beziehungen zwischen den oben genannten Entitäten, die sich aus den Artikeln ergeben. - Nur Beziehungen nennen, die im Artikeltext belegt sind - source und target: Exakt die Namen aus der Entitäten-Liste verwenden - Wenn eine Entität im Artikel vorkommt aber nicht in der Liste, verwende den Namen wie er in der Liste steht BEZIEHUNGS-KATEGORIEN: - "alliance": Bündnis, Kooperation, Unterstützung, Partnerschaft - "conflict": Konflikt, Krieg, Feindschaft, Sanktionen, Opposition - "diplomacy": Diplomatische Beziehungen, Verhandlungen, Abkommen - "economic": Wirtschaftsbeziehungen, Handel, Investitionen - "legal": Rechtliche Beziehungen, Klagen, Verurteilungen - "neutral": Sonstige Beziehung, Erwähnung, Verbindung REGELN: - weight: 1 (schwach/indirekt) bis 5 (stark/direkt) - evidence[]: 1-2 kurze Stichpunkte aus dem Artikeltext als Beleg - status: "active" (aktuell), "historical" (vergangen), "emerging" (sich entwickelnd) AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: {{ "relations": [ {{ "source": "Entität A", "target": "Entität B", "category": "alliance|conflict|diplomacy|economic|legal|neutral", "label": "Kurzes Label (2-4 Wörter)", "description": "Beschreibung (1-2 Sätze)", "weight": 3, "status": "active|historical|emerging", "evidence": ["Beleg 1", "Beleg 2"] }} ] }}""" SEMANTIC_DEDUP_PROMPT = """Du bist ein OSINT-Analyst. Prüfe diese Entitäten auf Duplikate. ENTITÄTEN: {entity_list} AUFGABE: Welche Entitäten bezeichnen DASSELBE reale Objekt? Typische Duplikate: - Abkürzung vs. Vollname: "IRGC" = "Iranian Revolutionary Guard Corps" - Sprachvarianten: "European Union" = "Europäische Union" - Schreibvarianten: "Strait of Hormuz" = "Straße von Hormus" REGELN: - NUR echte Duplikate zusammenführen (gleiche reale Entität) - NICHT zusammenführen: Unterorganisationen (IRGC ≠ IRGC Navy), verschiedene Personen - "keep": Nummer der Haupt-Entität (bevorzuge: mehr Erwähnungen, vollständiger Name) - "merge": Nummern der Duplikate die in die Haupt-Entität zusammengeführt werden AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: {{ "merges": [ {{"keep": 1, "merge": [3, 5]}}, {{"keep": 2, "merge": [4]}} ] }} Falls KEINE Duplikate: {{"merges": []}}""" # --------------------------------------------------------------------------- # Hilfsfunktionen # --------------------------------------------------------------------------- async def _broadcast(ws_manager, msg_type: str, data: dict): """Sendet eine WebSocket-Nachricht, falls ws_manager vorhanden.""" if ws_manager: try: await ws_manager.broadcast({"type": msg_type, **data}) except Exception: pass def _parse_json_response(text: str) -> Optional[dict]: """Parst JSON aus Claude-Antwort. Handhabt Markdown-Fences.""" if not text: return None try: return json.loads(text) except json.JSONDecodeError: pass fence_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', text, re.DOTALL) if fence_match: try: return json.loads(fence_match.group(1)) except json.JSONDecodeError: pass obj_match = re.search(r'\{.*\}', text, re.DOTALL) if obj_match: try: return json.loads(obj_match.group()) except json.JSONDecodeError: pass logger.warning("JSON-Parse fehlgeschlagen") return None async def _check_analysis_exists(db, analysis_id: int) -> bool: """Prüft ob die Analyse noch existiert.""" cursor = await db.execute( "SELECT id FROM network_analyses WHERE id = ?", (analysis_id,) ) return await cursor.fetchone() is not None def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) -> str: """SHA256-Hash über sortierte IDs und Timestamps.""" parts = [] for aid, ats in sorted(zip(article_ids, article_ts)): parts.append(f"a:{aid}:{ats}") for fid, fts in sorted(zip(factcheck_ids, factcheck_ts)): parts.append(f"f:{fid}:{fts}") return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest() # --------------------------------------------------------------------------- # Entity-Merge Helper # --------------------------------------------------------------------------- async def _merge_entity_in_db( db, analysis_id: int, keep_ent: dict, merge_ent: dict, entities: list[dict], ): """Führt merge_ent in keep_ent zusammen (DB + In-Memory).""" keep_id = keep_ent["db_id"] merge_id = merge_ent["db_id"] # Aliases vereinen aliases = set(keep_ent.get("aliases", [])) aliases.add(merge_ent["name"]) for a in merge_ent.get("aliases", []): if a and a.strip(): aliases.add(a.strip()) aliases.discard(keep_ent["name"]) keep_ent["aliases"] = list(aliases) # Mention count addieren keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0) # Längere Description behalten if len(merge_ent.get("description", "")) > len(keep_ent.get("description", "")): keep_ent["description"] = merge_ent["description"] # Entity-Mentions umhängen await db.execute( "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?", (keep_id, merge_id), ) # Relations umhängen await db.execute( "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?", (keep_id, merge_id, analysis_id), ) await db.execute( "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?", (keep_id, merge_id, analysis_id), ) # Self-Loops entfernen die durch den Merge entstanden sein könnten await db.execute( "DELETE FROM network_relations WHERE source_entity_id = ? AND target_entity_id = ? AND network_analysis_id = ?", (keep_id, keep_id, analysis_id), ) # Keep-Entity in DB aktualisieren await db.execute( """UPDATE network_entities SET aliases = ?, mention_count = ?, description = ? WHERE id = ?""", (json.dumps(keep_ent["aliases"], ensure_ascii=False), keep_ent["mention_count"], keep_ent.get("description", ""), keep_id), ) # Merge-Entity löschen await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_id,)) # Aus entities-Liste entfernen try: entities.remove(merge_ent) except ValueError: pass # --------------------------------------------------------------------------- # Phase 1: Entity-Extraktion (Haiku) # --------------------------------------------------------------------------- async def _phase1_extract_entities( db, analysis_id: int, tenant_id: int, articles: list[dict], factchecks: list[dict], usage_acc: UsageAccumulator, ws_manager=None, ) -> list[dict]: """Extrahiert Entitäten aus Artikeln via Haiku in Batches.""" logger.info(f"Phase 1: {len(articles)} Artikel, {len(factchecks)} Faktenchecks") all_texts = [] for art in articles: headline = art.get("headline_de") or art.get("headline") or "" content = art.get("content_de") or art.get("content_original") or "" source = art.get("source") or "" if len(content) > 2000: content = content[:2000] + "..." all_texts.append(f"[{source}] {headline}\n{content}") for fc in factchecks: claim = fc.get("claim") or "" evidence = fc.get("evidence") or "" status = fc.get("status") or "" all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}") if not all_texts: logger.warning(f"Analyse {analysis_id}: Keine Texte vorhanden") return [] batch_size = 30 batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)] logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches") entity_map: dict[str, dict] = {} for batch_idx, batch in enumerate(batches): articles_text = "\n\n---\n\n".join(batch) prompt = ENTITY_EXTRACTION_PROMPT.format(articles_text=articles_text) try: result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) usage_acc.add(usage) except Exception as e: logger.error(f"Haiku Batch {batch_idx + 1}/{len(batches)} fehlgeschlagen: {e}") continue parsed = _parse_json_response(result_text) if not parsed or "entities" not in parsed: logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON") continue entities = parsed["entities"] if not isinstance(entities, list): continue for ent in entities: if not isinstance(ent, dict): continue name = (ent.get("name") or "").strip() if not name: continue name_normalized = (ent.get("name_normalized") or name.lower()).strip().lower() entity_type = (ent.get("type") or "organisation").lower().strip() valid_types = {"person", "organisation", "location", "event", "military"} if entity_type not in valid_types: entity_type = "organisation" key = name_normalized if key in entity_map: existing = entity_map[key] aliases = set(existing.get("aliases", [])) for alias in ent.get("aliases", []): if alias and alias.strip(): aliases.add(alias.strip()) if name != existing["name"]: aliases.add(name) existing["aliases"] = list(aliases) existing["mention_count"] = existing.get("mention_count", 1) + ent.get("mention_count", 1) new_desc = ent.get("description", "") if len(new_desc) > len(existing.get("description", "")): existing["description"] = new_desc # Typ-Priorität: höherwertigen Typ behalten if TYPE_PRIORITY.get(entity_type, 0) > TYPE_PRIORITY.get(existing["type"], 0): existing["type"] = entity_type else: entity_map[key] = { "name": name, "name_normalized": name_normalized, "type": entity_type, "description": ent.get("description", ""), "aliases": [a.strip() for a in ent.get("aliases", []) if a and a.strip()], "mention_count": ent.get("mention_count", 1), } logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {len(entity_map)} Entitäten gesamt") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "entity_extraction", "progress": int((batch_idx + 1) / len(batches) * 100), }) all_entities = list(entity_map.values()) for ent in all_entities: try: cursor = await db.execute( """INSERT OR IGNORE INTO network_entities (network_analysis_id, name, name_normalized, entity_type, description, aliases, mention_count, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( analysis_id, ent["name"], ent["name_normalized"], ent["type"], ent.get("description", ""), json.dumps(ent.get("aliases", []), ensure_ascii=False), ent.get("mention_count", 1), tenant_id, ), ) ent["db_id"] = cursor.lastrowid except Exception as e: logger.warning(f"Entity speichern fehlgeschlagen '{ent['name']}': {e}") await db.commit() logger.info(f"Phase 1 abgeschlossen: {len(all_entities)} Entitäten gespeichert") return all_entities # --------------------------------------------------------------------------- # Phase 2a: Entity-Deduplication nach name_normalized # --------------------------------------------------------------------------- async def _phase2a_deduplicate_entities( db, analysis_id: int, entities: list[dict], ws_manager=None, ) -> None: """Dedupliziert Entities mit gleichem name_normalized (unabhängig vom Typ).""" logger.info(f"Phase 2a: Prüfe {len(entities)} Entitäten auf name_normalized-Duplikate") groups = defaultdict(list) for ent in entities: if ent.get("db_id"): groups[ent["name_normalized"]].append(ent) merge_count = 0 merged_ids = set() for nn, group in groups.items(): if len(group) < 2: continue # Sortierung: höchste Typ-Priorität, dann meiste Erwähnungen group.sort( key=lambda e: (TYPE_PRIORITY.get(e["type"], 0), e.get("mention_count", 0)), reverse=True, ) keep = group[0] for merge in group[1:]: if merge["db_id"] in merged_ids: continue await _merge_entity_in_db(db, analysis_id, keep, merge, entities) merged_ids.add(merge["db_id"]) merge_count += 1 if merge_count > 0: await db.commit() logger.info( f"Phase 2a abgeschlossen: {merge_count} Duplikate zusammengeführt, " f"{len(entities)} Entitäten verbleiben" ) await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "entity_dedup", "progress": 100, }) # --------------------------------------------------------------------------- # Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch) # --------------------------------------------------------------------------- def _build_entity_name_map(entities: list[dict]) -> dict[str, int]: """Mapping: normalisierter Name/Alias -> DB-ID.""" name_to_id: dict[str, int] = {} for ent in entities: db_id = ent.get("db_id") if not db_id: continue name_to_id[ent["name"].lower()] = db_id name_to_id[ent["name_normalized"]] = db_id for alias in ent.get("aliases", []): if alias and alias.strip(): name_to_id[alias.strip().lower()] = db_id return name_to_id def _find_relevant_entities(batch_texts: list[str], entities: list[dict]) -> list[dict]: """Findet Entitäten, die in den Batch-Texten vorkommen.""" combined_text = " ".join(batch_texts).lower() relevant = [] for ent in entities: if ent["name"].lower() in combined_text or ent["name_normalized"] in combined_text: relevant.append(ent) continue for alias in ent.get("aliases", []): if alias and alias.strip().lower() in combined_text: relevant.append(ent) break return relevant async def _phase2_analyze_relationships( db, analysis_id: int, tenant_id: int, entities: list[dict], articles: list[dict], factchecks: list[dict], usage_acc: UsageAccumulator, ws_manager=None, ) -> list[dict]: """Analysiert Beziehungen batch-weise und merged die Ergebnisse.""" if not entities: return [] logger.info(f"Phase 2: {len(entities)} Entitäten, batched Beziehungsanalyse") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "relationship_extraction", "progress": 0, }) # --- Texte vorbereiten (gleiche Logik wie Phase 1) --- all_texts = [] for art in articles: headline = art.get("headline_de") or art.get("headline") or "" content = art.get("content_de") or art.get("content_original") or "" source = art.get("source") or "" if len(content) > 2000: content = content[:2000] + "..." all_texts.append(f"[{source}] {headline}\n{content}") for fc in factchecks: claim = fc.get("claim") or "" evidence = fc.get("evidence") or "" status = fc.get("status") or "" all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}") if not all_texts: return [] # --- Stufe A: Per-Batch Beziehungsextraktion --- batch_size = 30 batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)] logger.info(f"Stufe A: {len(batches)} Batches für Beziehungsextraktion") all_raw_relations: list[dict] = [] name_to_id = _build_entity_name_map(entities) for batch_idx, batch in enumerate(batches): relevant = _find_relevant_entities(batch, entities) if len(relevant) < 2: logger.debug(f"Batch {batch_idx + 1}: Weniger als 2 Entitäten, überspringe") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "relationship_extraction", "progress": int((batch_idx + 1) / len(batches) * 70), }) continue entities_for_prompt = [ {"name": e["name"], "type": e["type"]} for e in relevant ] entities_json = json.dumps(entities_for_prompt, ensure_ascii=False) articles_text = "\n\n---\n\n".join(batch) prompt = RELATIONSHIP_BATCH_PROMPT.format( entities_json=entities_json, articles_text=articles_text, ) try: result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) usage_acc.add(usage) except Exception as e: logger.error(f"Relationship Batch {batch_idx + 1}/{len(batches)} fehlgeschlagen: {e}") continue parsed = _parse_json_response(result_text) if not parsed: logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON") continue relations = parsed.get("relations", []) if not isinstance(relations, list): continue batch_count = 0 for rel in relations: if not isinstance(rel, dict): continue source_name = (rel.get("source") or "").strip() target_name = (rel.get("target") or "").strip() if not source_name or not target_name: continue rel["_batch"] = batch_idx all_raw_relations.append(rel) batch_count += 1 logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {batch_count} Beziehungen, {len(relevant)} Entitäten") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "relationship_extraction", "progress": int((batch_idx + 1) / len(batches) * 70), }) logger.info(f"Stufe A abgeschlossen: {len(all_raw_relations)} rohe Beziehungen aus {len(batches)} Batches") # --- Stufe B: Merge + Deduplizierung --- logger.info("Stufe B: Merge und Deduplizierung") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "relationship_extraction", "progress": 75, }) valid_categories = {"alliance", "conflict", "diplomacy", "economic", "legal", "neutral"} merged: dict[tuple[int, int, str], dict] = {} for rel in all_raw_relations: source_name = (rel.get("source") or "").strip() target_name = (rel.get("target") or "").strip() source_id = name_to_id.get(source_name.lower()) target_id = name_to_id.get(target_name.lower()) if not source_id or not target_id or source_id == target_id: continue # Normalisiere Richtung um A->B und B->A zu mergen if source_id > target_id: source_id, target_id = target_id, source_id source_name, target_name = target_name, source_name category = (rel.get("category") or "neutral").lower().strip() if category not in valid_categories: category = "neutral" key = (source_id, target_id, category) weight = rel.get("weight", 3) try: weight = max(1, min(5, int(weight))) except (ValueError, TypeError): weight = 3 status = (rel.get("status") or "active").lower().strip() if status not in {"active", "historical", "emerging"}: status = "active" evidence = rel.get("evidence", []) if not isinstance(evidence, list): evidence = [] if key in merged: existing = merged[key] existing["weight"] = max(existing["weight"], weight) existing_evidence = set(existing["evidence"]) for ev in evidence: if isinstance(ev, str) and ev.strip() and ev.strip() not in existing_evidence: existing["evidence"].append(ev.strip()) existing_evidence.add(ev.strip()) if len(existing["evidence"]) > 10: existing["evidence"] = existing["evidence"][:10] status_priority = {"active": 3, "emerging": 2, "historical": 1} if status_priority.get(status, 0) > status_priority.get(existing["status"], 0): existing["status"] = status new_desc = rel.get("description", "") if len(new_desc) > len(existing.get("description", "")): existing["description"] = new_desc existing["label"] = rel.get("label", existing["label"]) existing["_count"] = existing.get("_count", 1) + 1 else: merged[key] = { "source_id": source_id, "target_id": target_id, "source_name": source_name, "target_name": target_name, "category": category, "label": rel.get("label", ""), "description": rel.get("description", ""), "weight": weight, "status": status, "evidence": [ev.strip() for ev in evidence if isinstance(ev, str) and ev.strip()][:10], "_count": 1, } logger.info(f"Stufe B abgeschlossen: {len(all_raw_relations)} roh -> {len(merged)} gemerged") # Gewichts-Boost für mehrfach belegte Beziehungen for m in merged.values(): if m["_count"] >= 3 and m["weight"] < 5: m["weight"] = min(5, m["weight"] + 1) # --- In DB speichern --- await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "relationship_extraction", "progress": 85, }) saved_relations = [] for m in merged.values(): try: cursor = await db.execute( """INSERT INTO network_relations (network_analysis_id, source_entity_id, target_entity_id, category, label, description, weight, status, evidence, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( analysis_id, m["source_id"], m["target_id"], m["category"], m["label"], m["description"], m["weight"], m["status"], json.dumps(m["evidence"], ensure_ascii=False), tenant_id, ), ) saved_relations.append({"id": cursor.lastrowid}) except Exception as e: logger.warning(f"Beziehung speichern fehlgeschlagen: {e}") await db.commit() logger.info(f"Phase 2 abgeschlossen: {len(saved_relations)} Beziehungen gespeichert") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "relationship_extraction", "progress": 100, }) return saved_relations # --------------------------------------------------------------------------- # Phase 2c: Semantische Deduplication (Opus) # --------------------------------------------------------------------------- async def _phase2c_semantic_dedup( db, analysis_id: int, tenant_id: int, entities: list[dict], usage_acc: UsageAccumulator, ws_manager=None, ) -> None: """Semantische Deduplizierung via Opus — erkennt Synonyme, Abkürzungen, Sprachvarianten.""" if len(entities) < 10: return logger.info(f"Phase 2c: Semantische Dedup für {len(entities)} Entitäten") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "semantic_dedup", "progress": 0, }) # --- Clustering: Token-basiert + Abbreviation-Matching --- token_to_ids = defaultdict(set) db_id_map = {e["db_id"]: e for e in entities if e.get("db_id")} for ent in entities: db_id = ent.get("db_id") if not db_id: continue all_names = [ent["name_normalized"]] for a in ent.get("aliases", []): if a: all_names.append(a.lower()) # Token-basiertes Clustering for name in all_names: for word in re.split(r'[\s\-_/(),.;:]+', name): word = word.strip() if len(word) >= 3 and word not in _STOP_WORDS: token_to_ids[word].add(db_id) # Abbreviation-Matching all_display_names = [ent["name"]] + [a for a in ent.get("aliases", []) if a] for name in all_display_names: words = [w for w in re.split(r'[\s\-_/(),.;:]+', name) if w and len(w) >= 2] if len(words) >= 2: abbr = "".join(w[0] for w in words).lower() if len(abbr) >= 2: token_to_ids[f"_abbr_{abbr}"].add(db_id) # Kurze Namen als potenzielle Abkürzungen name_clean = re.sub(r'[^a-zA-Z]', '', name).lower() if 2 <= len(name_clean) <= 6: token_to_ids[f"_abbr_{name_clean}"].add(db_id) # Eindeutige Cluster filtern (≥ 2 Entities, ≤ 40 Entities) seen_clusters = set() candidate_clusters = [] for token, db_ids in sorted(token_to_ids.items(), key=lambda x: len(x[1])): if len(db_ids) < 2 or len(db_ids) > 40: continue key = frozenset(db_ids) if key in seen_clusters: continue seen_clusters.add(key) candidate_clusters.append(list(db_ids)) logger.info(f"Phase 2c: {len(candidate_clusters)} Kandidaten-Cluster gefunden") # --- Opus-Calls für jeden Cluster --- merged_away = set() total_merges = 0 opus_calls = 0 max_calls = 50 for ci, cluster_ids in enumerate(candidate_clusters): if opus_calls >= max_calls: logger.warning(f"Phase 2c: Max {max_calls} Opus-Calls erreicht, stoppe") break # Bereits gemergte Entities filtern active_ids = [did for did in cluster_ids if did not in merged_away and did in db_id_map] if len(active_ids) < 2: continue active_ents = [db_id_map[did] for did in active_ids] # Prompt bauen lines = [] for i, ent in enumerate(active_ents, 1): aliases_str = ", ".join((ent.get("aliases") or [])[:5]) line = f"{i}. {ent['name']} ({ent['type']}, {ent.get('mention_count', 0)} Erwähnungen)" if aliases_str: line += f" [Aliases: {aliases_str}]" lines.append(line) prompt = SEMANTIC_DEDUP_PROMPT.format(entity_list="\n".join(lines)) try: result_text, usage = await call_claude(prompt, tools=None, model=None) usage_acc.add(usage) opus_calls += 1 except Exception as e: logger.error(f"Phase 2c Opus-Call {opus_calls + 1} fehlgeschlagen: {e}") continue parsed = _parse_json_response(result_text) if not parsed: continue for merge_group in parsed.get("merges", []): keep_idx = merge_group.get("keep") merge_indices = merge_group.get("merge", []) if keep_idx is None or not merge_indices: continue if not isinstance(merge_indices, list): merge_indices = [merge_indices] keep_idx -= 1 # 1-indexed → 0-indexed if keep_idx < 0 or keep_idx >= len(active_ents): continue keep_ent = active_ents[keep_idx] if keep_ent["db_id"] in merged_away: continue for mi in merge_indices: mi -= 1 if mi < 0 or mi >= len(active_ents) or mi == keep_idx: continue merge_ent = active_ents[mi] if merge_ent["db_id"] in merged_away: continue logger.info(f"Semantic Merge: '{merge_ent['name']}' → '{keep_ent['name']}'") await _merge_entity_in_db(db, analysis_id, keep_ent, merge_ent, entities) merged_away.add(merge_ent["db_id"]) db_id_map.pop(merge_ent["db_id"], None) total_merges += 1 # Progress progress = int((ci + 1) / len(candidate_clusters) * 100) if candidate_clusters else 100 await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "semantic_dedup", "progress": min(progress, 100), }) if total_merges > 0: await db.commit() logger.info( f"Phase 2c abgeschlossen: {total_merges} Merges, {opus_calls} Opus-Calls, " f"{len(entities)} Entitäten verbleiben" ) # --------------------------------------------------------------------------- # Phase 2d: Cleanup # --------------------------------------------------------------------------- async def _phase2d_cleanup( db, analysis_id: int, entities: list[dict], ws_manager=None, ) -> None: """Cleanup: Self-Loops, Richtungsnormalisierung, Duplikat-Relations, verwaiste Entities.""" logger.info("Phase 2d: Cleanup") # 1. Self-Loops entfernen cursor = await db.execute( "DELETE FROM network_relations WHERE network_analysis_id = ? AND source_entity_id = target_entity_id", (analysis_id,), ) self_loops = cursor.rowcount # 2. Richtungsnormalisierung (nach Merges können Richtungen inkonsistent sein) await db.execute( """UPDATE network_relations SET source_entity_id = target_entity_id, target_entity_id = source_entity_id WHERE source_entity_id > target_entity_id AND network_analysis_id = ?""", (analysis_id,), ) # 3. Duplikat-Relations entfernen (gleiche source+target+category nach Normalisierung) cursor = await db.execute( """DELETE FROM network_relations WHERE network_analysis_id = ? AND id NOT IN ( SELECT MIN(id) FROM network_relations WHERE network_analysis_id = ? GROUP BY source_entity_id, target_entity_id, category )""", (analysis_id, analysis_id), ) dup_relations = cursor.rowcount # 4. Verwaiste Entities entfernen (keine Verbindungen) cursor = await db.execute( """DELETE FROM network_entities WHERE network_analysis_id = ? AND id NOT IN ( SELECT source_entity_id FROM network_relations WHERE network_analysis_id = ? UNION SELECT target_entity_id FROM network_relations WHERE network_analysis_id = ? )""", (analysis_id, analysis_id, analysis_id), ) orphans = cursor.rowcount # Entities-Liste aktualisieren remaining_ids = set() cursor = await db.execute( "SELECT id FROM network_entities WHERE network_analysis_id = ?", (analysis_id,) ) for row in await cursor.fetchall(): remaining_ids.add(row["id"]) entities[:] = [e for e in entities if e.get("db_id") in remaining_ids] await db.commit() logger.info( f"Phase 2d abgeschlossen: {self_loops} Self-Loops, {dup_relations} Duplikat-Relations, " f"{orphans} verwaiste Entities entfernt, {len(entities)} Entitäten verbleiben" ) await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, "phase": "cleanup", "progress": 100, }) # --------------------------------------------------------------------------- # Phase 3: Finalisierung # --------------------------------------------------------------------------- async def _phase3_finalize( db, analysis_id, tenant_id, entity_count, relation_count, article_ids, factcheck_ids, article_ts, factcheck_ts, usage_acc, ws_manager=None, ): """Finalisiert: Zähler, Hash, Log, Status.""" data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S") await db.execute( """UPDATE network_analyses SET entity_count = ?, relation_count = ?, status = 'ready', last_generated_at = ?, data_hash = ? WHERE id = ?""", (entity_count, relation_count, now, data_hash, analysis_id), ) await db.execute( """INSERT INTO network_generation_log (network_analysis_id, completed_at, status, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls, entity_count, relation_count, tenant_id) VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens, usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens, usage_acc.total_cost_usd, usage_acc.call_count, entity_count, relation_count, tenant_id), ) await db.commit() logger.info(f"Analyse {analysis_id} finalisiert: {entity_count} Entitäten, " f"{relation_count} Beziehungen, ${usage_acc.total_cost_usd:.4f}") await _broadcast(ws_manager, "network_complete", { "analysis_id": analysis_id, "entity_count": entity_count, "relation_count": relation_count, "cost_usd": round(usage_acc.total_cost_usd, 4), }) # --------------------------------------------------------------------------- # Hauptfunktion # --------------------------------------------------------------------------- async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None): """Hauptfunktion: Entity-Extraktion + Beziehungsanalyse. Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches) Phase 2a: Entity-Dedup nach name_normalized (Code, kein API) Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung Phase 2c: Semantische Dedup via Opus (Cluster-weise) Phase 2d: Cleanup (Self-Loops, Verwaiste, Duplikat-Relations) Phase 3: Finalisierung (Zähler, Hash, Log) """ from database import get_db db = await get_db() usage_acc = UsageAccumulator() try: if not await _check_analysis_exists(db, analysis_id): logger.warning(f"Analyse {analysis_id} existiert nicht") return await db.execute( "UPDATE network_analyses SET status = 'generating' WHERE id = ?", (analysis_id,), ) await db.commit() # Incident-IDs laden cursor = await db.execute( "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?", (analysis_id,), ) incident_ids = [row["incident_id"] for row in await cursor.fetchall()] if not incident_ids: logger.warning(f"Analyse {analysis_id}: Keine Lagen verknüpft") await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,)) await db.commit() await _broadcast(ws_manager, "network_error", { "analysis_id": analysis_id, "error": "Keine Lagen verknüpft", }) return # Artikel laden placeholders = ",".join("?" * len(incident_ids)) cursor = await db.execute( f"""SELECT id, incident_id, headline, headline_de, source, source_url, content_original, content_de, collected_at FROM articles WHERE incident_id IN ({placeholders})""", incident_ids, ) article_rows = await cursor.fetchall() articles = [] article_ids = [] article_ts = [] for r in article_rows: articles.append({ "id": r["id"], "incident_id": r["incident_id"], "headline": r["headline"], "headline_de": r["headline_de"], "source": r["source"], "source_url": r["source_url"], "content_original": r["content_original"], "content_de": r["content_de"], }) article_ids.append(r["id"]) article_ts.append(r["collected_at"] or "") # Faktenchecks laden cursor = await db.execute( f"""SELECT id, incident_id, claim, status, evidence, checked_at FROM fact_checks WHERE incident_id IN ({placeholders})""", incident_ids, ) fc_rows = await cursor.fetchall() factchecks = [] factcheck_ids = [] factcheck_ts = [] for r in fc_rows: factchecks.append({ "id": r["id"], "incident_id": r["incident_id"], "claim": r["claim"], "status": r["status"], "evidence": r["evidence"], }) factcheck_ids.append(r["id"]) factcheck_ts.append(r["checked_at"] or "") logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, " f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen") # Phase 1: Entity-Extraktion if not await _check_analysis_exists(db, analysis_id): return entities = await _phase1_extract_entities( db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager, ) # Phase 2a: Entity-Deduplication if not await _check_analysis_exists(db, analysis_id): return await _phase2a_deduplicate_entities(db, analysis_id, entities, ws_manager) # Phase 2: Beziehungsextraktion if not await _check_analysis_exists(db, analysis_id): return relations = await _phase2_analyze_relationships( db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager, ) # Phase 2c: Semantische Deduplication if not await _check_analysis_exists(db, analysis_id): return await _phase2c_semantic_dedup( db, analysis_id, tenant_id, entities, usage_acc, ws_manager, ) # Phase 2d: Cleanup if not await _check_analysis_exists(db, analysis_id): return await _phase2d_cleanup(db, analysis_id, entities, ws_manager) # Finale Zähler aus DB (nach allen Cleanups) cursor = await db.execute( "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?", (analysis_id,), ) row = await cursor.fetchone() final_entity_count = row["cnt"] if row else len(entities) cursor = await db.execute( "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?", (analysis_id,), ) row = await cursor.fetchone() final_relation_count = row["cnt"] if row else len(relations) # Phase 3: Finalisierung if not await _check_analysis_exists(db, analysis_id): return await _phase3_finalize( db, analysis_id, tenant_id, entity_count=final_entity_count, relation_count=final_relation_count, article_ids=article_ids, factcheck_ids=factcheck_ids, article_ts=article_ts, factcheck_ts=factcheck_ts, usage_acc=usage_acc, ws_manager=ws_manager, ) except Exception as e: logger.error(f"Entity-Extraktion fehlgeschlagen (Analyse {analysis_id}): {e}", exc_info=True) try: await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,)) await db.commit() except Exception: pass await _broadcast(ws_manager, "network_error", { "analysis_id": analysis_id, "error": str(e), }) finally: await db.close()