"""Regenerate ONLY the relations of an existing network analysis.

Reuses the entities already stored for the analysis and runs
Phase 2a + Phase 2 + Phase 2c + Phase 2d.

Usage: pass the analysis id as the first command-line argument.
"""
import asyncio
import json
import sys
import os
import traceback

# The project modules below are resolved from this hard-coded source tree,
# so the path tweak has to happen before they are imported.
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")

from database import get_db
from agents.entity_extractor import (
    _phase2a_deduplicate_entities,
    _phase2_analyze_relationships,
    _phase2c_semantic_dedup,
    _phase2d_cleanup,
    _build_entity_name_map,
    _compute_data_hash,
    _broadcast,
    logger,
)
from agents.claude_client import UsageAccumulator
from config import TIMEZONE
from datetime import datetime
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)


async def _count_rows(db, query, analysis_id):
    """Run a ``SELECT COUNT(*) as cnt ... WHERE ... = ?`` query.

    Returns the ``cnt`` column of the first row, or ``None`` when the query
    yields no row at all.
    """
    cursor = await db.execute(query, (analysis_id,))
    row = await cursor.fetchone()
    return row["cnt"] if row else None


async def _load_entities(db, analysis_id):
    """Load the stored entities of an analysis as a list of dicts.

    Each dict carries the database primary key under ``db_id``. The
    ``aliases`` column is decoded from JSON; a missing or undecodable value
    becomes an empty list.
    """
    cursor = await db.execute(
        "SELECT id, name, name_normalized, entity_type, description, aliases, "
        "mention_count FROM network_entities WHERE network_analysis_id = ?",
        (analysis_id,),
    )
    entities = []
    for row in await cursor.fetchall():
        aliases = []
        try:
            aliases = json.loads(row["aliases"]) if row["aliases"] else []
        except (json.JSONDecodeError, TypeError):
            # Keep the empty default: broken alias data must not abort the run.
            pass
        entities.append({
            "name": row["name"],
            "name_normalized": row["name_normalized"],
            "type": row["entity_type"],
            "description": row["description"] or "",
            "aliases": aliases,
            "mention_count": row["mention_count"] or 1,
            "db_id": row["id"],
        })
    return entities


async def _load_incident_ids(db, analysis_id):
    """Return the ids of all incidents linked to the analysis."""
    cursor = await db.execute(
        "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
        (analysis_id,),
    )
    return [row["incident_id"] for row in await cursor.fetchall()]


async def _load_articles(db, incident_ids):
    """Load the articles belonging to ``incident_ids``.

    Returns ``(articles, article_ids, article_ts)``: the article dicts, their
    ids, and their ``collected_at`` values (``""`` when unset), all in query
    order.
    """
    if not incident_ids:
        # An empty "IN ()" list is a SQLite extension and invalid in standard
        # SQL; nothing can match anyway, so skip the query.
        return [], [], []

    placeholders = ",".join("?" * len(incident_ids))
    cursor = await db.execute(
        f"SELECT id, incident_id, headline, headline_de, source, source_url, "
        f"content_original, content_de, collected_at "
        f"FROM articles WHERE incident_id IN ({placeholders})",
        incident_ids,
    )
    articles, article_ids, article_ts = [], [], []
    for row in await cursor.fetchall():
        articles.append({
            "id": row["id"],
            "incident_id": row["incident_id"],
            "headline": row["headline"],
            "headline_de": row["headline_de"],
            "source": row["source"],
            "source_url": row["source_url"],
            "content_original": row["content_original"],
            "content_de": row["content_de"],
        })
        article_ids.append(row["id"])
        article_ts.append(row["collected_at"] or "")
    return articles, article_ids, article_ts


async def _load_factchecks(db, incident_ids):
    """Load the fact checks belonging to ``incident_ids``.

    Returns ``(factchecks, factcheck_ids, factcheck_ts)``: the fact-check
    dicts, their ids, and their ``checked_at`` values (``""`` when unset), all
    in query order.
    """
    if not incident_ids:
        # Same reasoning as for articles: avoid emitting "IN ()".
        return [], [], []

    placeholders = ",".join("?" * len(incident_ids))
    cursor = await db.execute(
        f"SELECT id, incident_id, claim, status, evidence, checked_at "
        f"FROM fact_checks WHERE incident_id IN ({placeholders})",
        incident_ids,
    )
    factchecks, factcheck_ids, factcheck_ts = [], [], []
    for row in await cursor.fetchall():
        factchecks.append({
            "id": row["id"],
            "incident_id": row["incident_id"],
            "claim": row["claim"],
            "status": row["status"],
            "evidence": row["evidence"],
        })
        factcheck_ids.append(row["id"])
        factcheck_ts.append(row["checked_at"] or "")
    return factchecks, factcheck_ids, factcheck_ts


async def regenerate_relations_only(analysis_id: int):
    """Delete the old relations and re-run Phase 2a + 2 + 2c + 2d.

    Steps, in order:

    1. Look up the analysis; print a message and return if it does not exist.
    2. Set its status to ``'generating'``.
    3. Load the stored entities and run Phase 2a (entity deduplication).
    4. Delete all existing relations of the analysis (committed immediately,
       i.e. before the new relations are extracted).
    5. Load linked incidents, their articles and fact checks.
    6. Run Phase 2 (relationship extraction), Phase 2c (semantic
       deduplication) and Phase 2d (cleanup).
    7. Store the final counts, timestamp and data hash, set the status to
       ``'ready'`` and write a row to ``network_generation_log``.

    Any exception raised along the way is printed with its traceback and the
    analysis status is set to ``'error'`` on a best-effort basis; the
    exception is not re-raised. The database handle is always closed.

    Args:
        analysis_id: Primary key of the row in ``network_analyses``.
    """
    db = await get_db()
    usage_acc = UsageAccumulator()
    try:
        # Verify that the analysis exists.
        cursor = await db.execute(
            "SELECT id, name, tenant_id, entity_count FROM network_analyses WHERE id = ?",
            (analysis_id,),
        )
        analysis = await cursor.fetchone()
        if not analysis:
            print(f"Analyse {analysis_id} nicht gefunden!")
            return

        tenant_id = analysis["tenant_id"]
        print(f"\nAnalyse: {analysis['name']} (ID={analysis_id})")
        print(f"Vorhandene Entitäten: {analysis['entity_count']}")

        # Mark the analysis as being generated.
        await db.execute(
            "UPDATE network_analyses SET status = 'generating' WHERE id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Load entities from the DB (including db_id).
        entities = await _load_entities(db, analysis_id)
        print(f"Geladene Entitäten: {len(entities)}")

        # Phase 2a: entity deduplication (before the relations are deleted).
        # NOTE(review): the count is re-printed afterwards, so this phase
        # presumably shrinks `entities` in place — confirm in entity_extractor.
        print("\n--- Phase 2a: Entity-Deduplication ---\n")
        await _phase2a_deduplicate_entities(db, analysis_id, entities)
        print(f"Entitäten nach Dedup: {len(entities)}")

        # Delete the old relations.
        old_count = await _count_rows(
            db,
            "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
            analysis_id,
        )
        print(f"\nLösche {old_count} alte Relations...")
        await db.execute(
            "DELETE FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Load the input material: incidents, articles, fact checks.
        incident_ids = await _load_incident_ids(db, analysis_id)
        print(f"Verknüpfte Lagen: {len(incident_ids)}")

        articles, article_ids, article_ts = await _load_articles(db, incident_ids)
        factchecks, factcheck_ids, factcheck_ts = await _load_factchecks(db, incident_ids)
        print(f"Artikel: {len(articles)}, Faktenchecks: {len(factchecks)}")

        # Phase 2: relationship extraction.
        print("\n--- Phase 2: Batched Beziehungsextraktion starten ---\n")
        relations = await _phase2_analyze_relationships(
            db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc,
        )

        # Phase 2c: semantic deduplication.
        print("\n--- Phase 2c: Semantische Deduplication (Opus) ---\n")
        await _phase2c_semantic_dedup(
            db, analysis_id, tenant_id, entities, usage_acc,
        )

        # Phase 2d: cleanup.
        print("\n--- Phase 2d: Cleanup ---\n")
        await _phase2d_cleanup(db, analysis_id, entities)

        # Final counters come from the DB; the in-memory lengths are only a
        # fallback and are evaluated lazily.
        entity_cnt = await _count_rows(
            db,
            "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
            analysis_id,
        )
        final_entity_count = entity_cnt if entity_cnt is not None else len(entities)

        relation_cnt = await _count_rows(
            db,
            "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
            analysis_id,
        )
        final_relation_count = relation_cnt if relation_cnt is not None else len(relations)

        # Finalisation: persist counters, timestamp, data hash and usage log.
        data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
        now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")

        await db.execute(
            "UPDATE network_analyses SET entity_count = ?, relation_count = ?, "
            "status = 'ready', last_generated_at = ?, data_hash = ? WHERE id = ?",
            (final_entity_count, final_relation_count, now, data_hash, analysis_id),
        )
        await db.execute(
            "INSERT INTO network_generation_log (network_analysis_id, completed_at, "
            "status, input_tokens, output_tokens, cache_creation_tokens, "
            "cache_read_tokens, total_cost_usd, api_calls, entity_count, "
            "relation_count, tenant_id) "
            "VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                analysis_id,
                now,
                usage_acc.input_tokens,
                usage_acc.output_tokens,
                usage_acc.cache_creation_tokens,
                usage_acc.cache_read_tokens,
                usage_acc.total_cost_usd,
                usage_acc.call_count,
                final_entity_count,
                final_relation_count,
                tenant_id,
            ),
        )
        await db.commit()

        print(f"\n{'='*60}")
        print("FERTIG!")
        print(f"Entitäten: {final_entity_count}")
        print(f"Beziehungen: {final_relation_count}")
        print(f"API-Calls: {usage_acc.call_count}")
        print(f"Kosten: ${usage_acc.total_cost_usd:.4f}")
        print(f"{'='*60}")

    except Exception as e:
        print(f"FEHLER: {e}")
        traceback.print_exc()
        try:
            await db.execute(
                "UPDATE network_analyses SET status = 'error' WHERE id = ?",
                (analysis_id,),
            )
            await db.commit()
        except Exception as status_err:
            # Still best effort, but no longer silent: if this fails the
            # analysis stays in 'generating' and someone has to know.
            print(f"WARNUNG: Status 'error' konnte nicht gesetzt werden: {status_err}")
    finally:
        await db.close()


if __name__ == "__main__":
    # NOTE(review): without an argument this falls back to analysis 1 and
    # deletes its relations — confirm that this default is intended.
    analysis_id = int(sys.argv[1]) if len(sys.argv) > 1 else 1
    asyncio.run(regenerate_relations_only(analysis_id))