refactor: switch network analysis phase 2 to batched extraction

- Instead of one mega Opus call, Haiku now extracts relationships per article batch
- Stage A: per-batch extraction with only the relevant entities (~20-50 instead of 3,463)
- Stage B: global merge + deduplication (direction normalization, weight boost); a sketch of this step follows below
- Phase 2b: separate Opus correction pass (name_fix, merge, add) in batches of 500
- Fixes the bug where large analyses produced 0 relations (3,463 entities -> 1,791 relations)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
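
The Stage B merge in a nutshell — a minimal, self-contained sketch, not the committed code: the helper name merge_relations, the input shape, and the example IDs are made up for illustration, while the (min_id, max_id, category) keying, the 1-5 weight clamp, and the boost for relations attested three or more times mirror the diff below.

# Hedged sketch of the Stage B deduplication (illustrative; see the diff for the real code).
# Direction is normalized so A->B and B->A collapse into one undirected edge,
# keyed by (low_id, high_id, category); repeated sightings boost the weight.
def merge_relations(raw: list[dict], name_to_id: dict[str, int]) -> list[dict]:
    merged: dict[tuple[int, int, str], dict] = {}
    for rel in raw:
        src = name_to_id.get(rel["source"].lower())
        tgt = name_to_id.get(rel["target"].lower())
        if not src or not tgt or src == tgt:
            continue                      # drop unknown names and self-loops
        if src > tgt:                     # normalize direction: A->B == B->A
            src, tgt = tgt, src
        key = (src, tgt, rel.get("category", "neutral"))
        weight = max(1, min(5, int(rel.get("weight", 3))))
        entry = merged.setdefault(key, {"weight": weight, "count": 0})
        entry["weight"] = max(entry["weight"], weight)
        entry["count"] += 1
    for entry in merged.values():         # boost multiply-attested relations
        if entry["count"] >= 3 and entry["weight"] < 5:
            entry["weight"] += 1
    return list(merged.values())

# Example: the same pair reported in opposite directions collapses to one edge.
ids = {"nato": 1, "eu": 2}
print(merge_relations(
    [{"source": "NATO", "target": "EU", "category": "alliance", "weight": 4},
     {"source": "EU", "target": "NATO", "category": "alliance", "weight": 2}],
    ids,
))  # -> [{'weight': 4, 'count': 2}]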
@@ -1,4 +1,4 @@
-"""Netzwerkanalyse: Entity-Extraktion (Haiku) + Beziehungsanalyse (Opus)."""
+"""Netzwerkanalyse: Entity-Extraktion (Haiku) + Beziehungsanalyse (Batched)."""
 import asyncio
 import hashlib
 import json
@@ -52,23 +52,20 @@ AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
 }}"""
-RELATIONSHIP_ANALYSIS_PROMPT = """Du bist ein Senior OSINT-Analyst für ein Lagemonitoring-System.
-AUFGABE: Analysiere die Beziehungen zwischen den extrahierten Entitäten und korrigiere ggf. Fehler.
-EXTRAHIERTE ENTITÄTEN:
+RELATIONSHIP_BATCH_PROMPT = """Du bist ein Senior OSINT-Analyst für ein Lagemonitoring-System.
+AUFGABE: Analysiere die Beziehungen zwischen den Entitäten basierend auf den Artikeln.
+BEKANNTE ENTITÄTEN (aus dem Gesamtdatensatz):
 {entities_json}
-QUELLMATERIAL:
-{source_texts}
-TEIL 1 — KORREKTUREN (optional):
-Prüfe die extrahierten Entitäten auf Fehler:
-- "name_fix": Name ist falsch geschrieben oder unvollständig
-- "merge": Zwei Entitäten sind dieselbe -> zusammenführen
-- "add": Wichtige Entität fehlt komplett
-TEIL 2 — BEZIEHUNGEN:
-Identifiziere ALLE relevanten Beziehungen zwischen den Entitäten.
+ARTIKEL:
+{articles_text}
+AUFGABE:
+Identifiziere ALLE Beziehungen zwischen den oben genannten Entitäten, die sich aus den Artikeln ergeben.
+- Nur Beziehungen nennen, die im Artikeltext belegt sind
+- source und target: Exakt die Namen aus der Entitäten-Liste verwenden
+- Wenn eine Entität im Artikel vorkommt aber nicht in der Liste, verwende den Namen wie er in der Liste steht
 BEZIEHUNGS-KATEGORIEN:
 - "alliance": Bündnis, Kooperation, Unterstützung, Partnerschaft
@@ -80,17 +77,11 @@ BEZIEHUNGS-KATEGORIEN:
 REGELN:
 - weight: 1 (schwach/indirekt) bis 5 (stark/direkt)
-- evidence[]: Stichpunkte aus dem Quellmaterial die die Beziehung belegen
+- evidence[]: 1-2 kurze Stichpunkte aus dem Artikeltext als Beleg
 - status: "active" (aktuell), "historical" (vergangen), "emerging" (sich entwickelnd)
-- source und target: Exakt die Namen aus der Entitäten-Liste verwenden
 AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
 {{
-  "corrections": [
-    {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}},
-    {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}},
-    {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}}
-  ],
   "relations": [
     {{
       "source": "Entität A",
@@ -106,6 +97,28 @@ AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
 }}"""
+CORRECTION_PROMPT = """Du bist ein Senior OSINT-Analyst. Prüfe die extrahierten Entitäten auf Fehler.
+ENTITÄTEN:
+{entities_json}
+AUFGABE: Prüfe auf folgende Fehler und gib NUR Korrekturen zurück:
+1. "name_fix": Name ist falsch geschrieben oder unvollständig
+2. "merge": Zwei Entitäten bezeichnen dasselbe -> zusammenführen (z.B. "USA" als Organisation und "USA" als Location)
+3. "add": Wichtige Entität fehlt komplett (nur bei offensichtlichen Lücken)
+WICHTIG: Nur echte Fehler korrigieren. Im Zweifel KEINE Korrektur.
+AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
+{{
+  "corrections": [
+    {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}},
+    {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}},
+    {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}}
+  ]
+}}"""
 # ---------------------------------------------------------------------------
 # Hilfsfunktionen
 # ---------------------------------------------------------------------------
@@ -123,19 +136,16 @@ def _parse_json_response(text: str) -> Optional[dict]:
     """Parst JSON aus Claude-Antwort. Handhabt Markdown-Fences."""
     if not text:
         return None
-    # Direkt
     try:
         return json.loads(text)
     except json.JSONDecodeError:
         pass
-    # Markdown-Fences
     fence_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', text, re.DOTALL)
     if fence_match:
         try:
             return json.loads(fence_match.group(1))
         except json.JSONDecodeError:
             pass
-    # Erstes JSON-Objekt
     obj_match = re.search(r'\{.*\}', text, re.DOTALL)
     if obj_match:
         try:
@@ -181,7 +191,6 @@ async def _phase1_extract_entities(
         headline = art.get("headline_de") or art.get("headline") or ""
         content = art.get("content_de") or art.get("content_original") or ""
         source = art.get("source") or ""
-        # Inhalt kürzen falls sehr lang
         if len(content) > 2000:
             content = content[:2000] + "..."
         all_texts.append(f"[{source}] {headline}\n{content}")
@@ -196,7 +205,6 @@ async def _phase1_extract_entities(
         logger.warning(f"Analyse {analysis_id}: Keine Texte vorhanden")
         return []
-    # Batching
     batch_size = 30
     batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
     logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")
@@ -269,7 +277,6 @@ async def _phase1_extract_entities(
             "progress": int((batch_idx + 1) / len(batches) * 100),
         })
-    # In DB speichern
    all_entities = list(entity_map.values())
    for ent in all_entities:
@@ -297,19 +304,49 @@ async def _phase1_extract_entities(
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Phase 2: Beziehungsanalyse (Opus) # Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _build_entity_name_map(entities: list[dict]) -> dict[str, int]:
"""Mapping: normalisierter Name/Alias -> DB-ID."""
name_to_id: dict[str, int] = {}
for ent in entities:
db_id = ent.get("db_id")
if not db_id:
continue
name_to_id[ent["name"].lower()] = db_id
name_to_id[ent["name_normalized"]] = db_id
for alias in ent.get("aliases", []):
if alias and alias.strip():
name_to_id[alias.strip().lower()] = db_id
return name_to_id
def _find_relevant_entities(batch_texts: list[str], entities: list[dict]) -> list[dict]:
"""Findet Entitäten, die in den Batch-Texten vorkommen."""
combined_text = " ".join(batch_texts).lower()
relevant = []
for ent in entities:
if ent["name"].lower() in combined_text or ent["name_normalized"] in combined_text:
relevant.append(ent)
continue
for alias in ent.get("aliases", []):
if alias and alias.strip().lower() in combined_text:
relevant.append(ent)
break
return relevant
async def _phase2_analyze_relationships( async def _phase2_analyze_relationships(
db, analysis_id: int, tenant_id: int, db, analysis_id: int, tenant_id: int,
entities: list[dict], articles: list[dict], factchecks: list[dict], entities: list[dict], articles: list[dict], factchecks: list[dict],
usage_acc: UsageAccumulator, ws_manager=None, usage_acc: UsageAccumulator, ws_manager=None,
) -> list[dict]: ) -> list[dict]:
"""Analysiert Beziehungen via Opus und wendet Korrekturen an.""" """Analysiert Beziehungen batch-weise und merged die Ergebnisse."""
if not entities: if not entities:
return [] return []
logger.info(f"Phase 2: {len(entities)} Entitäten, Beziehungsanalyse") logger.info(f"Phase 2: {len(entities)} Entitäten, batched Beziehungsanalyse")
await _broadcast(ws_manager, "network_status", { await _broadcast(ws_manager, "network_status", {
"analysis_id": analysis_id, "analysis_id": analysis_id,
@@ -317,95 +354,125 @@ async def _phase2_analyze_relationships(
         "progress": 0,
     })
-    # Entitäten für Prompt
-    entities_for_prompt = [
-        {"name": e["name"], "type": e["type"],
-         "description": e.get("description", ""), "aliases": e.get("aliases", [])}
-        for e in entities
-    ]
-    entities_json = json.dumps(entities_for_prompt, ensure_ascii=False, indent=2)
-    # Quelltexte
-    source_parts = []
+    # --- Texte vorbereiten (gleiche Logik wie Phase 1) ---
+    all_texts = []
     for art in articles:
         headline = art.get("headline_de") or art.get("headline") or ""
         content = art.get("content_de") or art.get("content_original") or ""
         source = art.get("source") or ""
-        source_parts.append(f"[{source}] {headline}\n{content}")
+        if len(content) > 2000:
+            content = content[:2000] + "..."
+        all_texts.append(f"[{source}] {headline}\n{content}")
     for fc in factchecks:
         claim = fc.get("claim") or ""
         evidence = fc.get("evidence") or ""
-        source_parts.append(f"[Faktencheck] {claim}\n{evidence}")
-    source_texts = "\n\n---\n\n".join(source_parts)
-    # Kürzen falls zu lang
-    if len(source_texts) > 150_000:
-        logger.info(f"Quelltexte zu lang ({len(source_texts)} Zeichen), kürze")
-        short_parts = []
-        for art in articles:
-            headline = art.get("headline_de") or art.get("headline") or ""
-            content = art.get("content_de") or art.get("content_original") or ""
-            short = content[:500] + "..." if len(content) > 500 else content
-            short_parts.append(f"[{art.get('source', '')}] {headline}: {short}")
-        for fc in factchecks:
-            short_parts.append(f"[Faktencheck] {fc.get('claim', '')} (Status: {fc.get('status', '')})")
-        source_texts = "\n\n".join(short_parts)
-    prompt = RELATIONSHIP_ANALYSIS_PROMPT.format(
-        entities_json=entities_json, source_texts=source_texts,
-    )
-    try:
-        result_text, usage = await call_claude(prompt, tools=None, model=None)
-        usage_acc.add(usage)
-    except Exception as e:
-        logger.error(f"Opus Beziehungsanalyse fehlgeschlagen: {e}")
-        return []
-    parsed = _parse_json_response(result_text)
-    if not parsed:
-        logger.warning("Kein gültiges JSON von Opus")
-        return []
+        status = fc.get("status") or ""
+        all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}")
+    if not all_texts:
+        return []
+    # --- Stufe A: Per-Batch Beziehungsextraktion ---
+    batch_size = 30
+    batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
+    logger.info(f"Stufe A: {len(batches)} Batches für Beziehungsextraktion")
+    all_raw_relations: list[dict] = []
+    name_to_id = _build_entity_name_map(entities)
+    for batch_idx, batch in enumerate(batches):
+        relevant = _find_relevant_entities(batch, entities)
+        if len(relevant) < 2:
+            logger.debug(f"Batch {batch_idx + 1}: Weniger als 2 Entitäten, überspringe")
+            await _broadcast(ws_manager, "network_status", {
+                "analysis_id": analysis_id,
+                "phase": "relationship_extraction",
+                "progress": int((batch_idx + 1) / len(batches) * 70),
+            })
+            continue
+        entities_for_prompt = [
+            {"name": e["name"], "type": e["type"]}
+            for e in relevant
+        ]
+        entities_json = json.dumps(entities_for_prompt, ensure_ascii=False)
+        articles_text = "\n\n---\n\n".join(batch)
+        prompt = RELATIONSHIP_BATCH_PROMPT.format(
+            entities_json=entities_json,
+            articles_text=articles_text,
+        )
+        try:
+            result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
+            usage_acc.add(usage)
+        except Exception as e:
+            logger.error(f"Relationship Batch {batch_idx + 1}/{len(batches)} fehlgeschlagen: {e}")
+            continue
+        parsed = _parse_json_response(result_text)
+        if not parsed:
+            logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON")
+            continue
+        relations = parsed.get("relations", [])
+        if not isinstance(relations, list):
+            continue
+        batch_count = 0
+        for rel in relations:
+            if not isinstance(rel, dict):
+                continue
+            source_name = (rel.get("source") or "").strip()
+            target_name = (rel.get("target") or "").strip()
+            if not source_name or not target_name:
+                continue
+            rel["_batch"] = batch_idx
+            all_raw_relations.append(rel)
+            batch_count += 1
+        logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {batch_count} Beziehungen, {len(relevant)} Entitäten")
-    # Korrekturen anwenden
-    corrections = parsed.get("corrections", [])
-    if corrections and isinstance(corrections, list):
         await _broadcast(ws_manager, "network_status", {
             "analysis_id": analysis_id,
-            "phase": "correction",
-            "progress": 0,
+            "phase": "relationship_extraction",
+            "progress": int((batch_idx + 1) / len(batches) * 70),
         })
-        await _apply_corrections(db, analysis_id, tenant_id, entities, corrections)
-    # Beziehungen speichern
-    relations = parsed.get("relations", [])
-    if not isinstance(relations, list):
-        return []
-    name_to_id = _build_entity_name_map(entities)
+    logger.info(f"Stufe A abgeschlossen: {len(all_raw_relations)} rohe Beziehungen aus {len(batches)} Batches")
+    # --- Stufe B: Merge + Deduplizierung ---
+    logger.info("Stufe B: Merge und Deduplizierung")
+    await _broadcast(ws_manager, "network_status", {
+        "analysis_id": analysis_id,
+        "phase": "relationship_extraction",
+        "progress": 75,
+    })
     valid_categories = {"alliance", "conflict", "diplomacy", "economic", "legal", "neutral"}
-    saved_relations = []
-    for rel in relations:
-        if not isinstance(rel, dict):
-            continue
+    merged: dict[tuple[int, int, str], dict] = {}
+    for rel in all_raw_relations:
         source_name = (rel.get("source") or "").strip()
         target_name = (rel.get("target") or "").strip()
-        if not source_name or not target_name:
-            continue
         source_id = name_to_id.get(source_name.lower())
         target_id = name_to_id.get(target_name.lower())
         if not source_id or not target_id or source_id == target_id:
             continue
+        # Normalisiere Richtung um A->B und B->A zu mergen
+        if source_id > target_id:
+            source_id, target_id = target_id, source_id
+            source_name, target_name = target_name, source_name
         category = (rel.get("category") or "neutral").lower().strip()
         if category not in valid_categories:
             category = "neutral"
+        key = (source_id, target_id, category)
         weight = rel.get("weight", 3)
         try:
             weight = max(1, min(5, int(weight)))
@@ -420,6 +487,55 @@ async def _phase2_analyze_relationships(
         if not isinstance(evidence, list):
             evidence = []
+        if key in merged:
+            existing = merged[key]
+            existing["weight"] = max(existing["weight"], weight)
+            existing_evidence = set(existing["evidence"])
+            for ev in evidence:
+                if isinstance(ev, str) and ev.strip() and ev.strip() not in existing_evidence:
+                    existing["evidence"].append(ev.strip())
+                    existing_evidence.add(ev.strip())
+            if len(existing["evidence"]) > 10:
+                existing["evidence"] = existing["evidence"][:10]
+            status_priority = {"active": 3, "emerging": 2, "historical": 1}
+            if status_priority.get(status, 0) > status_priority.get(existing["status"], 0):
+                existing["status"] = status
+            new_desc = rel.get("description", "")
+            if len(new_desc) > len(existing.get("description", "")):
+                existing["description"] = new_desc
+                existing["label"] = rel.get("label", existing["label"])
+            existing["_count"] = existing.get("_count", 1) + 1
+        else:
+            merged[key] = {
+                "source_id": source_id,
+                "target_id": target_id,
+                "source_name": source_name,
+                "target_name": target_name,
+                "category": category,
+                "label": rel.get("label", ""),
+                "description": rel.get("description", ""),
+                "weight": weight,
+                "status": status,
+                "evidence": [ev.strip() for ev in evidence if isinstance(ev, str) and ev.strip()][:10],
+                "_count": 1,
+            }
+    logger.info(f"Stufe B abgeschlossen: {len(all_raw_relations)} roh -> {len(merged)} gemerged")
+    # Gewichts-Boost für mehrfach belegte Beziehungen
+    for m in merged.values():
+        if m["_count"] >= 3 and m["weight"] < 5:
+            m["weight"] = min(5, m["weight"] + 1)
+    # --- In DB speichern ---
+    await _broadcast(ws_manager, "network_status", {
+        "analysis_id": analysis_id,
+        "phase": "relationship_extraction",
+        "progress": 85,
+    })
+    saved_relations = []
+    for m in merged.values():
         try:
             cursor = await db.execute(
                 """INSERT INTO network_relations
@@ -427,10 +543,9 @@ async def _phase2_analyze_relationships(
                 category, label, description, weight, status, evidence, tenant_id)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
-                    analysis_id, source_id, target_id, category,
-                    rel.get("label", ""), rel.get("description", ""),
-                    weight, status,
-                    json.dumps(evidence, ensure_ascii=False),
+                    analysis_id, m["source_id"], m["target_id"], m["category"],
+                    m["label"], m["description"], m["weight"], m["status"],
+                    json.dumps(m["evidence"], ensure_ascii=False),
                     tenant_id,
                 ),
            )
@@ -439,7 +554,7 @@ async def _phase2_analyze_relationships(
             logger.warning(f"Beziehung speichern fehlgeschlagen: {e}")
     await db.commit()
-    logger.info(f"Phase 2 abgeschlossen: {len(corrections)} Korrekturen, {len(saved_relations)} Beziehungen")
+    logger.info(f"Phase 2 abgeschlossen: {len(saved_relations)} Beziehungen gespeichert")
     await _broadcast(ws_manager, "network_status", {
         "analysis_id": analysis_id,
@@ -450,19 +565,71 @@ async def _phase2_analyze_relationships(
     return saved_relations
-def _build_entity_name_map(entities: list[dict]) -> dict[str, int]:
-    """Mapping: normalisierter Name/Alias -> DB-ID."""
-    name_to_id: dict[str, int] = {}
-    for ent in entities:
-        db_id = ent.get("db_id")
-        if not db_id:
+# ---------------------------------------------------------------------------
+# Phase 2b: Korrekturen (Opus)
+# ---------------------------------------------------------------------------
+async def _phase2b_corrections(
+    db, analysis_id: int, tenant_id: int,
+    entities: list[dict], relation_count: int,
+    usage_acc: UsageAccumulator, ws_manager=None,
+) -> None:
+    """Opus prüft die Entitäten auf Fehler (Merge, Name-Fix, Add)."""
+    if len(entities) < 10:
+        return
+    logger.info(f"Phase 2b: Opus-Korrekturpass für {len(entities)} Entitäten")
+    await _broadcast(ws_manager, "network_status", {
+        "analysis_id": analysis_id,
+        "phase": "correction",
+        "progress": 0,
+    })
+    entities_for_prompt = []
+    for e in entities:
+        entities_for_prompt.append({
+            "name": e["name"],
+            "type": e["type"],
+            "description": e.get("description", ""),
+            "aliases": e.get("aliases", []),
+        })
+    batch_size = 500
+    entity_batches = [entities_for_prompt[i:i + batch_size]
+                      for i in range(0, len(entities_for_prompt), batch_size)]
+    all_corrections = []
+    for bi, eb in enumerate(entity_batches):
+        entities_json = json.dumps(eb, ensure_ascii=False, indent=1)
+        prompt = CORRECTION_PROMPT.format(entities_json=entities_json)
+        try:
+            result_text, usage = await call_claude(prompt, tools=None, model=None)
+            usage_acc.add(usage)
+        except Exception as e:
+            logger.error(f"Opus Korrektur-Batch {bi + 1} fehlgeschlagen: {e}")
             continue
-        name_to_id[ent["name"].lower()] = db_id
-        name_to_id[ent["name_normalized"]] = db_id
-        for alias in ent.get("aliases", []):
-            if alias and alias.strip():
-                name_to_id[alias.strip().lower()] = db_id
-    return name_to_id
+        parsed = _parse_json_response(result_text)
+        if not parsed:
+            continue
+        corrections = parsed.get("corrections", [])
+        if isinstance(corrections, list):
+            all_corrections.extend(corrections)
+        logger.info(f"Korrektur-Batch {bi + 1}/{len(entity_batches)}: {len(corrections)} Korrekturen")
+    if all_corrections:
+        await _apply_corrections(db, analysis_id, tenant_id, entities, all_corrections)
+    logger.info(f"Phase 2b abgeschlossen: {len(all_corrections)} Korrekturen angewendet")
+    await _broadcast(ws_manager, "network_status", {
+        "analysis_id": analysis_id,
+        "phase": "correction",
+        "progress": 100,
+    })
 async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
@@ -523,11 +690,18 @@ async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
             keep_ent["aliases"] = list(aliases)
             keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)
-            # Mentions übertragen
             await db.execute(
                 "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
                 (keep_ent["db_id"], merge_ent["db_id"]),
             )
+            await db.execute(
+                "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
+                (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
+            )
+            await db.execute(
+                "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
+                (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
+            )
             await db.execute(
                 """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1
                 WHERE id = ?""",
@@ -625,8 +799,9 @@ async def _phase3_finalize(
 async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None):
     """Hauptfunktion: Entity-Extraktion + Beziehungsanalyse.
-    Phase 1: Haiku extrahiert Entitäten aus Artikeln
-    Phase 2: Opus analysiert Beziehungen und korrigiert Haiku-Fehler
+    Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches)
+    Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung
+    Phase 2b: Opus korrigiert Entitäten (Name-Fix, Merge, Add)
     Phase 3: Finalisierung (Zähler, Hash, Log)
     """
     from database import get_db
@@ -720,13 +895,29 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
         db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
     )
+    # Phase 2b: Korrekturen
+    if not await _check_analysis_exists(db, analysis_id):
+        return
+    await _phase2b_corrections(
+        db, analysis_id, tenant_id, entities, len(relations), usage_acc, ws_manager,
+    )
+    # Entity-Count nach Korrekturen aktualisieren
+    cursor = await db.execute(
+        "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
+        (analysis_id,),
+    )
+    row = await cursor.fetchone()
+    final_entity_count = row["cnt"] if row else len(entities)
     # Phase 3
     if not await _check_analysis_exists(db, analysis_id):
         return
     await _phase3_finalize(
         db, analysis_id, tenant_id,
-        entity_count=len(entities), relation_count=len(relations),
+        entity_count=final_entity_count, relation_count=len(relations),
         article_ids=article_ids, factcheck_ids=factcheck_ids,
         article_ts=article_ts, factcheck_ts=factcheck_ts,
         usage_acc=usage_acc, ws_manager=ws_manager,