feat: network analysis quality improvements (3 new cleanup stages)

Phase 1: entity map is keyed by name_normalized only (instead of name+type); type priority resolves conflicts
Phase 2a (new): code-based dedup by name_normalized, merging same-name entities of different types
Phase 2c (new): semantic dedup via Opus; detects synonyms, abbreviations, and language variants
Phase 2d (new): cleanup of self-loops, direction normalization, duplicate relations, and orphaned entities
Shared _merge_entity_in_db helper for consistent entity merging
Phase 2b (Opus correction pass) removed, superseded by the more precise Phase 2c; the resulting call order is sketched below
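
For orientation, a minimal sketch of the phase order that the two changed files now wire up (function names and signatures as used in agents/entity_extractor.py; db, articles, factchecks, usage_acc and ws_manager come from the surrounding pipeline):

# Sketch only - mirrors the call order in extract_and_relate_entities and regenerate_relations.py
async def _pipeline_sketch(db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager=None):
    # Phase 1: Haiku extracts entities per text batch, keyed by name_normalized
    entities = await _phase1_extract_entities(
        db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
    )
    # Phase 2a: code-only dedup of identical name_normalized values (no API calls)
    await _phase2a_deduplicate_entities(db, analysis_id, entities, ws_manager)
    # Phase 2: batched relationship extraction
    relations = await _phase2_analyze_relationships(
        db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
    )
    # Phase 2c: Opus reviews candidate clusters for semantic duplicates
    await _phase2c_semantic_dedup(db, analysis_id, tenant_id, entities, usage_acc, ws_manager)
    # Phase 2d: self-loops, direction normalization, duplicate relations, orphaned entities
    await _phase2d_cleanup(db, analysis_id, entities, ws_manager)
    return entities, relations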

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 23:53:28 +01:00
Parent 7bfa1d29cf
Commit acb3c6a6cb
2 changed files with 612 additions and 171 deletions

regenerate_relations.py (new file)

@@ -0,0 +1,234 @@
"""Regeneriert NUR die Beziehungen für eine bestehende Netzwerkanalyse.
Nutzt die vorhandenen Entitäten und führt Phase 2a + Phase 2 + Phase 2c + Phase 2d aus.
"""
import asyncio
import json
import sys
import os
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from database import get_db
from agents.entity_extractor import (
_phase2a_deduplicate_entities,
_phase2_analyze_relationships,
_phase2c_semantic_dedup,
_phase2d_cleanup,
_build_entity_name_map,
_compute_data_hash,
_broadcast,
logger,
)
from agents.claude_client import UsageAccumulator
from config import TIMEZONE
from datetime import datetime
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
async def regenerate_relations_only(analysis_id: int):
"""Löscht alte Relations und führt Phase 2a + 2 + 2c + 2d neu aus."""
db = await get_db()
usage_acc = UsageAccumulator()
try:
# Analyse prüfen
cursor = await db.execute(
"SELECT id, name, tenant_id, entity_count FROM network_analyses WHERE id = ?",
(analysis_id,),
)
analysis = await cursor.fetchone()
if not analysis:
print(f"Analyse {analysis_id} nicht gefunden!")
return
tenant_id = analysis["tenant_id"]
print(f"\nAnalyse: {analysis['name']} (ID={analysis_id})")
print(f"Vorhandene Entitäten: {analysis['entity_count']}")
# Status auf generating setzen
await db.execute(
"UPDATE network_analyses SET status = 'generating' WHERE id = ?",
(analysis_id,),
)
await db.commit()
# Entitäten aus DB laden (mit db_id!)
cursor = await db.execute(
"""SELECT id, name, name_normalized, entity_type, description, aliases, mention_count
FROM network_entities WHERE network_analysis_id = ?""",
(analysis_id,),
)
entity_rows = await cursor.fetchall()
entities = []
for r in entity_rows:
aliases = []
try:
aliases = json.loads(r["aliases"]) if r["aliases"] else []
except (json.JSONDecodeError, TypeError):
pass
entities.append({
"name": r["name"],
"name_normalized": r["name_normalized"],
"type": r["entity_type"],
"description": r["description"] or "",
"aliases": aliases,
"mention_count": r["mention_count"] or 1,
"db_id": r["id"],
})
print(f"Geladene Entitäten: {len(entities)}")
# Phase 2a: Entity-Deduplication (vor Relation-Löschung)
print(f"\n--- Phase 2a: Entity-Deduplication ---\n")
await _phase2a_deduplicate_entities(db, analysis_id, entities)
print(f"Entitäten nach Dedup: {len(entities)}")
# Alte Relations löschen
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
(analysis_id,),
)
old_count = (await cursor.fetchone())["cnt"]
print(f"\nLösche {old_count} alte Relations...")
await db.execute(
"DELETE FROM network_relations WHERE network_analysis_id = ?",
(analysis_id,),
)
await db.commit()
# Incident-IDs laden
cursor = await db.execute(
"SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
(analysis_id,),
)
incident_ids = [row["incident_id"] for row in await cursor.fetchall()]
print(f"Verknüpfte Lagen: {len(incident_ids)}")
# Artikel laden
placeholders = ",".join("?" * len(incident_ids))
cursor = await db.execute(
f"""SELECT id, incident_id, headline, headline_de, source, source_url,
content_original, content_de, collected_at
FROM articles WHERE incident_id IN ({placeholders})""",
incident_ids,
)
article_rows = await cursor.fetchall()
articles = []
article_ids = []
article_ts = []
for r in article_rows:
articles.append({
"id": r["id"], "incident_id": r["incident_id"],
"headline": r["headline"], "headline_de": r["headline_de"],
"source": r["source"], "source_url": r["source_url"],
"content_original": r["content_original"], "content_de": r["content_de"],
})
article_ids.append(r["id"])
article_ts.append(r["collected_at"] or "")
# Faktenchecks laden
cursor = await db.execute(
f"""SELECT id, incident_id, claim, status, evidence, checked_at
FROM fact_checks WHERE incident_id IN ({placeholders})""",
incident_ids,
)
fc_rows = await cursor.fetchall()
factchecks = []
factcheck_ids = []
factcheck_ts = []
for r in fc_rows:
factchecks.append({
"id": r["id"], "incident_id": r["incident_id"],
"claim": r["claim"], "status": r["status"], "evidence": r["evidence"],
})
factcheck_ids.append(r["id"])
factcheck_ts.append(r["checked_at"] or "")
print(f"Artikel: {len(articles)}, Faktenchecks: {len(factchecks)}")
# Phase 2: Beziehungsextraktion
print(f"\n--- Phase 2: Batched Beziehungsextraktion starten ---\n")
relations = await _phase2_analyze_relationships(
db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc,
)
# Phase 2c: Semantische Deduplication
print(f"\n--- Phase 2c: Semantische Deduplication (Opus) ---\n")
await _phase2c_semantic_dedup(
db, analysis_id, tenant_id, entities, usage_acc,
)
# Phase 2d: Cleanup
print(f"\n--- Phase 2d: Cleanup ---\n")
await _phase2d_cleanup(db, analysis_id, entities)
# Finale Zähler aus DB
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
(analysis_id,),
)
row = await cursor.fetchone()
final_entity_count = row["cnt"] if row else len(entities)
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
(analysis_id,),
)
row = await cursor.fetchone()
final_relation_count = row["cnt"] if row else len(relations)
# Finalisierung
data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
await db.execute(
"""UPDATE network_analyses
SET entity_count = ?, relation_count = ?, status = 'ready',
last_generated_at = ?, data_hash = ?
WHERE id = ?""",
(final_entity_count, final_relation_count, now, data_hash, analysis_id),
)
await db.execute(
"""INSERT INTO network_generation_log
(network_analysis_id, completed_at, status, input_tokens, output_tokens,
cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls,
entity_count, relation_count, tenant_id)
VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens,
usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
usage_acc.total_cost_usd, usage_acc.call_count,
final_entity_count, final_relation_count, tenant_id),
)
await db.commit()
print(f"\n{'='*60}")
print(f"FERTIG!")
print(f"Entitäten: {final_entity_count}")
print(f"Beziehungen: {final_relation_count}")
print(f"API-Calls: {usage_acc.call_count}")
print(f"Kosten: ${usage_acc.total_cost_usd:.4f}")
print(f"{'='*60}")
except Exception as e:
print(f"FEHLER: {e}")
import traceback
traceback.print_exc()
try:
await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
await db.commit()
except Exception:
pass
finally:
await db.close()
if __name__ == "__main__":
analysis_id = int(sys.argv[1]) if len(sys.argv) > 1 else 1
asyncio.run(regenerate_relations_only(analysis_id))
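
A minimal usage sketch for the script above (illustrative analysis id; equivalent to invoking the __main__ block with the id as the first CLI argument):

# Sketch only: regenerate relations for one existing analysis
import asyncio
from regenerate_relations import regenerate_relations_only

asyncio.run(regenerate_relations_only(42))  # 42 = row id in network_analyses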

src/agents/entity_extractor.py

@@ -4,6 +4,7 @@ import hashlib
import json
import logging
import re
+from collections import defaultdict
from datetime import datetime
from typing import Optional
@@ -12,6 +13,18 @@ from config import CLAUDE_MODEL_FAST, TIMEZONE
logger = logging.getLogger("osint.entity_extractor")

+# ---------------------------------------------------------------------------
+# Konstanten
+# ---------------------------------------------------------------------------
+TYPE_PRIORITY = {"location": 5, "organisation": 4, "military": 3, "event": 2, "person": 1}
+
+_STOP_WORDS = frozenset({
+    "the", "of", "and", "for", "in", "on", "at", "to", "by",
+    "von", "der", "die", "das", "und", "für", "des", "den", "dem",
+    "ein", "eine", "zur", "zum", "bei", "mit", "aus", "nach",
+})
+
# ---------------------------------------------------------------------------
# Prompts
# ---------------------------------------------------------------------------
@@ -97,26 +110,33 @@ AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
}}"""

-CORRECTION_PROMPT = """Du bist ein Senior OSINT-Analyst. Prüfe die extrahierten Entitäten auf Fehler.
-
-ENTITÄTEN:
-{entities_json}
-
-AUFGABE: Prüfe auf folgende Fehler und gib NUR Korrekturen zurück:
-1. "name_fix": Name ist falsch geschrieben oder unvollständig
-2. "merge": Zwei Entitäten bezeichnen dasselbe -> zusammenführen (z.B. "USA" als Organisation und "USA" als Location)
-3. "add": Wichtige Entität fehlt komplett (nur bei offensichtlichen Lücken)
-
-WICHTIG: Nur echte Fehler korrigieren. Im Zweifel KEINE Korrektur.
-
-AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
-{{
-  "corrections": [
-    {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}},
-    {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}},
-    {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}}
-  ]
-}}"""
+SEMANTIC_DEDUP_PROMPT = """Du bist ein OSINT-Analyst. Prüfe diese Entitäten auf Duplikate.
+
+ENTITÄTEN:
+{entity_list}
+
+AUFGABE: Welche Entitäten bezeichnen DASSELBE reale Objekt?
+
+Typische Duplikate:
+- Abkürzung vs. Vollname: "IRGC" = "Iranian Revolutionary Guard Corps"
+- Sprachvarianten: "European Union" = "Europäische Union"
+- Schreibvarianten: "Strait of Hormuz" = "Straße von Hormus"
+
+REGELN:
+- NUR echte Duplikate zusammenführen (gleiche reale Entität)
+- NICHT zusammenführen: Unterorganisationen (IRGC ≠ IRGC Navy), verschiedene Personen
+- "keep": Nummer der Haupt-Entität (bevorzuge: mehr Erwähnungen, vollständiger Name)
+- "merge": Nummern der Duplikate die in die Haupt-Entität zusammengeführt werden
+
+AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
+{{
+  "merges": [
+    {{"keep": 1, "merge": [3, 5]}},
+    {{"keep": 2, "merge": [4]}}
+  ]
+}}
+
+Falls KEINE Duplikate: {{"merges": []}}"""

# ---------------------------------------------------------------------------
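
For intuition, a toy sketch of how Phase 2c fills this prompt and reads the answer back (values invented; the numbered entity_list is built in _phase2c_semantic_dedup further down):

# Illustrative only - the line format mirrors the f-string used in _phase2c_semantic_dedup
entity_list = "\n".join([
    "1. Iranian Revolutionary Guard Corps (organisation, 12 Erwähnungen)",
    "2. IRGC (organisation, 4 Erwähnungen) [Aliases: Revolutionsgarden]",
])
prompt = SEMANTIC_DEDUP_PROMPT.format(entity_list=entity_list)
# An expected answer would be {"merges": [{"keep": 1, "merge": [2]}]}:
# entity 2 names the same organisation and gets folded into entity 1.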
@@ -174,6 +194,74 @@ def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) ->
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()

+# ---------------------------------------------------------------------------
+# Entity-Merge Helper
+# ---------------------------------------------------------------------------
+async def _merge_entity_in_db(
+    db, analysis_id: int, keep_ent: dict, merge_ent: dict, entities: list[dict],
+):
+    """Führt merge_ent in keep_ent zusammen (DB + In-Memory)."""
+    keep_id = keep_ent["db_id"]
+    merge_id = merge_ent["db_id"]
+
+    # Aliases vereinen
+    aliases = set(keep_ent.get("aliases", []))
+    aliases.add(merge_ent["name"])
+    for a in merge_ent.get("aliases", []):
+        if a and a.strip():
+            aliases.add(a.strip())
+    aliases.discard(keep_ent["name"])
+    keep_ent["aliases"] = list(aliases)
+
+    # Mention count addieren
+    keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)
+
+    # Längere Description behalten
+    if len(merge_ent.get("description", "")) > len(keep_ent.get("description", "")):
+        keep_ent["description"] = merge_ent["description"]
+
+    # Entity-Mentions umhängen
+    await db.execute(
+        "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
+        (keep_id, merge_id),
+    )
+
+    # Relations umhängen
+    await db.execute(
+        "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
+        (keep_id, merge_id, analysis_id),
+    )
+    await db.execute(
+        "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
+        (keep_id, merge_id, analysis_id),
+    )
+
+    # Self-Loops entfernen die durch den Merge entstanden sein könnten
+    await db.execute(
+        "DELETE FROM network_relations WHERE source_entity_id = ? AND target_entity_id = ? AND network_analysis_id = ?",
+        (keep_id, keep_id, analysis_id),
+    )
+
+    # Keep-Entity in DB aktualisieren
+    await db.execute(
+        """UPDATE network_entities
+           SET aliases = ?, mention_count = ?, description = ?
+           WHERE id = ?""",
+        (json.dumps(keep_ent["aliases"], ensure_ascii=False),
+         keep_ent["mention_count"], keep_ent.get("description", ""), keep_id),
+    )
+
+    # Merge-Entity löschen
+    await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_id,))
+
+    # Aus entities-Liste entfernen
+    try:
+        entities.remove(merge_ent)
+    except ValueError:
+        pass
+
# ---------------------------------------------------------------------------
# Phase 1: Entity-Extraktion (Haiku)
# ---------------------------------------------------------------------------
@@ -209,7 +297,7 @@ async def _phase1_extract_entities(
    batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
    logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")

-    entity_map: dict[tuple[str, str], dict] = {}
+    entity_map: dict[str, dict] = {}

    for batch_idx, batch in enumerate(batches):
        articles_text = "\n\n---\n\n".join(batch)
@@ -244,7 +332,7 @@ async def _phase1_extract_entities(
            if entity_type not in valid_types:
                entity_type = "organisation"

-            key = (name_normalized, entity_type)
+            key = name_normalized

            if key in entity_map:
                existing = entity_map[key]
@@ -259,6 +347,9 @@ async def _phase1_extract_entities(
                new_desc = ent.get("description", "")
                if len(new_desc) > len(existing.get("description", "")):
                    existing["description"] = new_desc
+                # Typ-Priorität: höherwertigen Typ behalten
+                if TYPE_PRIORITY.get(entity_type, 0) > TYPE_PRIORITY.get(existing["type"], 0):
+                    existing["type"] = entity_type
            else:
                entity_map[key] = {
                    "name": name,
@@ -303,6 +394,55 @@ async def _phase1_extract_entities(
    return all_entities

+# ---------------------------------------------------------------------------
+# Phase 2a: Entity-Deduplication nach name_normalized
+# ---------------------------------------------------------------------------
+async def _phase2a_deduplicate_entities(
+    db, analysis_id: int, entities: list[dict], ws_manager=None,
+) -> None:
+    """Dedupliziert Entities mit gleichem name_normalized (unabhängig vom Typ)."""
+    logger.info(f"Phase 2a: Prüfe {len(entities)} Entitäten auf name_normalized-Duplikate")
+
+    groups = defaultdict(list)
+    for ent in entities:
+        if ent.get("db_id"):
+            groups[ent["name_normalized"]].append(ent)
+
+    merge_count = 0
+    merged_ids = set()
+    for nn, group in groups.items():
+        if len(group) < 2:
+            continue
+        # Sortierung: höchste Typ-Priorität, dann meiste Erwähnungen
+        group.sort(
+            key=lambda e: (TYPE_PRIORITY.get(e["type"], 0), e.get("mention_count", 0)),
+            reverse=True,
+        )
+        keep = group[0]
+        for merge in group[1:]:
+            if merge["db_id"] in merged_ids:
+                continue
+            await _merge_entity_in_db(db, analysis_id, keep, merge, entities)
+            merged_ids.add(merge["db_id"])
+            merge_count += 1
+
+    if merge_count > 0:
+        await db.commit()
+
+    logger.info(
+        f"Phase 2a abgeschlossen: {merge_count} Duplikate zusammengeführt, "
+        f"{len(entities)} Entitäten verbleiben"
+    )
+    await _broadcast(ws_manager, "network_status", {
+        "analysis_id": analysis_id,
+        "phase": "entity_dedup",
+        "progress": 100,
+    })
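
A toy illustration of how Phase 2a picks the surviving entity when the same name_normalized occurs under two types (values invented; TYPE_PRIORITY copied from the constants above):

# Illustrative only
TYPE_PRIORITY = {"location": 5, "organisation": 4, "military": 3, "event": 2, "person": 1}

group = [
    {"name": "USA", "type": "organisation", "mention_count": 12},
    {"name": "USA", "type": "location", "mention_count": 7},
]
# Same sort key as in _phase2a_deduplicate_entities: type priority first, then mention count
group.sort(key=lambda e: (TYPE_PRIORITY.get(e["type"], 0), e.get("mention_count", 0)), reverse=True)
# group[0] is the "location" entry (priority 5 beats 4); the other entry would be merged
# into it via _merge_entity_in_db (mention counts added, relations re-pointed).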

# ---------------------------------------------------------------------------
# Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch)
# ---------------------------------------------------------------------------
@@ -566,186 +706,232 @@ async def _phase2_analyze_relationships(
# ---------------------------------------------------------------------------
-# Phase 2b: Korrekturen (Opus)
+# Phase 2c: Semantische Deduplication (Opus)
# ---------------------------------------------------------------------------
-async def _phase2b_corrections(
-    db, analysis_id: int, tenant_id: int,
-    entities: list[dict], relation_count: int,
-    usage_acc: UsageAccumulator, ws_manager=None,
-) -> None:
-    """Opus prüft die Entitäten auf Fehler (Merge, Name-Fix, Add)."""
-    if len(entities) < 10:
-        return
-
-    logger.info(f"Phase 2b: Opus-Korrekturpass für {len(entities)} Entitäten")
-    await _broadcast(ws_manager, "network_status", {
-        "analysis_id": analysis_id,
-        "phase": "correction",
-        "progress": 0,
-    })
-
-    entities_for_prompt = []
-    for e in entities:
-        entities_for_prompt.append({
-            "name": e["name"],
-            "type": e["type"],
-            "description": e.get("description", ""),
-            "aliases": e.get("aliases", []),
-        })
-
-    batch_size = 500
-    entity_batches = [entities_for_prompt[i:i + batch_size]
-                      for i in range(0, len(entities_for_prompt), batch_size)]
-
-    all_corrections = []
-    for bi, eb in enumerate(entity_batches):
-        entities_json = json.dumps(eb, ensure_ascii=False, indent=1)
-        prompt = CORRECTION_PROMPT.format(entities_json=entities_json)
-        try:
-            result_text, usage = await call_claude(prompt, tools=None, model=None)
-            usage_acc.add(usage)
-        except Exception as e:
-            logger.error(f"Opus Korrektur-Batch {bi + 1} fehlgeschlagen: {e}")
-            continue
-
-        parsed = _parse_json_response(result_text)
-        if not parsed:
-            continue
-        corrections = parsed.get("corrections", [])
-        if isinstance(corrections, list):
-            all_corrections.extend(corrections)
-
-        logger.info(f"Korrektur-Batch {bi + 1}/{len(entity_batches)}: {len(corrections)} Korrekturen")
-
-    if all_corrections:
-        await _apply_corrections(db, analysis_id, tenant_id, entities, all_corrections)
-
-    logger.info(f"Phase 2b abgeschlossen: {len(all_corrections)} Korrekturen angewendet")
-    await _broadcast(ws_manager, "network_status", {
-        "analysis_id": analysis_id,
-        "phase": "correction",
-        "progress": 100,
-    })
-
-
-async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
-    """Wendet Opus-Korrekturen auf Entitäten an."""
-    for corr in corrections:
-        if not isinstance(corr, dict):
-            continue
-        corr_type = corr.get("type", "")
-        try:
-            if corr_type == "name_fix":
-                entity_name = (corr.get("entity_name") or "").strip()
-                corrected_name = (corr.get("corrected_name") or "").strip()
-                if not entity_name or not corrected_name:
-                    continue
-                for ent in entities:
-                    if ent["name"].lower() == entity_name.lower() or \
-                            ent["name_normalized"] == entity_name.lower():
-                        old_name = ent["name"]
-                        ent["name"] = corrected_name
-                        ent["name_normalized"] = corrected_name.lower().strip()
-                        ent.setdefault("aliases", [])
-                        if old_name not in ent["aliases"]:
-                            ent["aliases"].append(old_name)
-                        if ent.get("db_id"):
-                            await db.execute(
-                                """UPDATE network_entities
-                                   SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1
-                                   WHERE id = ?""",
-                                (corrected_name, corrected_name.lower().strip(),
-                                 json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]),
-                            )
-                        break
-            elif corr_type == "merge":
-                keep_name = (corr.get("entity_name") or "").strip()
-                merge_name = (corr.get("merge_with") or "").strip()
-                if not keep_name or not merge_name:
-                    continue
-                keep_ent = merge_ent = None
-                for ent in entities:
-                    nl = ent["name"].lower()
-                    nn = ent["name_normalized"]
-                    if nl == keep_name.lower() or nn == keep_name.lower():
-                        keep_ent = ent
-                    elif nl == merge_name.lower() or nn == merge_name.lower():
-                        merge_ent = ent
-                if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"):
-                    aliases = set(keep_ent.get("aliases", []))
-                    aliases.add(merge_ent["name"])
-                    for a in merge_ent.get("aliases", []):
-                        if a and a.strip():
-                            aliases.add(a.strip())
-                    keep_ent["aliases"] = list(aliases)
-                    keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)
-                    await db.execute(
-                        "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
-                        (keep_ent["db_id"], merge_ent["db_id"]),
-                    )
-                    await db.execute(
-                        "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
-                        (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
-                    )
-                    await db.execute(
-                        "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
-                        (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
-                    )
-                    await db.execute(
-                        """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1
-                           WHERE id = ?""",
-                        (json.dumps(keep_ent["aliases"], ensure_ascii=False),
-                         keep_ent["mention_count"], keep_ent["db_id"]),
-                    )
-                    await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],))
-                    entities.remove(merge_ent)
-            elif corr_type == "add":
-                entity_name = (corr.get("entity_name") or "").strip()
-                entity_type = (corr.get("entity_type") or "organisation").lower().strip()
-                description = corr.get("description", "")
-                if not entity_name:
-                    continue
-                valid_types = {"person", "organisation", "location", "event", "military"}
-                if entity_type not in valid_types:
-                    entity_type = "organisation"
-                name_norm = entity_name.lower().strip()
-                if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities):
-                    continue
-                cursor = await db.execute(
-                    """INSERT OR IGNORE INTO network_entities
-                       (network_analysis_id, name, name_normalized, entity_type,
-                        description, aliases, mention_count, corrected_by_opus, tenant_id)
-                       VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""",
-                    (analysis_id, entity_name, name_norm, entity_type, description, tenant_id),
-                )
-                if cursor.lastrowid:
-                    entities.append({
-                        "name": entity_name, "name_normalized": name_norm,
-                        "type": entity_type, "description": description,
-                        "aliases": [], "mention_count": 1, "db_id": cursor.lastrowid,
-                    })
-        except Exception as e:
-            logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}")
-
-    await db.commit()
+async def _phase2c_semantic_dedup(
+    db, analysis_id: int, tenant_id: int,
+    entities: list[dict], usage_acc: UsageAccumulator, ws_manager=None,
+) -> None:
+    """Semantische Deduplizierung via Opus — erkennt Synonyme, Abkürzungen, Sprachvarianten."""
+    if len(entities) < 10:
+        return
+
+    logger.info(f"Phase 2c: Semantische Dedup für {len(entities)} Entitäten")
+    await _broadcast(ws_manager, "network_status", {
+        "analysis_id": analysis_id,
+        "phase": "semantic_dedup",
+        "progress": 0,
+    })
+
+    # --- Clustering: Token-basiert + Abbreviation-Matching ---
+    token_to_ids = defaultdict(set)
+    db_id_map = {e["db_id"]: e for e in entities if e.get("db_id")}
+
+    for ent in entities:
+        db_id = ent.get("db_id")
+        if not db_id:
+            continue
+        all_names = [ent["name_normalized"]]
+        for a in ent.get("aliases", []):
+            if a:
+                all_names.append(a.lower())
+
+        # Token-basiertes Clustering
+        for name in all_names:
+            for word in re.split(r'[\s\-_/(),.;:]+', name):
+                word = word.strip()
+                if len(word) >= 3 and word not in _STOP_WORDS:
+                    token_to_ids[word].add(db_id)
+
+        # Abbreviation-Matching
+        all_display_names = [ent["name"]] + [a for a in ent.get("aliases", []) if a]
+        for name in all_display_names:
+            words = [w for w in re.split(r'[\s\-_/(),.;:]+', name) if w and len(w) >= 2]
+            if len(words) >= 2:
+                abbr = "".join(w[0] for w in words).lower()
+                if len(abbr) >= 2:
+                    token_to_ids[f"_abbr_{abbr}"].add(db_id)
+            # Kurze Namen als potenzielle Abkürzungen
+            name_clean = re.sub(r'[^a-zA-Z]', '', name).lower()
+            if 2 <= len(name_clean) <= 6:
+                token_to_ids[f"_abbr_{name_clean}"].add(db_id)
+
+    # Eindeutige Cluster filtern (≥ 2 Entities, ≤ 40 Entities)
+    seen_clusters = set()
+    candidate_clusters = []
+    for token, db_ids in sorted(token_to_ids.items(), key=lambda x: len(x[1])):
+        if len(db_ids) < 2 or len(db_ids) > 40:
+            continue
+        key = frozenset(db_ids)
+        if key in seen_clusters:
+            continue
+        seen_clusters.add(key)
+        candidate_clusters.append(list(db_ids))
+
+    logger.info(f"Phase 2c: {len(candidate_clusters)} Kandidaten-Cluster gefunden")
+
+    # --- Opus-Calls für jeden Cluster ---
+    merged_away = set()
+    total_merges = 0
+    opus_calls = 0
+    max_calls = 50
+
+    for ci, cluster_ids in enumerate(candidate_clusters):
+        if opus_calls >= max_calls:
+            logger.warning(f"Phase 2c: Max {max_calls} Opus-Calls erreicht, stoppe")
+            break
+
+        # Bereits gemergte Entities filtern
+        active_ids = [did for did in cluster_ids if did not in merged_away and did in db_id_map]
+        if len(active_ids) < 2:
+            continue
+        active_ents = [db_id_map[did] for did in active_ids]
+
+        # Prompt bauen
+        lines = []
+        for i, ent in enumerate(active_ents, 1):
+            aliases_str = ", ".join((ent.get("aliases") or [])[:5])
+            line = f"{i}. {ent['name']} ({ent['type']}, {ent.get('mention_count', 0)} Erwähnungen)"
+            if aliases_str:
+                line += f" [Aliases: {aliases_str}]"
+            lines.append(line)
+        prompt = SEMANTIC_DEDUP_PROMPT.format(entity_list="\n".join(lines))
+
+        try:
+            result_text, usage = await call_claude(prompt, tools=None, model=None)
+            usage_acc.add(usage)
+            opus_calls += 1
+        except Exception as e:
+            logger.error(f"Phase 2c Opus-Call {opus_calls + 1} fehlgeschlagen: {e}")
+            continue
+
+        parsed = _parse_json_response(result_text)
+        if not parsed:
+            continue
+
+        for merge_group in parsed.get("merges", []):
+            keep_idx = merge_group.get("keep")
+            merge_indices = merge_group.get("merge", [])
+            if keep_idx is None or not merge_indices:
+                continue
+            if not isinstance(merge_indices, list):
+                merge_indices = [merge_indices]
+
+            keep_idx -= 1  # 1-indexed → 0-indexed
+            if keep_idx < 0 or keep_idx >= len(active_ents):
+                continue
+            keep_ent = active_ents[keep_idx]
+            if keep_ent["db_id"] in merged_away:
+                continue
+
+            for mi in merge_indices:
+                mi -= 1
+                if mi < 0 or mi >= len(active_ents) or mi == keep_idx:
+                    continue
+                merge_ent = active_ents[mi]
+                if merge_ent["db_id"] in merged_away:
+                    continue
+                logger.info(f"Semantic Merge: '{merge_ent['name']}' → '{keep_ent['name']}'")
+                await _merge_entity_in_db(db, analysis_id, keep_ent, merge_ent, entities)
+                merged_away.add(merge_ent["db_id"])
+                db_id_map.pop(merge_ent["db_id"], None)
+                total_merges += 1
+
+        # Progress
+        progress = int((ci + 1) / len(candidate_clusters) * 100) if candidate_clusters else 100
+        await _broadcast(ws_manager, "network_status", {
+            "analysis_id": analysis_id,
+            "phase": "semantic_dedup",
+            "progress": min(progress, 100),
+        })
+
+    if total_merges > 0:
+        await db.commit()
+
+    logger.info(
+        f"Phase 2c abgeschlossen: {total_merges} Merges, {opus_calls} Opus-Calls, "
+        f"{len(entities)} Entitäten verbleiben"
+    )
+
+
+# ---------------------------------------------------------------------------
+# Phase 2d: Cleanup
+# ---------------------------------------------------------------------------
+async def _phase2d_cleanup(
+    db, analysis_id: int, entities: list[dict], ws_manager=None,
+) -> None:
+    """Cleanup: Self-Loops, Richtungsnormalisierung, Duplikat-Relations, verwaiste Entities."""
+    logger.info("Phase 2d: Cleanup")
+
+    # 1. Self-Loops entfernen
+    cursor = await db.execute(
+        "DELETE FROM network_relations WHERE network_analysis_id = ? AND source_entity_id = target_entity_id",
+        (analysis_id,),
+    )
+    self_loops = cursor.rowcount
+
+    # 2. Richtungsnormalisierung (nach Merges können Richtungen inkonsistent sein)
+    await db.execute(
+        """UPDATE network_relations
+           SET source_entity_id = target_entity_id, target_entity_id = source_entity_id
+           WHERE source_entity_id > target_entity_id AND network_analysis_id = ?""",
+        (analysis_id,),
+    )
+
+    # 3. Duplikat-Relations entfernen (gleiche source+target+category nach Normalisierung)
+    cursor = await db.execute(
+        """DELETE FROM network_relations
+           WHERE network_analysis_id = ? AND id NOT IN (
+               SELECT MIN(id) FROM network_relations
+               WHERE network_analysis_id = ?
+               GROUP BY source_entity_id, target_entity_id, category
+           )""",
+        (analysis_id, analysis_id),
+    )
+    dup_relations = cursor.rowcount
+
+    # 4. Verwaiste Entities entfernen (keine Verbindungen)
+    cursor = await db.execute(
+        """DELETE FROM network_entities
+           WHERE network_analysis_id = ? AND id NOT IN (
+               SELECT source_entity_id FROM network_relations WHERE network_analysis_id = ?
+               UNION
+               SELECT target_entity_id FROM network_relations WHERE network_analysis_id = ?
+           )""",
+        (analysis_id, analysis_id, analysis_id),
+    )
+    orphans = cursor.rowcount
+
+    # Entities-Liste aktualisieren
+    remaining_ids = set()
+    cursor = await db.execute(
+        "SELECT id FROM network_entities WHERE network_analysis_id = ?", (analysis_id,)
+    )
+    for row in await cursor.fetchall():
+        remaining_ids.add(row["id"])
+    entities[:] = [e for e in entities if e.get("db_id") in remaining_ids]
+
+    await db.commit()
+    logger.info(
+        f"Phase 2d abgeschlossen: {self_loops} Self-Loops, {dup_relations} Duplikat-Relations, "
+        f"{orphans} verwaiste Entities entfernt, {len(entities)} Entitäten verbleiben"
+    )
+    await _broadcast(ws_manager, "network_status", {
+        "analysis_id": analysis_id,
+        "phase": "cleanup",
+        "progress": 100,
+    })
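
A toy illustration of the abbreviation bucketing that _phase2c_semantic_dedup (above) uses to propose candidate clusters (names taken from the prompt's own examples; ids invented):

# Illustrative only
import re
from collections import defaultdict

token_to_ids = defaultdict(set)
entities = [
    {"db_id": 1, "name": "IRGC"},
    {"db_id": 2, "name": "Iranian Revolutionary Guard Corps"},
]
for ent in entities:
    name = ent["name"]
    words = [w for w in re.split(r'[\s\-_/(),.;:]+', name) if w and len(w) >= 2]
    if len(words) >= 2:
        # multi-word name -> first letters: "irgc"
        token_to_ids["_abbr_" + "".join(w[0] for w in words).lower()].add(ent["db_id"])
    name_clean = re.sub(r'[^a-zA-Z]', '', name).lower()
    if 2 <= len(name_clean) <= 6:
        # short name treated as a potential abbreviation: "irgc"
        token_to_ids["_abbr_" + name_clean].add(ent["db_id"])

print(token_to_ids["_abbr_irgc"])  # {1, 2} -> one candidate cluster for the Opus check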

# ---------------------------------------------------------------------------
# Phase 3: Finalisierung
# ---------------------------------------------------------------------------
@@ -800,8 +986,10 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
    """Hauptfunktion: Entity-Extraktion + Beziehungsanalyse.

    Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches)
+    Phase 2a: Entity-Dedup nach name_normalized (Code, kein API)
    Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung
-    Phase 2b: Opus korrigiert Entitäten (Name-Fix, Merge, Add)
+    Phase 2c: Semantische Dedup via Opus (Cluster-weise)
+    Phase 2d: Cleanup (Self-Loops, Verwaiste, Duplikat-Relations)
    Phase 3: Finalisierung (Zähler, Hash, Log)
    """
    from database import get_db
@@ -879,7 +1067,7 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
    logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, "
                f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen")

-    # Phase 1
+    # Phase 1: Entity-Extraktion
    if not await _check_analysis_exists(db, analysis_id):
        return
@@ -887,7 +1075,13 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
        db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
    )

-    # Phase 2
+    # Phase 2a: Entity-Deduplication
+    if not await _check_analysis_exists(db, analysis_id):
+        return
+    await _phase2a_deduplicate_entities(db, analysis_id, entities, ws_manager)
+
+    # Phase 2: Beziehungsextraktion
    if not await _check_analysis_exists(db, analysis_id):
        return
@@ -895,15 +1089,21 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
        db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
    )

-    # Phase 2b: Korrekturen
+    # Phase 2c: Semantische Deduplication
    if not await _check_analysis_exists(db, analysis_id):
        return
-    await _phase2b_corrections(
-        db, analysis_id, tenant_id, entities, len(relations), usage_acc, ws_manager,
+    await _phase2c_semantic_dedup(
+        db, analysis_id, tenant_id, entities, usage_acc, ws_manager,
    )

-    # Entity-Count nach Korrekturen aktualisieren
+    # Phase 2d: Cleanup
+    if not await _check_analysis_exists(db, analysis_id):
+        return
+    await _phase2d_cleanup(db, analysis_id, entities, ws_manager)
+
+    # Finale Zähler aus DB (nach allen Cleanups)
    cursor = await db.execute(
        "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
        (analysis_id,),
@@ -911,13 +1111,20 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
    row = await cursor.fetchone()
    final_entity_count = row["cnt"] if row else len(entities)

-    # Phase 3
+    cursor = await db.execute(
+        "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
+        (analysis_id,),
+    )
+    row = await cursor.fetchone()
+    final_relation_count = row["cnt"] if row else len(relations)
+
+    # Phase 3: Finalisierung
    if not await _check_analysis_exists(db, analysis_id):
        return
    await _phase3_finalize(
        db, analysis_id, tenant_id,
-        entity_count=final_entity_count, relation_count=len(relations),
+        entity_count=final_entity_count, relation_count=final_relation_count,
        article_ids=article_ids, factcheck_ids=factcheck_ids,
        article_ts=article_ts, factcheck_ts=factcheck_ts,
        usage_acc=usage_acc, ws_manager=ws_manager,