Phase 1: Entity-Map Key nur noch name_normalized (statt name+type), Typ-Priorität bei Konflikten Phase 2a (neu): Code-basierte Dedup nach name_normalized, merged Typ-Duplikate Phase 2c (neu): Semantische Dedup via Opus — erkennt Synonyme, Abkürzungen, Sprachvarianten Phase 2d (neu): Cleanup — Self-Loops, Richtungsnormalisierung, Duplikat-Relations, verwaiste Entities Gemeinsamer _merge_entity_in_db Helper für konsistente Entity-Zusammenführung Phase 2b (Opus-Korrekturpass) entfernt, ersetzt durch präzisere Phase 2c Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
235 Zeilen
8.5 KiB
Python
235 Zeilen
8.5 KiB
Python
"""Regeneriert NUR die Beziehungen für eine bestehende Netzwerkanalyse.
|
|
Nutzt die vorhandenen Entitäten und führt Phase 2a + Phase 2 + Phase 2c + Phase 2d aus.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
import os
|
|
|
|
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
|
|
|
|
from database import get_db
|
|
from agents.entity_extractor import (
|
|
_phase2a_deduplicate_entities,
|
|
_phase2_analyze_relationships,
|
|
_phase2c_semantic_dedup,
|
|
_phase2d_cleanup,
|
|
_build_entity_name_map,
|
|
_compute_data_hash,
|
|
_broadcast,
|
|
logger,
|
|
)
|
|
from agents.claude_client import UsageAccumulator
|
|
from config import TIMEZONE
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
# Root-logger setup for CLI runs of this script: timestamped, named records.
logging.basicConfig(
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    level=logging.INFO,
)
|
|
|
|
|
|
async def regenerate_relations_only(analysis_id: int) -> None:
    """Delete the stored relations of an analysis and re-run phases 2a + 2 + 2c + 2d.

    Reuses the entities already persisted for the analysis, re-extracts the
    relationships from the linked articles and fact-checks, deduplicates and
    cleans up, then writes the final counters, data hash and a generation-log
    row. On any failure the analysis status is set to ``'error'`` (best
    effort) and the traceback is printed.

    Args:
        analysis_id: Primary key of the row in ``network_analyses``.
    """
    db = await get_db()
    usage_acc = UsageAccumulator()

    try:
        # Verify the analysis exists before touching anything.
        cursor = await db.execute(
            "SELECT id, name, tenant_id, entity_count FROM network_analyses WHERE id = ?",
            (analysis_id,),
        )
        analysis = await cursor.fetchone()
        if not analysis:
            print(f"Analyse {analysis_id} nicht gefunden!")
            return

        tenant_id = analysis["tenant_id"]
        print(f"\nAnalyse: {analysis['name']} (ID={analysis_id})")
        print(f"Vorhandene Entitäten: {analysis['entity_count']}")

        # Mark the analysis as in progress.
        await db.execute(
            "UPDATE network_analyses SET status = 'generating' WHERE id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Load existing entities from the DB (including db_id, which the
        # dedup/cleanup phases need to merge rows in place).
        cursor = await db.execute(
            """SELECT id, name, name_normalized, entity_type, description, aliases, mention_count
               FROM network_entities WHERE network_analysis_id = ?""",
            (analysis_id,),
        )
        entity_rows = await cursor.fetchall()
        entities = []
        for r in entity_rows:
            # aliases is stored as a JSON list; fall back to [] on bad data.
            try:
                aliases = json.loads(r["aliases"]) if r["aliases"] else []
            except (json.JSONDecodeError, TypeError):
                aliases = []
            entities.append({
                "name": r["name"],
                "name_normalized": r["name_normalized"],
                "type": r["entity_type"],
                "description": r["description"] or "",
                "aliases": aliases,
                "mention_count": r["mention_count"] or 1,
                "db_id": r["id"],
            })

        print(f"Geladene Entitäten: {len(entities)}")

        # Phase 2a: code-based entity deduplication (run before deleting the
        # relations so merged entities are already consistent).
        print("\n--- Phase 2a: Entity-Deduplication ---\n")
        await _phase2a_deduplicate_entities(db, analysis_id, entities)
        print(f"Entitäten nach Dedup: {len(entities)}")

        # Drop the old relations; they will be regenerated from scratch.
        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        old_count = (await cursor.fetchone())["cnt"]
        print(f"\nLösche {old_count} alte Relations...")
        await db.execute(
            "DELETE FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Load the incident ids linked to this analysis.
        cursor = await db.execute(
            "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        incident_ids = [row["incident_id"] for row in await cursor.fetchall()]
        print(f"Verknüpfte Lagen: {len(incident_ids)}")

        # Load articles and fact-checks. Guard against an empty id list:
        # an empty placeholder string would produce "IN ()", which is a
        # SQLite syntax error — skip the queries entirely in that case.
        article_rows = []
        fc_rows = []
        if incident_ids:
            placeholders = ",".join("?" * len(incident_ids))
            cursor = await db.execute(
                f"""SELECT id, incident_id, headline, headline_de, source, source_url,
                           content_original, content_de, collected_at
                    FROM articles WHERE incident_id IN ({placeholders})""",
                incident_ids,
            )
            article_rows = await cursor.fetchall()

            cursor = await db.execute(
                f"""SELECT id, incident_id, claim, status, evidence, checked_at
                    FROM fact_checks WHERE incident_id IN ({placeholders})""",
                incident_ids,
            )
            fc_rows = await cursor.fetchall()

        # ids/timestamps are collected separately for the data-hash below.
        articles = []
        article_ids = []
        article_ts = []
        for r in article_rows:
            articles.append({
                "id": r["id"], "incident_id": r["incident_id"],
                "headline": r["headline"], "headline_de": r["headline_de"],
                "source": r["source"], "source_url": r["source_url"],
                "content_original": r["content_original"], "content_de": r["content_de"],
            })
            article_ids.append(r["id"])
            article_ts.append(r["collected_at"] or "")

        factchecks = []
        factcheck_ids = []
        factcheck_ts = []
        for r in fc_rows:
            factchecks.append({
                "id": r["id"], "incident_id": r["incident_id"],
                "claim": r["claim"], "status": r["status"], "evidence": r["evidence"],
            })
            factcheck_ids.append(r["id"])
            factcheck_ts.append(r["checked_at"] or "")

        print(f"Artikel: {len(articles)}, Faktenchecks: {len(factchecks)}")

        # Phase 2: batched relationship extraction.
        print("\n--- Phase 2: Batched Beziehungsextraktion starten ---\n")
        relations = await _phase2_analyze_relationships(
            db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc,
        )

        # Phase 2c: semantic deduplication (synonyms, abbreviations, ...).
        print("\n--- Phase 2c: Semantische Deduplication (Opus) ---\n")
        await _phase2c_semantic_dedup(
            db, analysis_id, tenant_id, entities, usage_acc,
        )

        # Phase 2d: cleanup (self-loops, duplicate relations, orphans).
        print("\n--- Phase 2d: Cleanup ---\n")
        await _phase2d_cleanup(db, analysis_id, entities)

        # Re-read the final counters from the DB: the phases above may have
        # merged or removed rows, so the in-memory counts can be stale.
        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        row = await cursor.fetchone()
        final_entity_count = row["cnt"] if row else len(entities)

        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        row = await cursor.fetchone()
        final_relation_count = row["cnt"] if row else len(relations)

        # Finalize: persist counters, data hash and status.
        data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
        now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")

        await db.execute(
            """UPDATE network_analyses
               SET entity_count = ?, relation_count = ?, status = 'ready',
                   last_generated_at = ?, data_hash = ?
               WHERE id = ?""",
            (final_entity_count, final_relation_count, now, data_hash, analysis_id),
        )

        # Record token usage and cost for this regeneration run.
        await db.execute(
            """INSERT INTO network_generation_log
               (network_analysis_id, completed_at, status, input_tokens, output_tokens,
                cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls,
                entity_count, relation_count, tenant_id)
               VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens,
             usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
             usage_acc.total_cost_usd, usage_acc.call_count,
             final_entity_count, final_relation_count, tenant_id),
        )

        await db.commit()

        print(f"\n{'='*60}")
        print("FERTIG!")
        print(f"Entitäten: {final_entity_count}")
        print(f"Beziehungen: {final_relation_count}")
        print(f"API-Calls: {usage_acc.call_count}")
        print(f"Kosten: ${usage_acc.total_cost_usd:.4f}")
        print(f"{'='*60}")

    except Exception as e:
        print(f"FEHLER: {e}")
        import traceback
        traceback.print_exc()
        # Best effort: flag the analysis as errored so the UI does not show
        # it as stuck in 'generating'. Swallow secondary failures here —
        # the DB connection itself may be what broke.
        try:
            await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
            await db.commit()
        except Exception:
            pass
    finally:
        await db.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: optional first argument is the analysis id (default 1).
    cli_args = sys.argv[1:]
    analysis_id = int(cli_args[0]) if cli_args else 1
    asyncio.run(regenerate_relations_only(analysis_id))
|