feat: Netzwerkanalyse-Feature (Wissensgraph)
Neues Feature zur Visualisierung von Entitäten und Beziehungen aus ausgewählten Lagen als interaktiver d3.js-Netzwerkgraph. - Haiku extrahiert Entitäten (Person, Organisation, Ort, Ereignis, Militär) - Opus analysiert Beziehungen und korrigiert Haiku-Fehler - 6 neue DB-Tabellen (network_analyses, _entities, _relations, etc.) - REST-API: CRUD + Generierung + Export (JSON/CSV) - d3.js Force-Directed Graph mit Zoom, Filter, Suche, Export - WebSocket-Events für Live-Progress während Generierung - Sidebar-Integration mit Netzwerkanalysen-Sektion Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
746
src/agents/entity_extractor.py
Normale Datei
746
src/agents/entity_extractor.py
Normale Datei
@@ -0,0 +1,746 @@
|
||||
"""Netzwerkanalyse: Entity-Extraktion (Haiku) + Beziehungsanalyse (Opus)."""
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator
|
||||
from config import CLAUDE_MODEL_FAST, TIMEZONE
|
||||
|
||||
# Module-level logger, registered under the name "osint.entity_extractor".
logger = logging.getLogger("osint.entity_extractor")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Prompts
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Prompt template for the entity-extraction call. ``{articles_text}`` is
# substituted via ``str.format``; doubled braces are literal JSON braces.
# NOTE(review): the separator-only lines ("|" / "||||") that were interleaved
# with the template text have been removed so they are not sent to the model.
# The indentation of the JSON example is reconstructed -- confirm against the
# upstream file if exact whitespace matters.
ENTITY_EXTRACTION_PROMPT = """Du bist ein OSINT-Analyst für ein Lagemonitoring-System.
AUFGABE: Extrahiere ALLE relevanten Entitäten aus den folgenden Nachrichtenartikeln.

ARTIKEL:
{articles_text}

REGELN:
- Extrahiere JEDE genannte Person, Organisation, Ort, Ereignis und militärische Einheit
- Normalisiere Namen: "Wladimir Putin", "Putin", "V. Putin" -> eine Entität
- Aliase erfassen: Alle Namensvarianten einer Entität als aliases[]
- mention_count: Wie oft wird die Entität insgesamt in allen Artikeln erwähnt?
- Beschreibung: Kurze Einordnung, wer/was die Entität ist (1 Satz)
- KEINE Duplikate: Gleiche Entitäten zusammenfassen

ENTITY-TYPEN:
- "person": Individuelle Personen (Politiker, Militärs, Journalisten etc.)
- "organisation": Organisationen, Parteien, Behörden, Unternehmen, NGOs
- "location": Länder, Städte, Regionen, Gebiete
- "event": Konkrete Ereignisse (Wahlen, Anschläge, Konferenzen etc.)
- "military": Militärische Einheiten, Waffensysteme, Operationen

AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
{{
  "entities": [
    {{
      "name": "Vollständiger Name",
      "name_normalized": "vollständiger name",
      "type": "person|organisation|location|event|military",
      "description": "Kurze Einordnung (1 Satz)",
      "aliases": ["Alias1", "Alias2"],
      "mention_count": 5
    }}
  ]
}}"""
|
||||
|
||||
|
||||
# Prompt template for the relationship-analysis call. ``{entities_json}`` and
# ``{source_texts}`` are substituted via ``str.format``; doubled braces are
# literal JSON braces.
# NOTE(review): the separator-only lines ("|" / "||||") that were interleaved
# with the template text have been removed so they are not sent to the model.
# The indentation of the JSON example is reconstructed -- confirm against the
# upstream file if exact whitespace matters.
RELATIONSHIP_ANALYSIS_PROMPT = """Du bist ein Senior OSINT-Analyst für ein Lagemonitoring-System.
AUFGABE: Analysiere die Beziehungen zwischen den extrahierten Entitäten und korrigiere ggf. Fehler.

EXTRAHIERTE ENTITÄTEN:
{entities_json}

QUELLMATERIAL:
{source_texts}

TEIL 1 — KORREKTUREN (optional):
Prüfe die extrahierten Entitäten auf Fehler:
- "name_fix": Name ist falsch geschrieben oder unvollständig
- "merge": Zwei Entitäten sind dieselbe -> zusammenführen
- "add": Wichtige Entität fehlt komplett

TEIL 2 — BEZIEHUNGEN:
Identifiziere ALLE relevanten Beziehungen zwischen den Entitäten.

BEZIEHUNGS-KATEGORIEN:
- "alliance": Bündnis, Kooperation, Unterstützung, Partnerschaft
- "conflict": Konflikt, Krieg, Feindschaft, Sanktionen, Opposition
- "diplomacy": Diplomatische Beziehungen, Verhandlungen, Abkommen
- "economic": Wirtschaftsbeziehungen, Handel, Investitionen
- "legal": Rechtliche Beziehungen, Klagen, Verurteilungen
- "neutral": Sonstige Beziehung, Erwähnung, Verbindung

REGELN:
- weight: 1 (schwach/indirekt) bis 5 (stark/direkt)
- evidence[]: Stichpunkte aus dem Quellmaterial die die Beziehung belegen
- status: "active" (aktuell), "historical" (vergangen), "emerging" (sich entwickelnd)
- source und target: Exakt die Namen aus der Entitäten-Liste verwenden

AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
{{
  "corrections": [
    {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}},
    {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}},
    {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}}
  ],
  "relations": [
    {{
      "source": "Entität A",
      "target": "Entität B",
      "category": "alliance|conflict|diplomacy|economic|legal|neutral",
      "label": "Kurzes Label (2-4 Wörter)",
      "description": "Beschreibung (1-2 Sätze)",
      "weight": 3,
      "status": "active|historical|emerging",
      "evidence": ["Beleg 1", "Beleg 2"]
    }}
  ]
}}"""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hilfsfunktionen
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _broadcast(ws_manager, msg_type: str, data: dict):
    """Send a WebSocket message through ``ws_manager`` on a best-effort basis.

    Args:
        ws_manager: Object exposing an awaitable ``broadcast(message)``; when
            falsy (e.g. ``None``) nothing is sent.
        msg_type: Value stored under the ``"type"`` key of the message.
        data: Additional key/value pairs merged into the message.

    Broadcast failures never propagate to the caller; they are recorded at
    debug level instead of being dropped silently.
    """
    if not ws_manager:
        return
    try:
        await ws_manager.broadcast({"type": msg_type, **data})
    except Exception:
        # Progress events are optional: keep the generation running, but leave
        # a trace so a broken WebSocket layer can be diagnosed.
        logger.debug("WebSocket-Broadcast '%s' fehlgeschlagen", msg_type, exc_info=True)
|
||||
|
||||
|
||||
def _parse_json_response(text: str) -> Optional[dict]:
    """Parse a JSON object out of a model response.

    Three candidates are tried in order: the full text, the content of the
    first Markdown code fence, and the span from the first ``{`` to the last
    ``}``. The first candidate that decodes to a JSON *object* wins.

    Args:
        text: Raw response text; may be empty or ``None``.

    Returns:
        The decoded ``dict``, or ``None`` when no candidate decodes to a JSON
        object. Valid JSON of another type (list, number, string) is rejected
        so callers can rely on the declared ``Optional[dict]`` contract.
    """
    if not text:
        return None

    def _candidates():
        # Lazily produced so the regex searches only run when needed.
        yield text
        fence_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', text, re.DOTALL)
        if fence_match:
            yield fence_match.group(1)
        obj_match = re.search(r'\{.*\}', text, re.DOTALL)
        if obj_match:
            yield obj_match.group()

    for candidate in _candidates():
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if isinstance(parsed, dict):
            return parsed

    logger.warning("JSON-Parse fehlgeschlagen")
    return None
|
||||
|
||||
|
||||
async def _check_analysis_exists(db, analysis_id: int) -> bool:
    """Return ``True`` when a ``network_analyses`` row with this id exists.

    Args:
        db: Connection object whose awaitable ``execute`` returns a cursor
            with an awaitable ``fetchone``.
        analysis_id: Primary key to look up.
    """
    query = "SELECT id FROM network_analyses WHERE id = ?"
    cursor = await db.execute(query, (analysis_id,))
    row = await cursor.fetchone()
    return row is not None
|
||||
|
||||
|
||||
def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) -> str:
    """Return a SHA-256 hex digest over the sorted (id, timestamp) pairs.

    Article pairs are rendered as ``a:<id>:<ts>`` and fact-check pairs as
    ``f:<id>:<ts>``; articles come first and all tokens are joined with ``|``.
    Ids and timestamps are paired positionally with ``zip``.
    """
    article_tokens = [
        f"a:{item_id}:{stamp}" for item_id, stamp in sorted(zip(article_ids, article_ts))
    ]
    factcheck_tokens = [
        f"f:{item_id}:{stamp}" for item_id, stamp in sorted(zip(factcheck_ids, factcheck_ts))
    ]
    payload = "|".join(article_tokens + factcheck_tokens)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 1: Entity-Extraktion (Haiku)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _normalize_alias_list(raw_aliases) -> list[str]:
    """Return the stripped, non-empty string aliases from an untrusted JSON value."""
    if not isinstance(raw_aliases, list):
        return []
    return [alias.strip() for alias in raw_aliases if isinstance(alias, str) and alias.strip()]


def _normalize_mention_count(raw_count) -> int:
    """Coerce an untrusted mention count to an int of at least 1."""
    try:
        return max(1, int(raw_count))
    except (TypeError, ValueError, OverflowError):
        return 1


def _merge_extracted_entity(entity_map: dict, ent, valid_types) -> None:
    """Validate one model-produced entity and merge it into ``entity_map``.

    Entities are keyed by ``(name_normalized, entity_type)``. Malformed
    entries (non-dict, missing or non-string name) are skipped.
    """
    if not isinstance(ent, dict):
        return

    raw_name = ent.get("name")
    name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not name:
        return

    # Fall back to the display name when the model omitted the normalized
    # form or sent something unusable (non-string, whitespace only).
    raw_normalized = ent.get("name_normalized")
    if not isinstance(raw_normalized, str) or not raw_normalized.strip():
        raw_normalized = name
    name_normalized = raw_normalized.strip().lower()

    raw_type = ent.get("type")
    entity_type = raw_type.lower().strip() if isinstance(raw_type, str) else "organisation"
    if entity_type not in valid_types:
        entity_type = "organisation"

    raw_description = ent.get("description")
    description = raw_description if isinstance(raw_description, str) else ""
    aliases = _normalize_alias_list(ent.get("aliases"))
    mention_count = _normalize_mention_count(ent.get("mention_count", 1))

    key = (name_normalized, entity_type)
    existing = entity_map.get(key)
    if existing is None:
        entity_map[key] = {
            "name": name,
            "name_normalized": name_normalized,
            "type": entity_type,
            "description": description,
            "aliases": aliases,
            "mention_count": mention_count,
        }
        return

    merged_aliases = set(existing.get("aliases", []))
    merged_aliases.update(aliases)
    if name != existing["name"]:
        merged_aliases.add(name)
    # Sorted so the stored alias list is deterministic across runs.
    existing["aliases"] = sorted(merged_aliases)
    existing["mention_count"] = existing.get("mention_count", 1) + mention_count
    # Keep the longer description.
    if len(description) > len(existing.get("description", "")):
        existing["description"] = description


async def _request_batch_entities(batch: list[str], batch_idx: int, batch_total: int,
                                  usage_acc) -> list:
    """Run the extraction prompt for one batch and return its raw entity list.

    Returns an empty list when the call fails or the response carries no
    usable ``"entities"`` list, so one bad batch does not abort the phase.
    """
    articles_text = "\n\n---\n\n".join(batch)
    prompt = ENTITY_EXTRACTION_PROMPT.format(articles_text=articles_text)

    try:
        result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
        usage_acc.add(usage)
    except Exception as e:
        logger.error(f"Haiku Batch {batch_idx + 1}/{batch_total} fehlgeschlagen: {e}")
        return []

    parsed = _parse_json_response(result_text)
    if not isinstance(parsed, dict) or "entities" not in parsed:
        logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON")
        return []

    entities = parsed["entities"]
    return entities if isinstance(entities, list) else []


async def _store_extracted_entity(db, analysis_id: int, tenant_id: int, ent: dict) -> None:
    """Insert one entity into ``network_entities`` and record its row id as ``db_id``."""
    cursor = await db.execute(
        """INSERT OR IGNORE INTO network_entities
           (network_analysis_id, name, name_normalized, entity_type,
            description, aliases, mention_count, tenant_id)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            analysis_id, ent["name"], ent["name_normalized"], ent["type"],
            ent.get("description", ""),
            json.dumps(ent.get("aliases", []), ensure_ascii=False),
            ent.get("mention_count", 1),
            tenant_id,
        ),
    )
    if cursor.rowcount != 0:
        ent["db_id"] = cursor.lastrowid
        return

    # The insert was ignored, so ``lastrowid`` does not describe this entity
    # (it may still hold the id of an earlier insert). Look up the existing row.
    # NOTE(review): assumes the conflicting row shares analysis id, normalized
    # name and type -- confirm against the table's UNIQUE constraint.
    lookup = await db.execute(
        """SELECT id FROM network_entities
           WHERE network_analysis_id = ? AND name_normalized = ? AND entity_type = ?""",
        (analysis_id, ent["name_normalized"], ent["type"]),
    )
    row = await lookup.fetchone()
    if row is not None:
        ent["db_id"] = row[0]


async def _phase1_extract_entities(
    db, analysis_id: int, tenant_id: int,
    articles: list[dict], factchecks: list[dict],
    usage_acc: UsageAccumulator, ws_manager=None,
) -> list[dict]:
    """Extract entities from articles and fact checks in batches and store them.

    Args:
        db: Async database connection (``execute`` / ``commit``).
        analysis_id: Id of the network analysis the entities belong to.
        tenant_id: Tenant id written to every stored row.
        articles: Article dicts; headline, content and source keys are read.
        factchecks: Fact-check dicts; ``claim``, ``evidence``, ``status`` are read.
        usage_acc: Accumulator that receives the usage of every model call.
        ws_manager: Optional WebSocket manager for progress events.

    Returns:
        The merged entity dicts. Entities that were stored (or already present)
        carry a ``db_id`` key; entities whose insert failed do not.
    """
    logger.info(f"Phase 1: {len(articles)} Artikel, {len(factchecks)} Faktenchecks")

    all_texts = []
    for art in articles:
        headline = art.get("headline_de") or art.get("headline") or ""
        content = art.get("content_de") or art.get("content_original") or ""
        source = art.get("source") or ""
        # Truncate very long article bodies to keep the batch prompt bounded.
        if len(content) > 2000:
            content = content[:2000] + "..."
        all_texts.append(f"[{source}] {headline}\n{content}")

    for fc in factchecks:
        claim = fc.get("claim") or ""
        evidence = fc.get("evidence") or ""
        status = fc.get("status") or ""
        all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}")

    if not all_texts:
        logger.warning(f"Analyse {analysis_id}: Keine Texte vorhanden")
        return []

    batch_size = 30
    batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
    logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")

    entity_map: dict[tuple[str, str], dict] = {}
    valid_types = {"person", "organisation", "location", "event", "military"}

    for batch_idx, batch in enumerate(batches):
        raw_entities = await _request_batch_entities(batch, batch_idx, len(batches), usage_acc)
        for ent in raw_entities:
            _merge_extracted_entity(entity_map, ent, valid_types)

        logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {len(entity_map)} Entitäten gesamt")

        # Sent for failed batches too, so the progress indicator still
        # reaches 100 % when a batch had to be skipped.
        await _broadcast(ws_manager, "network_status", {
            "analysis_id": analysis_id,
            "phase": "entity_extraction",
            "progress": int((batch_idx + 1) / len(batches) * 100),
        })

    all_entities = list(entity_map.values())

    for ent in all_entities:
        try:
            await _store_extracted_entity(db, analysis_id, tenant_id, ent)
        except Exception as e:
            logger.warning(f"Entity speichern fehlgeschlagen '{ent['name']}': {e}")

    await db.commit()
    logger.info(f"Phase 1 abgeschlossen: {len(all_entities)} Entitäten gespeichert")
    return all_entities
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 2: Beziehungsanalyse (Opus)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _build_relationship_source_texts(articles: list[dict], factchecks: list[dict]) -> str:
    """Join article and fact-check texts for the prompt, shortening them when too long."""
    source_parts = []
    for art in articles:
        headline = art.get("headline_de") or art.get("headline") or ""
        content = art.get("content_de") or art.get("content_original") or ""
        source = art.get("source") or ""
        source_parts.append(f"[{source}] {headline}\n{content}")

    for fc in factchecks:
        claim = fc.get("claim") or ""
        evidence = fc.get("evidence") or ""
        source_parts.append(f"[Faktencheck] {claim}\n{evidence}")

    source_texts = "\n\n---\n\n".join(source_parts)
    if len(source_texts) <= 150_000:
        return source_texts

    logger.info(f"Quelltexte zu lang ({len(source_texts)} Zeichen), kürze")
    short_parts = []
    for art in articles:
        headline = art.get("headline_de") or art.get("headline") or ""
        content = art.get("content_de") or art.get("content_original") or ""
        source = art.get("source") or ""  # ``or`` also covers an explicit None value
        short = content[:500] + "..." if len(content) > 500 else content
        short_parts.append(f"[{source}] {headline}: {short}")
    for fc in factchecks:
        claim = fc.get("claim") or ""
        status = fc.get("status") or ""
        short_parts.append(f"[Faktencheck] {claim} (Status: {status})")
    return "\n\n".join(short_parts)


def _normalize_relation(rel, name_to_id: dict) -> Optional[tuple]:
    """Validate one model-produced relation.

    Returns ``(source_id, target_id, category, label, description, weight,
    status, evidence)``, or ``None`` when the relation is malformed, refers to
    an unknown entity, or links an entity to itself.
    """
    if not isinstance(rel, dict):
        return None

    raw_source, raw_target = rel.get("source"), rel.get("target")
    source_name = raw_source.strip() if isinstance(raw_source, str) else ""
    target_name = raw_target.strip() if isinstance(raw_target, str) else ""
    if not source_name or not target_name:
        return None

    source_id = name_to_id.get(source_name.lower())
    target_id = name_to_id.get(target_name.lower())
    if not source_id or not target_id or source_id == target_id:
        return None

    raw_category = rel.get("category")
    category = raw_category.lower().strip() if isinstance(raw_category, str) else "neutral"
    if category not in {"alliance", "conflict", "diplomacy", "economic", "legal", "neutral"}:
        category = "neutral"

    try:
        weight = max(1, min(5, int(rel.get("weight", 3))))
    except (ValueError, TypeError, OverflowError):
        weight = 3

    raw_status = rel.get("status")
    status = raw_status.lower().strip() if isinstance(raw_status, str) else "active"
    if status not in {"active", "historical", "emerging"}:
        status = "active"

    evidence = rel.get("evidence", [])
    if not isinstance(evidence, list):
        evidence = []

    # The model may send null or non-string values; store text in any case.
    raw_label, raw_description = rel.get("label"), rel.get("description")
    label = raw_label if isinstance(raw_label, str) else ""
    description = raw_description if isinstance(raw_description, str) else ""

    return source_id, target_id, category, label, description, weight, status, evidence


async def _phase2_analyze_relationships(
    db, analysis_id: int, tenant_id: int,
    entities: list[dict], articles: list[dict], factchecks: list[dict],
    usage_acc: UsageAccumulator, ws_manager=None,
) -> list[dict]:
    """Analyse relationships between entities, apply corrections and store relations.

    Args:
        db: Async database connection (``execute`` / ``commit``).
        analysis_id: Id of the network analysis.
        tenant_id: Tenant id written to every stored row.
        entities: Entity dicts from phase 1; modified in place by corrections.
        articles: Article dicts used as source material.
        factchecks: Fact-check dicts used as source material.
        usage_acc: Accumulator that receives the usage of the model call.
        ws_manager: Optional WebSocket manager for progress events.

    Returns:
        One ``{"id": <row id>}`` dict per stored relation. An empty list is
        returned when there are no entities, the model call fails, or the
        response is not a JSON object.
    """
    if not entities:
        return []

    logger.info(f"Phase 2: {len(entities)} Entitäten, Beziehungsanalyse")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "relationship_extraction",
        "progress": 0,
    })

    entities_for_prompt = [
        {"name": e["name"], "type": e["type"],
         "description": e.get("description", ""), "aliases": e.get("aliases", [])}
        for e in entities
    ]
    entities_json = json.dumps(entities_for_prompt, ensure_ascii=False, indent=2)
    source_texts = _build_relationship_source_texts(articles, factchecks)

    prompt = RELATIONSHIP_ANALYSIS_PROMPT.format(
        entities_json=entities_json, source_texts=source_texts,
    )

    try:
        result_text, usage = await call_claude(prompt, tools=None, model=None)
        usage_acc.add(usage)
    except Exception as e:
        logger.error(f"Opus Beziehungsanalyse fehlgeschlagen: {e}")
        return []

    parsed = _parse_json_response(result_text)
    if not parsed or not isinstance(parsed, dict):
        logger.warning("Kein gültiges JSON von Opus")
        return []

    # Normalize to a list: "corrections": null (or any non-list) must not
    # break the len() in the summary log below.
    corrections = parsed.get("corrections")
    if not isinstance(corrections, list):
        corrections = []
    if corrections:
        await _broadcast(ws_manager, "network_status", {
            "analysis_id": analysis_id,
            "phase": "correction",
            "progress": 0,
        })
        await _apply_corrections(db, analysis_id, tenant_id, entities, corrections)

    # A missing or non-list "relations" value is treated as "no relations" so
    # the final progress event is still sent.
    relations = parsed.get("relations")
    if not isinstance(relations, list):
        relations = []

    # Built after the corrections so renamed, merged and added entities resolve.
    name_to_id = _build_entity_name_map(entities)
    saved_relations = []

    for rel in relations:
        normalized = _normalize_relation(rel, name_to_id)
        if normalized is None:
            continue
        source_id, target_id, category, label, description, weight, status, evidence = normalized

        try:
            cursor = await db.execute(
                """INSERT INTO network_relations
                   (network_analysis_id, source_entity_id, target_entity_id,
                    category, label, description, weight, status, evidence, tenant_id)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    analysis_id, source_id, target_id, category,
                    label, description,
                    weight, status,
                    json.dumps(evidence, ensure_ascii=False),
                    tenant_id,
                ),
            )
            saved_relations.append({"id": cursor.lastrowid})
        except Exception as e:
            logger.warning(f"Beziehung speichern fehlgeschlagen: {e}")

    await db.commit()
    logger.info(f"Phase 2 abgeschlossen: {len(corrections)} Korrekturen, {len(saved_relations)} Beziehungen")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "relationship_extraction",
        "progress": 100,
    })

    return saved_relations
|
||||
|
||||
|
||||
def _build_entity_name_map(entities: list[dict]) -> dict[str, int]:
    """Map lower-cased entity names and aliases to database ids.

    Entities without a truthy ``db_id`` are skipped. Primary names (``name``
    and ``name_normalized``) are registered first and always win over
    aliases, so an alias of one entity cannot redirect another entity's own
    name. Aliases that are empty or not strings are ignored.

    Args:
        entities: Entity dicts with ``name``, ``name_normalized`` and
            optionally ``db_id`` and ``aliases``.

    Returns:
        Dict from lower-cased name or alias to ``db_id``.
    """
    stored = [ent for ent in entities if ent.get("db_id")]
    name_to_id: dict[str, int] = {}

    # Pass 1: primary names.
    for ent in stored:
        name_to_id[ent["name"].lower()] = ent["db_id"]
        name_to_id[ent["name_normalized"]] = ent["db_id"]

    # Pass 2: aliases only fill keys that are still free.
    for ent in stored:
        aliases = ent.get("aliases")
        if not isinstance(aliases, (list, tuple, set)):
            continue
        for alias in aliases:
            if isinstance(alias, str) and alias.strip():
                name_to_id.setdefault(alias.strip().lower(), ent["db_id"])

    return name_to_id
|
||||
|
||||
|
||||
def _correction_text(corr: dict, key: str) -> str:
    """Return ``corr[key]`` as a stripped string, or ``""`` if missing or not a string."""
    value = corr.get(key)
    return value.strip() if isinstance(value, str) else ""


async def _apply_name_fix_correction(db, entities: list, corr: dict) -> None:
    """Rename the first entity matching ``entity_name``; the old name becomes an alias."""
    entity_name = _correction_text(corr, "entity_name")
    corrected_name = _correction_text(corr, "corrected_name")
    if not entity_name or not corrected_name:
        return

    wanted = entity_name.lower()
    target = next(
        (e for e in entities
         if e["name"].lower() == wanted or e["name_normalized"] == wanted),
        None,
    )
    if target is None:
        return

    old_name = target["name"]
    aliases = list(target.get("aliases") or [])
    if old_name != corrected_name and old_name not in aliases:
        aliases.append(old_name)
    normalized = corrected_name.lower().strip()

    # Write to the database first: if the UPDATE fails, the in-memory entity
    # must keep the name that is actually stored.
    if target.get("db_id"):
        await db.execute(
            """UPDATE network_entities
               SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1
               WHERE id = ?""",
            (corrected_name, normalized,
             json.dumps(aliases, ensure_ascii=False), target["db_id"]),
        )

    target["name"] = corrected_name
    target["name_normalized"] = normalized
    target["aliases"] = aliases


async def _apply_merge_correction(db, entities: list, corr: dict) -> None:
    """Fold the ``merge_with`` entity into the ``entity_name`` entity and delete it."""
    keep_name = _correction_text(corr, "entity_name")
    merge_name = _correction_text(corr, "merge_with")
    if not keep_name or not merge_name:
        return

    keep_key, merge_key = keep_name.lower(), merge_name.lower()
    keep_ent = merge_ent = None
    for ent in entities:
        names = (ent["name"].lower(), ent["name_normalized"])
        if keep_key in names:
            keep_ent = ent
        elif merge_key in names:
            merge_ent = ent

    if keep_ent is None or merge_ent is None:
        return
    if not keep_ent.get("db_id") or not merge_ent.get("db_id"):
        return

    aliases = {
        a.strip() for a in (keep_ent.get("aliases") or [])
        if isinstance(a, str) and a.strip()
    }
    aliases.add(merge_ent["name"])
    aliases.update(
        a.strip() for a in (merge_ent.get("aliases") or [])
        if isinstance(a, str) and a.strip()
    )
    merged_aliases = sorted(aliases)  # deterministic order
    merged_count = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)

    # Move the mentions over before the duplicate row is deleted.
    await db.execute(
        "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
        (keep_ent["db_id"], merge_ent["db_id"]),
    )
    await db.execute(
        """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1
           WHERE id = ?""",
        (json.dumps(merged_aliases, ensure_ascii=False), merged_count, keep_ent["db_id"]),
    )
    await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],))

    # Update the in-memory state only after all statements went through.
    keep_ent["aliases"] = merged_aliases
    keep_ent["mention_count"] = merged_count
    entities[:] = [e for e in entities if e is not merge_ent]


async def _apply_add_correction(db, analysis_id, tenant_id, entities: list, corr: dict) -> None:
    """Insert an entity the extraction missed and append it to ``entities``."""
    entity_name = _correction_text(corr, "entity_name")
    if not entity_name:
        return

    entity_type = _correction_text(corr, "entity_type").lower() or "organisation"
    if entity_type not in {"person", "organisation", "location", "event", "military"}:
        entity_type = "organisation"

    raw_description = corr.get("description")
    description = raw_description if isinstance(raw_description, str) else ""

    name_norm = entity_name.lower().strip()
    if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities):
        return

    cursor = await db.execute(
        """INSERT OR IGNORE INTO network_entities
           (network_analysis_id, name, name_normalized, entity_type,
            description, aliases, mention_count, corrected_by_opus, tenant_id)
           VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""",
        (analysis_id, entity_name, name_norm, entity_type, description, tenant_id),
    )
    # If the insert was ignored, ``lastrowid`` does not describe this entity
    # (it may still hold the id of an earlier insert), so do not register it.
    if cursor.rowcount == 0 or not cursor.lastrowid:
        return

    entities.append({
        "name": entity_name, "name_normalized": name_norm,
        "type": entity_type, "description": description,
        "aliases": [], "mention_count": 1, "db_id": cursor.lastrowid,
    })


async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
    """Apply model-suggested corrections to the entities in memory and in the database.

    Supported correction types (``corr["type"]``): ``"name_fix"``, ``"merge"``
    and ``"add"``; anything else is ignored. A failing correction is logged
    and skipped so the remaining ones are still applied. All changes are
    committed once at the end.

    Args:
        db: Async database connection (``execute`` / ``commit``).
        analysis_id: Id of the network analysis; used for added entities.
        tenant_id: Tenant id written to added entities.
        entities: Entity dicts from phase 1; modified in place.
        corrections: Correction dicts as returned by the model.
    """
    for corr in corrections:
        if not isinstance(corr, dict):
            continue
        corr_type = corr.get("type", "")

        try:
            if corr_type == "name_fix":
                await _apply_name_fix_correction(db, entities, corr)
            elif corr_type == "merge":
                await _apply_merge_correction(db, entities, corr)
            elif corr_type == "add":
                await _apply_add_correction(db, analysis_id, tenant_id, entities, corr)
        except Exception as e:
            logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}")

    await db.commit()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 3: Finalisierung
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _phase3_finalize(
    db, analysis_id, tenant_id, entity_count, relation_count,
    article_ids, factcheck_ids, article_ts, factcheck_ts,
    usage_acc, ws_manager=None,
):
    """Finalize an analysis run: counters, data hash, generation log, status.

    Sets the ``network_analyses`` row to ``'ready'`` with the given counts,
    the current timestamp and the data hash, inserts a ``'completed'`` row
    into ``network_generation_log`` with the accumulated usage figures,
    commits, and broadcasts a ``network_complete`` event.
    """
    fingerprint = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
    finished_at = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")

    update_sql = """UPDATE network_analyses
           SET entity_count = ?, relation_count = ?, status = 'ready',
               last_generated_at = ?, data_hash = ?
           WHERE id = ?"""
    update_params = (entity_count, relation_count, finished_at, fingerprint, analysis_id)
    await db.execute(update_sql, update_params)

    log_sql = """INSERT INTO network_generation_log
           (network_analysis_id, completed_at, status, input_tokens, output_tokens,
            cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls,
            entity_count, relation_count, tenant_id)
           VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
    log_params = (
        analysis_id,
        finished_at,
        usage_acc.input_tokens,
        usage_acc.output_tokens,
        usage_acc.cache_creation_tokens,
        usage_acc.cache_read_tokens,
        usage_acc.total_cost_usd,
        usage_acc.call_count,
        entity_count,
        relation_count,
        tenant_id,
    )
    await db.execute(log_sql, log_params)

    await db.commit()

    logger.info(f"Analyse {analysis_id} finalisiert: {entity_count} Entitäten, "
                f"{relation_count} Beziehungen, ${usage_acc.total_cost_usd:.4f}")

    completion_event = {
        "analysis_id": analysis_id,
        "entity_count": entity_count,
        "relation_count": relation_count,
        "cost_usd": round(usage_acc.total_cost_usd, 4),
    }
    await _broadcast(ws_manager, "network_complete", completion_event)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hauptfunktion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None):
    """Run the full network-analysis generation for one analysis.

    Phase 1 extracts entities from the articles and fact checks of all linked
    incidents, phase 2 analyses relationships and applies corrections, and
    phase 3 writes counters, data hash and generation log. Before every phase
    the analysis row is re-checked; if it no longer exists the run stops
    quietly.

    On any unexpected error the analysis status is set to ``'error'`` and a
    ``network_error`` event is broadcast. The database connection is always
    closed.

    Args:
        analysis_id: Id of the ``network_analyses`` row to generate.
        tenant_id: Tenant id written to all created rows.
        ws_manager: Optional WebSocket manager for progress and result events.
    """
    from database import get_db

    db = await get_db()
    usage_acc = UsageAccumulator()

    try:
        if not await _check_analysis_exists(db, analysis_id):
            logger.warning(f"Analyse {analysis_id} existiert nicht")
            return

        await db.execute(
            "UPDATE network_analyses SET status = 'generating' WHERE id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Incidents linked to this analysis.
        cursor = await db.execute(
            "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        incident_ids = [row["incident_id"] for row in await cursor.fetchall()]

        if not incident_ids:
            logger.warning(f"Analyse {analysis_id}: Keine Lagen verknüpft")
            await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
            await db.commit()
            await _broadcast(ws_manager, "network_error", {
                "analysis_id": analysis_id, "error": "Keine Lagen verknüpft",
            })
            return

        # Only "?" placeholders are interpolated into the SQL text; the ids
        # themselves are passed as bound parameters.
        placeholders = ",".join("?" * len(incident_ids))

        cursor = await db.execute(
            f"""SELECT id, incident_id, headline, headline_de, source, source_url,
                       content_original, content_de, collected_at
                FROM articles WHERE incident_id IN ({placeholders})""",
            incident_ids,
        )
        article_rows = await cursor.fetchall()
        article_fields = (
            "id", "incident_id", "headline", "headline_de",
            "source", "source_url", "content_original", "content_de",
        )
        articles = [{field: r[field] for field in article_fields} for r in article_rows]
        article_ids = [r["id"] for r in article_rows]
        article_ts = [r["collected_at"] or "" for r in article_rows]

        cursor = await db.execute(
            f"""SELECT id, incident_id, claim, status, evidence, checked_at
                FROM fact_checks WHERE incident_id IN ({placeholders})""",
            incident_ids,
        )
        fc_rows = await cursor.fetchall()
        factcheck_fields = ("id", "incident_id", "claim", "status", "evidence")
        factchecks = [{field: r[field] for field in factcheck_fields} for r in fc_rows]
        factcheck_ids = [r["id"] for r in fc_rows]
        factcheck_ts = [r["checked_at"] or "" for r in fc_rows]

        logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, "
                    f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen")

        # Phase 1
        if not await _check_analysis_exists(db, analysis_id):
            return

        entities = await _phase1_extract_entities(
            db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
        )

        # Phase 2
        if not await _check_analysis_exists(db, analysis_id):
            return

        relations = await _phase2_analyze_relationships(
            db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
        )

        # Phase 3
        if not await _check_analysis_exists(db, analysis_id):
            return

        # Count only entities that actually have a database row, so the stored
        # counter matches what can be read back.
        stored_entity_count = sum(1 for ent in entities if ent.get("db_id"))

        await _phase3_finalize(
            db, analysis_id, tenant_id,
            entity_count=stored_entity_count, relation_count=len(relations),
            article_ids=article_ids, factcheck_ids=factcheck_ids,
            article_ts=article_ts, factcheck_ts=factcheck_ts,
            usage_acc=usage_acc, ws_manager=ws_manager,
        )

    except Exception as e:
        logger.error(f"Entity-Extraktion fehlgeschlagen (Analyse {analysis_id}): {e}", exc_info=True)
        try:
            await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
            await db.commit()
        except Exception as status_err:
            # Still best effort, but leave a trace: otherwise the analysis
            # silently stays in 'generating'.
            logger.warning(
                f"Analyse {analysis_id}: Fehlerstatus konnte nicht gespeichert werden: {status_err}"
            )
        await _broadcast(ws_manager, "network_error", {
            "analysis_id": analysis_id, "error": str(e),
        })
    finally:
        await db.close()
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren