diff --git a/regenerate_relations.py b/regenerate_relations.py new file mode 100644 index 0000000..75a04a7 --- /dev/null +++ b/regenerate_relations.py @@ -0,0 +1,234 @@ +"""Regeneriert NUR die Beziehungen für eine bestehende Netzwerkanalyse. +Nutzt die vorhandenen Entitäten und führt Phase 2a + Phase 2 + Phase 2c + Phase 2d aus. +""" +import asyncio +import json +import sys +import os + +sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") + +from database import get_db +from agents.entity_extractor import ( + _phase2a_deduplicate_entities, + _phase2_analyze_relationships, + _phase2c_semantic_dedup, + _phase2d_cleanup, + _build_entity_name_map, + _compute_data_hash, + _broadcast, + logger, +) +from agents.claude_client import UsageAccumulator +from config import TIMEZONE +from datetime import datetime +import logging + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", +) + + +async def regenerate_relations_only(analysis_id: int): + """Löscht alte Relations und führt Phase 2a + 2 + 2c + 2d neu aus.""" + db = await get_db() + usage_acc = UsageAccumulator() + + try: + # Analyse prüfen + cursor = await db.execute( + "SELECT id, name, tenant_id, entity_count FROM network_analyses WHERE id = ?", + (analysis_id,), + ) + analysis = await cursor.fetchone() + if not analysis: + print(f"Analyse {analysis_id} nicht gefunden!") + return + + tenant_id = analysis["tenant_id"] + print(f"\nAnalyse: {analysis['name']} (ID={analysis_id})") + print(f"Vorhandene Entitäten: {analysis['entity_count']}") + + # Status auf generating setzen + await db.execute( + "UPDATE network_analyses SET status = 'generating' WHERE id = ?", + (analysis_id,), + ) + await db.commit() + + # Entitäten aus DB laden (mit db_id!) + cursor = await db.execute( + """SELECT id, name, name_normalized, entity_type, description, aliases, mention_count + FROM network_entities WHERE network_analysis_id = ?""", + (analysis_id,), + ) + entity_rows = await cursor.fetchall() + entities = [] + for r in entity_rows: + aliases = [] + try: + aliases = json.loads(r["aliases"]) if r["aliases"] else [] + except (json.JSONDecodeError, TypeError): + pass + entities.append({ + "name": r["name"], + "name_normalized": r["name_normalized"], + "type": r["entity_type"], + "description": r["description"] or "", + "aliases": aliases, + "mention_count": r["mention_count"] or 1, + "db_id": r["id"], + }) + + print(f"Geladene Entitäten: {len(entities)}") + + # Phase 2a: Entity-Deduplication (vor Relation-Löschung) + print(f"\n--- Phase 2a: Entity-Deduplication ---\n") + await _phase2a_deduplicate_entities(db, analysis_id, entities) + print(f"Entitäten nach Dedup: {len(entities)}") + + # Alte Relations löschen + cursor = await db.execute( + "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?", + (analysis_id,), + ) + old_count = (await cursor.fetchone())["cnt"] + print(f"\nLösche {old_count} alte Relations...") + await db.execute( + "DELETE FROM network_relations WHERE network_analysis_id = ?", + (analysis_id,), + ) + await db.commit() + + # Incident-IDs laden + cursor = await db.execute( + "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?", + (analysis_id,), + ) + incident_ids = [row["incident_id"] for row in await cursor.fetchall()] + print(f"Verknüpfte Lagen: {len(incident_ids)}") + + # Artikel laden + placeholders = ",".join("?" * len(incident_ids)) + cursor = await db.execute( + f"""SELECT id, incident_id, headline, headline_de, source, source_url, + content_original, content_de, collected_at + FROM articles WHERE incident_id IN ({placeholders})""", + incident_ids, + ) + article_rows = await cursor.fetchall() + articles = [] + article_ids = [] + article_ts = [] + for r in article_rows: + articles.append({ + "id": r["id"], "incident_id": r["incident_id"], + "headline": r["headline"], "headline_de": r["headline_de"], + "source": r["source"], "source_url": r["source_url"], + "content_original": r["content_original"], "content_de": r["content_de"], + }) + article_ids.append(r["id"]) + article_ts.append(r["collected_at"] or "") + + # Faktenchecks laden + cursor = await db.execute( + f"""SELECT id, incident_id, claim, status, evidence, checked_at + FROM fact_checks WHERE incident_id IN ({placeholders})""", + incident_ids, + ) + fc_rows = await cursor.fetchall() + factchecks = [] + factcheck_ids = [] + factcheck_ts = [] + for r in fc_rows: + factchecks.append({ + "id": r["id"], "incident_id": r["incident_id"], + "claim": r["claim"], "status": r["status"], "evidence": r["evidence"], + }) + factcheck_ids.append(r["id"]) + factcheck_ts.append(r["checked_at"] or "") + + print(f"Artikel: {len(articles)}, Faktenchecks: {len(factchecks)}") + + # Phase 2: Beziehungsextraktion + print(f"\n--- Phase 2: Batched Beziehungsextraktion starten ---\n") + relations = await _phase2_analyze_relationships( + db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, + ) + + # Phase 2c: Semantische Deduplication + print(f"\n--- Phase 2c: Semantische Deduplication (Opus) ---\n") + await _phase2c_semantic_dedup( + db, analysis_id, tenant_id, entities, usage_acc, + ) + + # Phase 2d: Cleanup + print(f"\n--- Phase 2d: Cleanup ---\n") + await _phase2d_cleanup(db, analysis_id, entities) + + # Finale Zähler aus DB + cursor = await db.execute( + "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?", + (analysis_id,), + ) + row = await cursor.fetchone() + final_entity_count = row["cnt"] if row else len(entities) + + cursor = await db.execute( + "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?", + (analysis_id,), + ) + row = await cursor.fetchone() + final_relation_count = row["cnt"] if row else len(relations) + + # Finalisierung + data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) + now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S") + + await db.execute( + """UPDATE network_analyses + SET entity_count = ?, relation_count = ?, status = 'ready', + last_generated_at = ?, data_hash = ? + WHERE id = ?""", + (final_entity_count, final_relation_count, now, data_hash, analysis_id), + ) + + await db.execute( + """INSERT INTO network_generation_log + (network_analysis_id, completed_at, status, input_tokens, output_tokens, + cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls, + entity_count, relation_count, tenant_id) + VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens, + usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens, + usage_acc.total_cost_usd, usage_acc.call_count, + final_entity_count, final_relation_count, tenant_id), + ) + + await db.commit() + + print(f"\n{'='*60}") + print(f"FERTIG!") + print(f"Entitäten: {final_entity_count}") + print(f"Beziehungen: {final_relation_count}") + print(f"API-Calls: {usage_acc.call_count}") + print(f"Kosten: ${usage_acc.total_cost_usd:.4f}") + print(f"{'='*60}") + + except Exception as e: + print(f"FEHLER: {e}") + import traceback + traceback.print_exc() + try: + await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,)) + await db.commit() + except Exception: + pass + finally: + await db.close() + + +if __name__ == "__main__": + analysis_id = int(sys.argv[1]) if len(sys.argv) > 1 else 1 + asyncio.run(regenerate_relations_only(analysis_id)) diff --git a/src/agents/entity_extractor.py b/src/agents/entity_extractor.py index 8b4a2c2..b3186ac 100644 --- a/src/agents/entity_extractor.py +++ b/src/agents/entity_extractor.py @@ -4,6 +4,7 @@ import hashlib import json import logging import re +from collections import defaultdict from datetime import datetime from typing import Optional @@ -12,6 +13,18 @@ from config import CLAUDE_MODEL_FAST, TIMEZONE logger = logging.getLogger("osint.entity_extractor") +# --------------------------------------------------------------------------- +# Konstanten +# --------------------------------------------------------------------------- + +TYPE_PRIORITY = {"location": 5, "organisation": 4, "military": 3, "event": 2, "person": 1} + +_STOP_WORDS = frozenset({ + "the", "of", "and", "for", "in", "on", "at", "to", "by", + "von", "der", "die", "das", "und", "für", "des", "den", "dem", + "ein", "eine", "zur", "zum", "bei", "mit", "aus", "nach", +}) + # --------------------------------------------------------------------------- # Prompts # --------------------------------------------------------------------------- @@ -97,26 +110,33 @@ AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: }}""" -CORRECTION_PROMPT = """Du bist ein Senior OSINT-Analyst. Prüfe die extrahierten Entitäten auf Fehler. +SEMANTIC_DEDUP_PROMPT = """Du bist ein OSINT-Analyst. Prüfe diese Entitäten auf Duplikate. ENTITÄTEN: -{entities_json} +{entity_list} -AUFGABE: Prüfe auf folgende Fehler und gib NUR Korrekturen zurück: -1. "name_fix": Name ist falsch geschrieben oder unvollständig -2. "merge": Zwei Entitäten bezeichnen dasselbe -> zusammenführen (z.B. "USA" als Organisation und "USA" als Location) -3. "add": Wichtige Entität fehlt komplett (nur bei offensichtlichen Lücken) +AUFGABE: Welche Entitäten bezeichnen DASSELBE reale Objekt? -WICHTIG: Nur echte Fehler korrigieren. Im Zweifel KEINE Korrektur. +Typische Duplikate: +- Abkürzung vs. Vollname: "IRGC" = "Iranian Revolutionary Guard Corps" +- Sprachvarianten: "European Union" = "Europäische Union" +- Schreibvarianten: "Strait of Hormuz" = "Straße von Hormus" + +REGELN: +- NUR echte Duplikate zusammenführen (gleiche reale Entität) +- NICHT zusammenführen: Unterorganisationen (IRGC ≠ IRGC Navy), verschiedene Personen +- "keep": Nummer der Haupt-Entität (bevorzuge: mehr Erwähnungen, vollständiger Name) +- "merge": Nummern der Duplikate die in die Haupt-Entität zusammengeführt werden AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: {{ - "corrections": [ - {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}}, - {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}}, - {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}} + "merges": [ + {{"keep": 1, "merge": [3, 5]}}, + {{"keep": 2, "merge": [4]}} ] -}}""" +}} + +Falls KEINE Duplikate: {{"merges": []}}""" # --------------------------------------------------------------------------- @@ -174,6 +194,74 @@ def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) -> return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest() +# --------------------------------------------------------------------------- +# Entity-Merge Helper +# --------------------------------------------------------------------------- + +async def _merge_entity_in_db( + db, analysis_id: int, keep_ent: dict, merge_ent: dict, entities: list[dict], +): + """Führt merge_ent in keep_ent zusammen (DB + In-Memory).""" + keep_id = keep_ent["db_id"] + merge_id = merge_ent["db_id"] + + # Aliases vereinen + aliases = set(keep_ent.get("aliases", [])) + aliases.add(merge_ent["name"]) + for a in merge_ent.get("aliases", []): + if a and a.strip(): + aliases.add(a.strip()) + aliases.discard(keep_ent["name"]) + keep_ent["aliases"] = list(aliases) + + # Mention count addieren + keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0) + + # Längere Description behalten + if len(merge_ent.get("description", "")) > len(keep_ent.get("description", "")): + keep_ent["description"] = merge_ent["description"] + + # Entity-Mentions umhängen + await db.execute( + "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?", + (keep_id, merge_id), + ) + + # Relations umhängen + await db.execute( + "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?", + (keep_id, merge_id, analysis_id), + ) + await db.execute( + "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?", + (keep_id, merge_id, analysis_id), + ) + + # Self-Loops entfernen die durch den Merge entstanden sein könnten + await db.execute( + "DELETE FROM network_relations WHERE source_entity_id = ? AND target_entity_id = ? AND network_analysis_id = ?", + (keep_id, keep_id, analysis_id), + ) + + # Keep-Entity in DB aktualisieren + await db.execute( + """UPDATE network_entities + SET aliases = ?, mention_count = ?, description = ? + WHERE id = ?""", + (json.dumps(keep_ent["aliases"], ensure_ascii=False), + keep_ent["mention_count"], keep_ent.get("description", ""), keep_id), + ) + + # Merge-Entity löschen + await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_id,)) + + # Aus entities-Liste entfernen + try: + entities.remove(merge_ent) + except ValueError: + pass + + # --------------------------------------------------------------------------- # Phase 1: Entity-Extraktion (Haiku) # --------------------------------------------------------------------------- @@ -209,7 +297,7 @@ async def _phase1_extract_entities( batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)] logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches") - entity_map: dict[tuple[str, str], dict] = {} + entity_map: dict[str, dict] = {} for batch_idx, batch in enumerate(batches): articles_text = "\n\n---\n\n".join(batch) @@ -244,7 +332,7 @@ async def _phase1_extract_entities( if entity_type not in valid_types: entity_type = "organisation" - key = (name_normalized, entity_type) + key = name_normalized if key in entity_map: existing = entity_map[key] @@ -259,6 +347,9 @@ async def _phase1_extract_entities( new_desc = ent.get("description", "") if len(new_desc) > len(existing.get("description", "")): existing["description"] = new_desc + # Typ-Priorität: höherwertigen Typ behalten + if TYPE_PRIORITY.get(entity_type, 0) > TYPE_PRIORITY.get(existing["type"], 0): + existing["type"] = entity_type else: entity_map[key] = { "name": name, @@ -303,6 +394,55 @@ async def _phase1_extract_entities( return all_entities +# --------------------------------------------------------------------------- +# Phase 2a: Entity-Deduplication nach name_normalized +# --------------------------------------------------------------------------- + +async def _phase2a_deduplicate_entities( + db, analysis_id: int, entities: list[dict], ws_manager=None, +) -> None: + """Dedupliziert Entities mit gleichem name_normalized (unabhängig vom Typ).""" + logger.info(f"Phase 2a: Prüfe {len(entities)} Entitäten auf name_normalized-Duplikate") + + groups = defaultdict(list) + for ent in entities: + if ent.get("db_id"): + groups[ent["name_normalized"]].append(ent) + + merge_count = 0 + merged_ids = set() + + for nn, group in groups.items(): + if len(group) < 2: + continue + # Sortierung: höchste Typ-Priorität, dann meiste Erwähnungen + group.sort( + key=lambda e: (TYPE_PRIORITY.get(e["type"], 0), e.get("mention_count", 0)), + reverse=True, + ) + keep = group[0] + for merge in group[1:]: + if merge["db_id"] in merged_ids: + continue + await _merge_entity_in_db(db, analysis_id, keep, merge, entities) + merged_ids.add(merge["db_id"]) + merge_count += 1 + + if merge_count > 0: + await db.commit() + + logger.info( + f"Phase 2a abgeschlossen: {merge_count} Duplikate zusammengeführt, " + f"{len(entities)} Entitäten verbleiben" + ) + + await _broadcast(ws_manager, "network_status", { + "analysis_id": analysis_id, + "phase": "entity_dedup", + "progress": 100, + }) + + # --------------------------------------------------------------------------- # Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch) # --------------------------------------------------------------------------- @@ -566,186 +706,232 @@ async def _phase2_analyze_relationships( # --------------------------------------------------------------------------- -# Phase 2b: Korrekturen (Opus) +# Phase 2c: Semantische Deduplication (Opus) # --------------------------------------------------------------------------- -async def _phase2b_corrections( +async def _phase2c_semantic_dedup( db, analysis_id: int, tenant_id: int, - entities: list[dict], relation_count: int, - usage_acc: UsageAccumulator, ws_manager=None, + entities: list[dict], usage_acc: UsageAccumulator, ws_manager=None, ) -> None: - """Opus prüft die Entitäten auf Fehler (Merge, Name-Fix, Add).""" + """Semantische Deduplizierung via Opus — erkennt Synonyme, Abkürzungen, Sprachvarianten.""" if len(entities) < 10: return - logger.info(f"Phase 2b: Opus-Korrekturpass für {len(entities)} Entitäten") + logger.info(f"Phase 2c: Semantische Dedup für {len(entities)} Entitäten") await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, - "phase": "correction", + "phase": "semantic_dedup", "progress": 0, }) - entities_for_prompt = [] - for e in entities: - entities_for_prompt.append({ - "name": e["name"], - "type": e["type"], - "description": e.get("description", ""), - "aliases": e.get("aliases", []), - }) + # --- Clustering: Token-basiert + Abbreviation-Matching --- + token_to_ids = defaultdict(set) + db_id_map = {e["db_id"]: e for e in entities if e.get("db_id")} - batch_size = 500 - entity_batches = [entities_for_prompt[i:i + batch_size] - for i in range(0, len(entities_for_prompt), batch_size)] + for ent in entities: + db_id = ent.get("db_id") + if not db_id: + continue - all_corrections = [] - for bi, eb in enumerate(entity_batches): - entities_json = json.dumps(eb, ensure_ascii=False, indent=1) - prompt = CORRECTION_PROMPT.format(entities_json=entities_json) + all_names = [ent["name_normalized"]] + for a in ent.get("aliases", []): + if a: + all_names.append(a.lower()) + + # Token-basiertes Clustering + for name in all_names: + for word in re.split(r'[\s\-_/(),.;:]+', name): + word = word.strip() + if len(word) >= 3 and word not in _STOP_WORDS: + token_to_ids[word].add(db_id) + + # Abbreviation-Matching + all_display_names = [ent["name"]] + [a for a in ent.get("aliases", []) if a] + for name in all_display_names: + words = [w for w in re.split(r'[\s\-_/(),.;:]+', name) if w and len(w) >= 2] + if len(words) >= 2: + abbr = "".join(w[0] for w in words).lower() + if len(abbr) >= 2: + token_to_ids[f"_abbr_{abbr}"].add(db_id) + # Kurze Namen als potenzielle Abkürzungen + name_clean = re.sub(r'[^a-zA-Z]', '', name).lower() + if 2 <= len(name_clean) <= 6: + token_to_ids[f"_abbr_{name_clean}"].add(db_id) + + # Eindeutige Cluster filtern (≥ 2 Entities, ≤ 40 Entities) + seen_clusters = set() + candidate_clusters = [] + for token, db_ids in sorted(token_to_ids.items(), key=lambda x: len(x[1])): + if len(db_ids) < 2 or len(db_ids) > 40: + continue + key = frozenset(db_ids) + if key in seen_clusters: + continue + seen_clusters.add(key) + candidate_clusters.append(list(db_ids)) + + logger.info(f"Phase 2c: {len(candidate_clusters)} Kandidaten-Cluster gefunden") + + # --- Opus-Calls für jeden Cluster --- + merged_away = set() + total_merges = 0 + opus_calls = 0 + max_calls = 50 + + for ci, cluster_ids in enumerate(candidate_clusters): + if opus_calls >= max_calls: + logger.warning(f"Phase 2c: Max {max_calls} Opus-Calls erreicht, stoppe") + break + + # Bereits gemergte Entities filtern + active_ids = [did for did in cluster_ids if did not in merged_away and did in db_id_map] + if len(active_ids) < 2: + continue + + active_ents = [db_id_map[did] for did in active_ids] + + # Prompt bauen + lines = [] + for i, ent in enumerate(active_ents, 1): + aliases_str = ", ".join((ent.get("aliases") or [])[:5]) + line = f"{i}. {ent['name']} ({ent['type']}, {ent.get('mention_count', 0)} Erwähnungen)" + if aliases_str: + line += f" [Aliases: {aliases_str}]" + lines.append(line) + + prompt = SEMANTIC_DEDUP_PROMPT.format(entity_list="\n".join(lines)) try: result_text, usage = await call_claude(prompt, tools=None, model=None) usage_acc.add(usage) + opus_calls += 1 except Exception as e: - logger.error(f"Opus Korrektur-Batch {bi + 1} fehlgeschlagen: {e}") + logger.error(f"Phase 2c Opus-Call {opus_calls + 1} fehlgeschlagen: {e}") continue parsed = _parse_json_response(result_text) if not parsed: continue - corrections = parsed.get("corrections", []) - if isinstance(corrections, list): - all_corrections.extend(corrections) + for merge_group in parsed.get("merges", []): + keep_idx = merge_group.get("keep") + merge_indices = merge_group.get("merge", []) + if keep_idx is None or not merge_indices: + continue + if not isinstance(merge_indices, list): + merge_indices = [merge_indices] - logger.info(f"Korrektur-Batch {bi + 1}/{len(entity_batches)}: {len(corrections)} Korrekturen") + keep_idx -= 1 # 1-indexed → 0-indexed + if keep_idx < 0 or keep_idx >= len(active_ents): + continue + keep_ent = active_ents[keep_idx] + if keep_ent["db_id"] in merged_away: + continue - if all_corrections: - await _apply_corrections(db, analysis_id, tenant_id, entities, all_corrections) - logger.info(f"Phase 2b abgeschlossen: {len(all_corrections)} Korrekturen angewendet") + for mi in merge_indices: + mi -= 1 + if mi < 0 or mi >= len(active_ents) or mi == keep_idx: + continue + merge_ent = active_ents[mi] + if merge_ent["db_id"] in merged_away: + continue + + logger.info(f"Semantic Merge: '{merge_ent['name']}' → '{keep_ent['name']}'") + await _merge_entity_in_db(db, analysis_id, keep_ent, merge_ent, entities) + merged_away.add(merge_ent["db_id"]) + db_id_map.pop(merge_ent["db_id"], None) + total_merges += 1 + + # Progress + progress = int((ci + 1) / len(candidate_clusters) * 100) if candidate_clusters else 100 + await _broadcast(ws_manager, "network_status", { + "analysis_id": analysis_id, + "phase": "semantic_dedup", + "progress": min(progress, 100), + }) + + if total_merges > 0: + await db.commit() + + logger.info( + f"Phase 2c abgeschlossen: {total_merges} Merges, {opus_calls} Opus-Calls, " + f"{len(entities)} Entitäten verbleiben" + ) + + +# --------------------------------------------------------------------------- +# Phase 2d: Cleanup +# --------------------------------------------------------------------------- + +async def _phase2d_cleanup( + db, analysis_id: int, entities: list[dict], ws_manager=None, +) -> None: + """Cleanup: Self-Loops, Richtungsnormalisierung, Duplikat-Relations, verwaiste Entities.""" + logger.info("Phase 2d: Cleanup") + + # 1. Self-Loops entfernen + cursor = await db.execute( + "DELETE FROM network_relations WHERE network_analysis_id = ? AND source_entity_id = target_entity_id", + (analysis_id,), + ) + self_loops = cursor.rowcount + + # 2. Richtungsnormalisierung (nach Merges können Richtungen inkonsistent sein) + await db.execute( + """UPDATE network_relations + SET source_entity_id = target_entity_id, target_entity_id = source_entity_id + WHERE source_entity_id > target_entity_id AND network_analysis_id = ?""", + (analysis_id,), + ) + + # 3. Duplikat-Relations entfernen (gleiche source+target+category nach Normalisierung) + cursor = await db.execute( + """DELETE FROM network_relations + WHERE network_analysis_id = ? AND id NOT IN ( + SELECT MIN(id) FROM network_relations + WHERE network_analysis_id = ? + GROUP BY source_entity_id, target_entity_id, category + )""", + (analysis_id, analysis_id), + ) + dup_relations = cursor.rowcount + + # 4. Verwaiste Entities entfernen (keine Verbindungen) + cursor = await db.execute( + """DELETE FROM network_entities + WHERE network_analysis_id = ? AND id NOT IN ( + SELECT source_entity_id FROM network_relations WHERE network_analysis_id = ? + UNION + SELECT target_entity_id FROM network_relations WHERE network_analysis_id = ? + )""", + (analysis_id, analysis_id, analysis_id), + ) + orphans = cursor.rowcount + + # Entities-Liste aktualisieren + remaining_ids = set() + cursor = await db.execute( + "SELECT id FROM network_entities WHERE network_analysis_id = ?", (analysis_id,) + ) + for row in await cursor.fetchall(): + remaining_ids.add(row["id"]) + entities[:] = [e for e in entities if e.get("db_id") in remaining_ids] + + await db.commit() + + logger.info( + f"Phase 2d abgeschlossen: {self_loops} Self-Loops, {dup_relations} Duplikat-Relations, " + f"{orphans} verwaiste Entities entfernt, {len(entities)} Entitäten verbleiben" + ) await _broadcast(ws_manager, "network_status", { "analysis_id": analysis_id, - "phase": "correction", + "phase": "cleanup", "progress": 100, }) -async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections): - """Wendet Opus-Korrekturen auf Entitäten an.""" - for corr in corrections: - if not isinstance(corr, dict): - continue - corr_type = corr.get("type", "") - - try: - if corr_type == "name_fix": - entity_name = (corr.get("entity_name") or "").strip() - corrected_name = (corr.get("corrected_name") or "").strip() - if not entity_name or not corrected_name: - continue - - for ent in entities: - if ent["name"].lower() == entity_name.lower() or \ - ent["name_normalized"] == entity_name.lower(): - old_name = ent["name"] - ent["name"] = corrected_name - ent["name_normalized"] = corrected_name.lower().strip() - ent.setdefault("aliases", []) - if old_name not in ent["aliases"]: - ent["aliases"].append(old_name) - - if ent.get("db_id"): - await db.execute( - """UPDATE network_entities - SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1 - WHERE id = ?""", - (corrected_name, corrected_name.lower().strip(), - json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]), - ) - break - - elif corr_type == "merge": - keep_name = (corr.get("entity_name") or "").strip() - merge_name = (corr.get("merge_with") or "").strip() - if not keep_name or not merge_name: - continue - - keep_ent = merge_ent = None - for ent in entities: - nl = ent["name"].lower() - nn = ent["name_normalized"] - if nl == keep_name.lower() or nn == keep_name.lower(): - keep_ent = ent - elif nl == merge_name.lower() or nn == merge_name.lower(): - merge_ent = ent - - if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"): - aliases = set(keep_ent.get("aliases", [])) - aliases.add(merge_ent["name"]) - for a in merge_ent.get("aliases", []): - if a and a.strip(): - aliases.add(a.strip()) - keep_ent["aliases"] = list(aliases) - keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0) - - await db.execute( - "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?", - (keep_ent["db_id"], merge_ent["db_id"]), - ) - await db.execute( - "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?", - (keep_ent["db_id"], merge_ent["db_id"], analysis_id), - ) - await db.execute( - "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?", - (keep_ent["db_id"], merge_ent["db_id"], analysis_id), - ) - await db.execute( - """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1 - WHERE id = ?""", - (json.dumps(keep_ent["aliases"], ensure_ascii=False), - keep_ent["mention_count"], keep_ent["db_id"]), - ) - await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],)) - entities.remove(merge_ent) - - elif corr_type == "add": - entity_name = (corr.get("entity_name") or "").strip() - entity_type = (corr.get("entity_type") or "organisation").lower().strip() - description = corr.get("description", "") - if not entity_name: - continue - - valid_types = {"person", "organisation", "location", "event", "military"} - if entity_type not in valid_types: - entity_type = "organisation" - - name_norm = entity_name.lower().strip() - if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities): - continue - - cursor = await db.execute( - """INSERT OR IGNORE INTO network_entities - (network_analysis_id, name, name_normalized, entity_type, - description, aliases, mention_count, corrected_by_opus, tenant_id) - VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""", - (analysis_id, entity_name, name_norm, entity_type, description, tenant_id), - ) - if cursor.lastrowid: - entities.append({ - "name": entity_name, "name_normalized": name_norm, - "type": entity_type, "description": description, - "aliases": [], "mention_count": 1, "db_id": cursor.lastrowid, - }) - - except Exception as e: - logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}") - - await db.commit() - - # --------------------------------------------------------------------------- # Phase 3: Finalisierung # --------------------------------------------------------------------------- @@ -799,10 +985,12 @@ async def _phase3_finalize( async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None): """Hauptfunktion: Entity-Extraktion + Beziehungsanalyse. - Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches) - Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung - Phase 2b: Opus korrigiert Entitäten (Name-Fix, Merge, Add) - Phase 3: Finalisierung (Zähler, Hash, Log) + Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches) + Phase 2a: Entity-Dedup nach name_normalized (Code, kein API) + Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung + Phase 2c: Semantische Dedup via Opus (Cluster-weise) + Phase 2d: Cleanup (Self-Loops, Verwaiste, Duplikat-Relations) + Phase 3: Finalisierung (Zähler, Hash, Log) """ from database import get_db @@ -879,7 +1067,7 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, " f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen") - # Phase 1 + # Phase 1: Entity-Extraktion if not await _check_analysis_exists(db, analysis_id): return @@ -887,7 +1075,13 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager, ) - # Phase 2 + # Phase 2a: Entity-Deduplication + if not await _check_analysis_exists(db, analysis_id): + return + + await _phase2a_deduplicate_entities(db, analysis_id, entities, ws_manager) + + # Phase 2: Beziehungsextraktion if not await _check_analysis_exists(db, analysis_id): return @@ -895,15 +1089,21 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager, ) - # Phase 2b: Korrekturen + # Phase 2c: Semantische Deduplication if not await _check_analysis_exists(db, analysis_id): return - await _phase2b_corrections( - db, analysis_id, tenant_id, entities, len(relations), usage_acc, ws_manager, + await _phase2c_semantic_dedup( + db, analysis_id, tenant_id, entities, usage_acc, ws_manager, ) - # Entity-Count nach Korrekturen aktualisieren + # Phase 2d: Cleanup + if not await _check_analysis_exists(db, analysis_id): + return + + await _phase2d_cleanup(db, analysis_id, entities, ws_manager) + + # Finale Zähler aus DB (nach allen Cleanups) cursor = await db.execute( "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?", (analysis_id,), @@ -911,13 +1111,20 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag row = await cursor.fetchone() final_entity_count = row["cnt"] if row else len(entities) - # Phase 3 + cursor = await db.execute( + "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?", + (analysis_id,), + ) + row = await cursor.fetchone() + final_relation_count = row["cnt"] if row else len(relations) + + # Phase 3: Finalisierung if not await _check_analysis_exists(db, analysis_id): return await _phase3_finalize( db, analysis_id, tenant_id, - entity_count=final_entity_count, relation_count=len(relations), + entity_count=final_entity_count, relation_count=final_relation_count, article_ids=article_ids, factcheck_ids=factcheck_ids, article_ts=article_ts, factcheck_ts=factcheck_ts, usage_acc=usage_acc, ws_manager=ws_manager,