Commits vergleichen
2 Commits
4d6d022bee
...
acb3c6a6cb
| Autor | SHA1 | Datum | |
|---|---|---|---|
|
|
acb3c6a6cb | ||
|
|
7bfa1d29cf |
234
regenerate_relations.py
Normale Datei
234
regenerate_relations.py
Normale Datei
@@ -0,0 +1,234 @@
|
|||||||
|
"""Regeneriert NUR die Beziehungen für eine bestehende Netzwerkanalyse.
|
||||||
|
Nutzt die vorhandenen Entitäten und führt Phase 2a + Phase 2 + Phase 2c + Phase 2d aus.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
|
||||||
|
|
||||||
|
from database import get_db
|
||||||
|
from agents.entity_extractor import (
|
||||||
|
_phase2a_deduplicate_entities,
|
||||||
|
_phase2_analyze_relationships,
|
||||||
|
_phase2c_semantic_dedup,
|
||||||
|
_phase2d_cleanup,
|
||||||
|
_build_entity_name_map,
|
||||||
|
_compute_data_hash,
|
||||||
|
_broadcast,
|
||||||
|
logger,
|
||||||
|
)
|
||||||
|
from agents.claude_client import UsageAccumulator
|
||||||
|
from config import TIMEZONE
|
||||||
|
from datetime import datetime
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def regenerate_relations_only(analysis_id: int):
    """Regenerate ONLY the relations of an existing network analysis.

    Keeps the already-extracted entities, deletes all stored relations and
    re-runs phase 2a (exact dedup), phase 2 (relationship extraction),
    phase 2c (semantic dedup) and phase 2d (cleanup). Finally updates the
    analysis counters, data hash and the generation log.

    Args:
        analysis_id: Primary key of the row in ``network_analyses``.
    """
    db = await get_db()
    usage_acc = UsageAccumulator()

    try:
        # Make sure the analysis exists before touching anything.
        cursor = await db.execute(
            "SELECT id, name, tenant_id, entity_count FROM network_analyses WHERE id = ?",
            (analysis_id,),
        )
        analysis = await cursor.fetchone()
        if not analysis:
            print(f"Analyse {analysis_id} nicht gefunden!")
            return

        tenant_id = analysis["tenant_id"]
        print(f"\nAnalyse: {analysis['name']} (ID={analysis_id})")
        print(f"Vorhandene Entitäten: {analysis['entity_count']}")

        # Mark the analysis as busy so concurrent readers see the state change.
        await db.execute(
            "UPDATE network_analyses SET status = 'generating' WHERE id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Load the stored entities (including their db_id, needed for merges).
        entities = await _load_entities(db, analysis_id)
        print(f"Geladene Entitäten: {len(entities)}")

        # Phase 2a: exact dedup on name_normalized (before relations are dropped).
        print(f"\n--- Phase 2a: Entity-Deduplication ---\n")
        await _phase2a_deduplicate_entities(db, analysis_id, entities)
        print(f"Entitäten nach Dedup: {len(entities)}")

        # Drop the old relations.
        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        old_count = (await cursor.fetchone())["cnt"]
        print(f"\nLösche {old_count} alte Relations...")
        await db.execute(
            "DELETE FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Incident ids linked to this analysis.
        cursor = await db.execute(
            "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        incident_ids = [row["incident_id"] for row in await cursor.fetchall()]
        print(f"Verknüpfte Lagen: {len(incident_ids)}")

        # BUGFIX: with no linked incidents the previous code built an
        # "IN ()" clause, which is invalid SQL and crashed the run.
        if not incident_ids:
            print("Keine verknüpften Lagen gefunden — Abbruch.")
            await db.execute(
                "UPDATE network_analyses SET status = 'error' WHERE id = ?",
                (analysis_id,),
            )
            await db.commit()
            return

        (articles, article_ids, article_ts,
         factchecks, factcheck_ids, factcheck_ts) = await _load_source_material(db, incident_ids)
        print(f"Artikel: {len(articles)}, Faktenchecks: {len(factchecks)}")

        # Phase 2: batched relationship extraction.
        print(f"\n--- Phase 2: Batched Beziehungsextraktion starten ---\n")
        relations = await _phase2_analyze_relationships(
            db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc,
        )

        # Phase 2c: semantic dedup (synonyms, abbreviations, language variants).
        print(f"\n--- Phase 2c: Semantische Deduplication (Opus) ---\n")
        await _phase2c_semantic_dedup(
            db, analysis_id, tenant_id, entities, usage_acc,
        )

        # Phase 2d: cleanup (self-loops, duplicate relations, orphans).
        print(f"\n--- Phase 2d: Cleanup ---\n")
        await _phase2d_cleanup(db, analysis_id, entities)

        # Final counters straight from the DB — phases may have merged/deleted rows.
        final_entity_count = await _count_rows(
            db, "network_entities", analysis_id, default=len(entities))
        final_relation_count = await _count_rows(
            db, "network_relations", analysis_id, default=len(relations))

        # Finalization: data hash, status, generation log.
        data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
        now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")

        await db.execute(
            """UPDATE network_analyses
               SET entity_count = ?, relation_count = ?, status = 'ready',
                   last_generated_at = ?, data_hash = ?
               WHERE id = ?""",
            (final_entity_count, final_relation_count, now, data_hash, analysis_id),
        )

        await db.execute(
            """INSERT INTO network_generation_log
               (network_analysis_id, completed_at, status, input_tokens, output_tokens,
                cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls,
                entity_count, relation_count, tenant_id)
               VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens,
             usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
             usage_acc.total_cost_usd, usage_acc.call_count,
             final_entity_count, final_relation_count, tenant_id),
        )

        await db.commit()

        print(f"\n{'='*60}")
        print(f"FERTIG!")
        print(f"Entitäten: {final_entity_count}")
        print(f"Beziehungen: {final_relation_count}")
        print(f"API-Calls: {usage_acc.call_count}")
        print(f"Kosten: ${usage_acc.total_cost_usd:.4f}")
        print(f"{'='*60}")

    except Exception as e:
        print(f"FEHLER: {e}")
        import traceback
        traceback.print_exc()
        # Best effort: flag the analysis as failed; never raise from the handler.
        try:
            await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
            await db.commit()
        except Exception:
            pass
    finally:
        await db.close()


async def _load_entities(db, analysis_id: int) -> list[dict]:
    """Load all entities of the analysis as dicts, including their ``db_id``."""
    cursor = await db.execute(
        """SELECT id, name, name_normalized, entity_type, description, aliases, mention_count
           FROM network_entities WHERE network_analysis_id = ?""",
        (analysis_id,),
    )
    entities = []
    for r in await cursor.fetchall():
        aliases = []
        try:
            aliases = json.loads(r["aliases"]) if r["aliases"] else []
        except (json.JSONDecodeError, TypeError):
            pass  # tolerate malformed alias JSON — fall back to no aliases
        entities.append({
            "name": r["name"],
            "name_normalized": r["name_normalized"],
            "type": r["entity_type"],
            "description": r["description"] or "",
            "aliases": aliases,
            "mention_count": r["mention_count"] or 1,
            "db_id": r["id"],
        })
    return entities


async def _load_source_material(db, incident_ids: list):
    """Load articles and fact-checks for the given (non-empty) incident id list.

    Returns:
        Tuple ``(articles, article_ids, article_ts,
                 factchecks, factcheck_ids, factcheck_ts)``.
    """
    placeholders = ",".join("?" * len(incident_ids))

    cursor = await db.execute(
        f"""SELECT id, incident_id, headline, headline_de, source, source_url,
                   content_original, content_de, collected_at
            FROM articles WHERE incident_id IN ({placeholders})""",
        incident_ids,
    )
    articles, article_ids, article_ts = [], [], []
    for r in await cursor.fetchall():
        articles.append({
            "id": r["id"], "incident_id": r["incident_id"],
            "headline": r["headline"], "headline_de": r["headline_de"],
            "source": r["source"], "source_url": r["source_url"],
            "content_original": r["content_original"], "content_de": r["content_de"],
        })
        article_ids.append(r["id"])
        article_ts.append(r["collected_at"] or "")

    cursor = await db.execute(
        f"""SELECT id, incident_id, claim, status, evidence, checked_at
            FROM fact_checks WHERE incident_id IN ({placeholders})""",
        incident_ids,
    )
    factchecks, factcheck_ids, factcheck_ts = [], [], []
    for r in await cursor.fetchall():
        factchecks.append({
            "id": r["id"], "incident_id": r["incident_id"],
            "claim": r["claim"], "status": r["status"], "evidence": r["evidence"],
        })
        factcheck_ids.append(r["id"])
        factcheck_ts.append(r["checked_at"] or "")

    return articles, article_ids, article_ts, factchecks, factcheck_ids, factcheck_ts


async def _count_rows(db, table: str, analysis_id: int, default: int) -> int:
    """COUNT(*) rows of *table* for this analysis; *default* if no row comes back.

    *table* is only ever supplied by internal call sites (never user input),
    so interpolating it into the SQL is safe here.
    """
    cursor = await db.execute(
        f"SELECT COUNT(*) as cnt FROM {table} WHERE network_analysis_id = ?",
        (analysis_id,),
    )
    row = await cursor.fetchone()
    return row["cnt"] if row else default
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # CLI entry point: optional first argument is the analysis id (default 1).
    # BUGFIX: a non-numeric argument previously crashed with an unhandled
    # ValueError traceback; report it cleanly instead.
    try:
        analysis_id = int(sys.argv[1]) if len(sys.argv) > 1 else 1
    except ValueError:
        print(f"Ungültige Analyse-ID: {sys.argv[1]}")
        sys.exit(1)
    asyncio.run(regenerate_relations_only(analysis_id))
|
||||||
@@ -4,6 +4,7 @@ import hashlib
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
from collections import defaultdict
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -12,6 +13,18 @@ from config import CLAUDE_MODEL_FAST, TIMEZONE
|
|||||||
|
|
||||||
logger = logging.getLogger("osint.entity_extractor")
|
logger = logging.getLogger("osint.entity_extractor")
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Konstanten
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
TYPE_PRIORITY = {"location": 5, "organisation": 4, "military": 3, "event": 2, "person": 1}
|
||||||
|
|
||||||
|
_STOP_WORDS = frozenset({
|
||||||
|
"the", "of", "and", "for", "in", "on", "at", "to", "by",
|
||||||
|
"von", "der", "die", "das", "und", "für", "des", "den", "dem",
|
||||||
|
"ein", "eine", "zur", "zum", "bei", "mit", "aus", "nach",
|
||||||
|
})
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Prompts
|
# Prompts
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -97,26 +110,33 @@ AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
|
|||||||
}}"""
|
}}"""
|
||||||
|
|
||||||
|
|
||||||
CORRECTION_PROMPT = """Du bist ein Senior OSINT-Analyst. Prüfe die extrahierten Entitäten auf Fehler.
|
SEMANTIC_DEDUP_PROMPT = """Du bist ein OSINT-Analyst. Prüfe diese Entitäten auf Duplikate.
|
||||||
|
|
||||||
ENTITÄTEN:
|
ENTITÄTEN:
|
||||||
{entities_json}
|
{entity_list}
|
||||||
|
|
||||||
AUFGABE: Prüfe auf folgende Fehler und gib NUR Korrekturen zurück:
|
AUFGABE: Welche Entitäten bezeichnen DASSELBE reale Objekt?
|
||||||
1. "name_fix": Name ist falsch geschrieben oder unvollständig
|
|
||||||
2. "merge": Zwei Entitäten bezeichnen dasselbe -> zusammenführen (z.B. "USA" als Organisation und "USA" als Location)
|
|
||||||
3. "add": Wichtige Entität fehlt komplett (nur bei offensichtlichen Lücken)
|
|
||||||
|
|
||||||
WICHTIG: Nur echte Fehler korrigieren. Im Zweifel KEINE Korrektur.
|
Typische Duplikate:
|
||||||
|
- Abkürzung vs. Vollname: "IRGC" = "Iranian Revolutionary Guard Corps"
|
||||||
|
- Sprachvarianten: "European Union" = "Europäische Union"
|
||||||
|
- Schreibvarianten: "Strait of Hormuz" = "Straße von Hormus"
|
||||||
|
|
||||||
|
REGELN:
|
||||||
|
- NUR echte Duplikate zusammenführen (gleiche reale Entität)
|
||||||
|
- NICHT zusammenführen: Unterorganisationen (IRGC ≠ IRGC Navy), verschiedene Personen
|
||||||
|
- "keep": Nummer der Haupt-Entität (bevorzuge: mehr Erwähnungen, vollständiger Name)
|
||||||
|
- "merge": Nummern der Duplikate die in die Haupt-Entität zusammengeführt werden
|
||||||
|
|
||||||
AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
|
AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
|
||||||
{{
|
{{
|
||||||
"corrections": [
|
"merges": [
|
||||||
{{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}},
|
{{"keep": 1, "merge": [3, 5]}},
|
||||||
{{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}},
|
{{"keep": 2, "merge": [4]}}
|
||||||
{{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}}
|
|
||||||
]
|
]
|
||||||
}}"""
|
}}
|
||||||
|
|
||||||
|
Falls KEINE Duplikate: {{"merges": []}}"""
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -174,6 +194,74 @@ def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) ->
|
|||||||
return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
|
return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Entity-Merge Helper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _merge_entity_in_db(
|
||||||
|
db, analysis_id: int, keep_ent: dict, merge_ent: dict, entities: list[dict],
|
||||||
|
):
|
||||||
|
"""Führt merge_ent in keep_ent zusammen (DB + In-Memory)."""
|
||||||
|
keep_id = keep_ent["db_id"]
|
||||||
|
merge_id = merge_ent["db_id"]
|
||||||
|
|
||||||
|
# Aliases vereinen
|
||||||
|
aliases = set(keep_ent.get("aliases", []))
|
||||||
|
aliases.add(merge_ent["name"])
|
||||||
|
for a in merge_ent.get("aliases", []):
|
||||||
|
if a and a.strip():
|
||||||
|
aliases.add(a.strip())
|
||||||
|
aliases.discard(keep_ent["name"])
|
||||||
|
keep_ent["aliases"] = list(aliases)
|
||||||
|
|
||||||
|
# Mention count addieren
|
||||||
|
keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)
|
||||||
|
|
||||||
|
# Längere Description behalten
|
||||||
|
if len(merge_ent.get("description", "")) > len(keep_ent.get("description", "")):
|
||||||
|
keep_ent["description"] = merge_ent["description"]
|
||||||
|
|
||||||
|
# Entity-Mentions umhängen
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
|
||||||
|
(keep_id, merge_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Relations umhängen
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
|
||||||
|
(keep_id, merge_id, analysis_id),
|
||||||
|
)
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
|
||||||
|
(keep_id, merge_id, analysis_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Self-Loops entfernen die durch den Merge entstanden sein könnten
|
||||||
|
await db.execute(
|
||||||
|
"DELETE FROM network_relations WHERE source_entity_id = ? AND target_entity_id = ? AND network_analysis_id = ?",
|
||||||
|
(keep_id, keep_id, analysis_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Keep-Entity in DB aktualisieren
|
||||||
|
await db.execute(
|
||||||
|
"""UPDATE network_entities
|
||||||
|
SET aliases = ?, mention_count = ?, description = ?
|
||||||
|
WHERE id = ?""",
|
||||||
|
(json.dumps(keep_ent["aliases"], ensure_ascii=False),
|
||||||
|
keep_ent["mention_count"], keep_ent.get("description", ""), keep_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Merge-Entity löschen
|
||||||
|
await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_id,))
|
||||||
|
|
||||||
|
# Aus entities-Liste entfernen
|
||||||
|
try:
|
||||||
|
entities.remove(merge_ent)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Phase 1: Entity-Extraktion (Haiku)
|
# Phase 1: Entity-Extraktion (Haiku)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -209,7 +297,7 @@ async def _phase1_extract_entities(
|
|||||||
batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
|
batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
|
||||||
logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")
|
logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")
|
||||||
|
|
||||||
entity_map: dict[tuple[str, str], dict] = {}
|
entity_map: dict[str, dict] = {}
|
||||||
|
|
||||||
for batch_idx, batch in enumerate(batches):
|
for batch_idx, batch in enumerate(batches):
|
||||||
articles_text = "\n\n---\n\n".join(batch)
|
articles_text = "\n\n---\n\n".join(batch)
|
||||||
@@ -244,7 +332,7 @@ async def _phase1_extract_entities(
|
|||||||
if entity_type not in valid_types:
|
if entity_type not in valid_types:
|
||||||
entity_type = "organisation"
|
entity_type = "organisation"
|
||||||
|
|
||||||
key = (name_normalized, entity_type)
|
key = name_normalized
|
||||||
|
|
||||||
if key in entity_map:
|
if key in entity_map:
|
||||||
existing = entity_map[key]
|
existing = entity_map[key]
|
||||||
@@ -259,6 +347,9 @@ async def _phase1_extract_entities(
|
|||||||
new_desc = ent.get("description", "")
|
new_desc = ent.get("description", "")
|
||||||
if len(new_desc) > len(existing.get("description", "")):
|
if len(new_desc) > len(existing.get("description", "")):
|
||||||
existing["description"] = new_desc
|
existing["description"] = new_desc
|
||||||
|
# Typ-Priorität: höherwertigen Typ behalten
|
||||||
|
if TYPE_PRIORITY.get(entity_type, 0) > TYPE_PRIORITY.get(existing["type"], 0):
|
||||||
|
existing["type"] = entity_type
|
||||||
else:
|
else:
|
||||||
entity_map[key] = {
|
entity_map[key] = {
|
||||||
"name": name,
|
"name": name,
|
||||||
@@ -303,6 +394,55 @@ async def _phase1_extract_entities(
|
|||||||
return all_entities
|
return all_entities
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Phase 2a: Entity-Deduplication nach name_normalized
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _phase2a_deduplicate_entities(
    db, analysis_id: int, entities: list[dict], ws_manager=None,
) -> None:
    """Collapse entities sharing the same ``name_normalized``, regardless of type."""
    logger.info(f"Phase 2a: Prüfe {len(entities)} Entitäten auf name_normalized-Duplikate")

    # Bucket only the persisted entities (those that carry a db_id).
    buckets = defaultdict(list)
    for candidate in entities:
        if candidate.get("db_id"):
            buckets[candidate["name_normalized"]].append(candidate)

    merged_ids = set()
    merge_count = 0

    for bucket in buckets.values():
        if len(bucket) < 2:
            continue
        # Winner of the bucket: highest type priority, then most mentions.
        bucket.sort(
            key=lambda e: (TYPE_PRIORITY.get(e["type"], 0), e.get("mention_count", 0)),
            reverse=True,
        )
        winner, *losers = bucket
        for loser in losers:
            if loser["db_id"] in merged_ids:
                continue
            await _merge_entity_in_db(db, analysis_id, winner, loser, entities)
            merged_ids.add(loser["db_id"])
            merge_count += 1

    if merge_count > 0:
        await db.commit()

    logger.info(
        f"Phase 2a abgeschlossen: {merge_count} Duplikate zusammengeführt, "
        f"{len(entities)} Entitäten verbleiben"
    )

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "entity_dedup",
        "progress": 100,
    })
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch)
|
# Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -566,186 +706,232 @@ async def _phase2_analyze_relationships(
|
|||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Phase 2b: Korrekturen (Opus)
|
# Phase 2c: Semantische Deduplication (Opus)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
async def _phase2c_semantic_dedup(
    db, analysis_id: int, tenant_id: int,
    entities: list[dict], usage_acc: UsageAccumulator, ws_manager=None,
) -> None:
    """Semantic deduplication via Opus — detects synonyms, abbreviations, language variants.

    Candidate clusters are built cheaply (shared tokens and abbreviation
    matches); only clusters are sent to the model, capped at 50 calls.
    """
    # Too few entities: dedup is not worth a model call.
    if len(entities) < 10:
        return

    logger.info(f"Phase 2c: Semantische Dedup für {len(entities)} Entitäten")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "semantic_dedup",
        "progress": 0,
    })

    # --- Clustering: token-based + abbreviation matching ---
    token_to_ids = defaultdict(set)
    db_id_map = {e["db_id"]: e for e in entities if e.get("db_id")}

    for ent in entities:
        db_id = ent.get("db_id")
        if not db_id:
            continue

        # Normalized name plus lowercased aliases.
        lowered_names = [ent["name_normalized"]]
        lowered_names.extend(a.lower() for a in ent.get("aliases", []) if a)

        # Token-based clustering: share a meaningful token → same candidate set.
        for candidate_name in lowered_names:
            for token in re.split(r'[\s\-_/(),.;:]+', candidate_name):
                token = token.strip()
                if len(token) >= 3 and token not in _STOP_WORDS:
                    token_to_ids[token].add(db_id)

        # Abbreviation matching: initials of multi-word names, and short
        # names treated as potential abbreviations.
        display_names = [ent["name"]] + [a for a in ent.get("aliases", []) if a]
        for display in display_names:
            parts = [w for w in re.split(r'[\s\-_/(),.;:]+', display) if w and len(w) >= 2]
            if len(parts) >= 2:
                initials = "".join(w[0] for w in parts).lower()
                if len(initials) >= 2:
                    token_to_ids[f"_abbr_{initials}"].add(db_id)
            letters_only = re.sub(r'[^a-zA-Z]', '', display).lower()
            if 2 <= len(letters_only) <= 6:
                token_to_ids[f"_abbr_{letters_only}"].add(db_id)

    # Unique clusters only, sized between 2 and 40 entities.
    seen_clusters = set()
    candidate_clusters = []
    for _token, id_set in sorted(token_to_ids.items(), key=lambda x: len(x[1])):
        if not (2 <= len(id_set) <= 40):
            continue
        frozen = frozenset(id_set)
        if frozen in seen_clusters:
            continue
        seen_clusters.add(frozen)
        candidate_clusters.append(list(id_set))

    logger.info(f"Phase 2c: {len(candidate_clusters)} Kandidaten-Cluster gefunden")

    # --- One Opus call per cluster, capped ---
    merged_away = set()
    total_merges = 0
    opus_calls = 0
    max_calls = 50

    for ci, cluster_ids in enumerate(candidate_clusters):
        if opus_calls >= max_calls:
            logger.warning(f"Phase 2c: Max {max_calls} Opus-Calls erreicht, stoppe")
            break

        # Skip entities already merged away by earlier clusters.
        active_ids = [did for did in cluster_ids if did not in merged_away and did in db_id_map]
        if len(active_ids) < 2:
            continue
        active_ents = [db_id_map[did] for did in active_ids]

        # Build the numbered entity list for the prompt.
        lines = []
        for i, ent in enumerate(active_ents, 1):
            aliases_str = ", ".join((ent.get("aliases") or [])[:5])
            line = f"{i}. {ent['name']} ({ent['type']}, {ent.get('mention_count', 0)} Erwähnungen)"
            if aliases_str:
                line += f" [Aliases: {aliases_str}]"
            lines.append(line)

        prompt = SEMANTIC_DEDUP_PROMPT.format(entity_list="\n".join(lines))

        try:
            result_text, usage = await call_claude(prompt, tools=None, model=None)
            usage_acc.add(usage)
            opus_calls += 1
        except Exception as e:
            logger.error(f"Phase 2c Opus-Call {opus_calls + 1} fehlgeschlagen: {e}")
            continue

        parsed = _parse_json_response(result_text)
        if not parsed:
            continue

        for merge_group in parsed.get("merges", []):
            keep_idx = merge_group.get("keep")
            merge_indices = merge_group.get("merge", [])
            if keep_idx is None or not merge_indices:
                continue
            if not isinstance(merge_indices, list):
                merge_indices = [merge_indices]

            keep_idx -= 1  # model answers are 1-indexed
            if not (0 <= keep_idx < len(active_ents)):
                continue
            keep_ent = active_ents[keep_idx]
            if keep_ent["db_id"] in merged_away:
                continue

            for mi in merge_indices:
                mi -= 1
                if mi < 0 or mi >= len(active_ents) or mi == keep_idx:
                    continue
                merge_ent = active_ents[mi]
                if merge_ent["db_id"] in merged_away:
                    continue

                logger.info(f"Semantic Merge: '{merge_ent['name']}' → '{keep_ent['name']}'")
                await _merge_entity_in_db(db, analysis_id, keep_ent, merge_ent, entities)
                merged_away.add(merge_ent["db_id"])
                db_id_map.pop(merge_ent["db_id"], None)
                total_merges += 1

        # Report cluster-level progress to the websocket.
        progress = int((ci + 1) / len(candidate_clusters) * 100) if candidate_clusters else 100
        await _broadcast(ws_manager, "network_status", {
            "analysis_id": analysis_id,
            "phase": "semantic_dedup",
            "progress": min(progress, 100),
        })

    if total_merges > 0:
        await db.commit()

    logger.info(
        f"Phase 2c abgeschlossen: {total_merges} Merges, {opus_calls} Opus-Calls, "
        f"{len(entities)} Entitäten verbleiben"
    )
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Phase 2d: Cleanup
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _phase2d_cleanup(
    db, analysis_id: int, entities: list[dict], ws_manager=None,
) -> None:
    """Final cleanup: self-loops, direction normalization, duplicate relations, orphaned entities."""
    logger.info("Phase 2d: Cleanup")

    # Step 1 — drop self-loop relations.
    result = await db.execute(
        "DELETE FROM network_relations WHERE network_analysis_id = ? AND source_entity_id = target_entity_id",
        (analysis_id,),
    )
    self_loops = result.rowcount

    # Step 2 — normalize direction (merges can leave inconsistent orderings):
    # always store the smaller entity id as source.
    await db.execute(
        """UPDATE network_relations
           SET source_entity_id = target_entity_id, target_entity_id = source_entity_id
           WHERE source_entity_id > target_entity_id AND network_analysis_id = ?""",
        (analysis_id,),
    )

    # Step 3 — after normalization, keep only one relation per
    # (source, target, category) triple.
    result = await db.execute(
        """DELETE FROM network_relations
           WHERE network_analysis_id = ? AND id NOT IN (
               SELECT MIN(id) FROM network_relations
               WHERE network_analysis_id = ?
               GROUP BY source_entity_id, target_entity_id, category
           )""",
        (analysis_id, analysis_id),
    )
    dup_relations = result.rowcount

    # Step 4 — remove entities with no remaining connections.
    result = await db.execute(
        """DELETE FROM network_entities
           WHERE network_analysis_id = ? AND id NOT IN (
               SELECT source_entity_id FROM network_relations WHERE network_analysis_id = ?
               UNION
               SELECT target_entity_id FROM network_relations WHERE network_analysis_id = ?
           )""",
        (analysis_id, analysis_id, analysis_id),
    )
    orphans = result.rowcount

    # Sync the in-memory list with what survived in the DB.
    result = await db.execute(
        "SELECT id FROM network_entities WHERE network_analysis_id = ?", (analysis_id,)
    )
    remaining_ids = {row["id"] for row in await result.fetchall()}
    entities[:] = [e for e in entities if e.get("db_id") in remaining_ids]

    await db.commit()

    logger.info(
        f"Phase 2d abgeschlossen: {self_loops} Self-Loops, {dup_relations} Duplikat-Relations, "
        f"{orphans} verwaiste Entities entfernt, {len(entities)} Entitäten verbleiben"
    )

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "cleanup",
        "progress": 100,
    })
|
||||||
|
|
||||||
|
|
||||||
async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
|
|
||||||
"""Wendet Opus-Korrekturen auf Entitäten an."""
|
|
||||||
for corr in corrections:
|
|
||||||
if not isinstance(corr, dict):
|
|
||||||
continue
|
|
||||||
corr_type = corr.get("type", "")
|
|
||||||
|
|
||||||
try:
|
|
||||||
if corr_type == "name_fix":
|
|
||||||
entity_name = (corr.get("entity_name") or "").strip()
|
|
||||||
corrected_name = (corr.get("corrected_name") or "").strip()
|
|
||||||
if not entity_name or not corrected_name:
|
|
||||||
continue
|
|
||||||
|
|
||||||
for ent in entities:
|
|
||||||
if ent["name"].lower() == entity_name.lower() or \
|
|
||||||
ent["name_normalized"] == entity_name.lower():
|
|
||||||
old_name = ent["name"]
|
|
||||||
ent["name"] = corrected_name
|
|
||||||
ent["name_normalized"] = corrected_name.lower().strip()
|
|
||||||
ent.setdefault("aliases", [])
|
|
||||||
if old_name not in ent["aliases"]:
|
|
||||||
ent["aliases"].append(old_name)
|
|
||||||
|
|
||||||
if ent.get("db_id"):
|
|
||||||
await db.execute(
|
|
||||||
"""UPDATE network_entities
|
|
||||||
SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1
|
|
||||||
WHERE id = ?""",
|
|
||||||
(corrected_name, corrected_name.lower().strip(),
|
|
||||||
json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]),
|
|
||||||
)
|
|
||||||
break
|
|
||||||
|
|
||||||
elif corr_type == "merge":
|
|
||||||
keep_name = (corr.get("entity_name") or "").strip()
|
|
||||||
merge_name = (corr.get("merge_with") or "").strip()
|
|
||||||
if not keep_name or not merge_name:
|
|
||||||
continue
|
|
||||||
|
|
||||||
keep_ent = merge_ent = None
|
|
||||||
for ent in entities:
|
|
||||||
nl = ent["name"].lower()
|
|
||||||
nn = ent["name_normalized"]
|
|
||||||
if nl == keep_name.lower() or nn == keep_name.lower():
|
|
||||||
keep_ent = ent
|
|
||||||
elif nl == merge_name.lower() or nn == merge_name.lower():
|
|
||||||
merge_ent = ent
|
|
||||||
|
|
||||||
if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"):
|
|
||||||
aliases = set(keep_ent.get("aliases", []))
|
|
||||||
aliases.add(merge_ent["name"])
|
|
||||||
for a in merge_ent.get("aliases", []):
|
|
||||||
if a and a.strip():
|
|
||||||
aliases.add(a.strip())
|
|
||||||
keep_ent["aliases"] = list(aliases)
|
|
||||||
keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)
|
|
||||||
|
|
||||||
await db.execute(
|
|
||||||
"UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
|
|
||||||
(keep_ent["db_id"], merge_ent["db_id"]),
|
|
||||||
)
|
|
||||||
await db.execute(
|
|
||||||
"UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
|
|
||||||
(keep_ent["db_id"], merge_ent["db_id"], analysis_id),
|
|
||||||
)
|
|
||||||
await db.execute(
|
|
||||||
"UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
|
|
||||||
(keep_ent["db_id"], merge_ent["db_id"], analysis_id),
|
|
||||||
)
|
|
||||||
await db.execute(
|
|
||||||
"""UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1
|
|
||||||
WHERE id = ?""",
|
|
||||||
(json.dumps(keep_ent["aliases"], ensure_ascii=False),
|
|
||||||
keep_ent["mention_count"], keep_ent["db_id"]),
|
|
||||||
)
|
|
||||||
await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],))
|
|
||||||
entities.remove(merge_ent)
|
|
||||||
|
|
||||||
elif corr_type == "add":
|
|
||||||
entity_name = (corr.get("entity_name") or "").strip()
|
|
||||||
entity_type = (corr.get("entity_type") or "organisation").lower().strip()
|
|
||||||
description = corr.get("description", "")
|
|
||||||
if not entity_name:
|
|
||||||
continue
|
|
||||||
|
|
||||||
valid_types = {"person", "organisation", "location", "event", "military"}
|
|
||||||
if entity_type not in valid_types:
|
|
||||||
entity_type = "organisation"
|
|
||||||
|
|
||||||
name_norm = entity_name.lower().strip()
|
|
||||||
if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities):
|
|
||||||
continue
|
|
||||||
|
|
||||||
cursor = await db.execute(
|
|
||||||
"""INSERT OR IGNORE INTO network_entities
|
|
||||||
(network_analysis_id, name, name_normalized, entity_type,
|
|
||||||
description, aliases, mention_count, corrected_by_opus, tenant_id)
|
|
||||||
VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""",
|
|
||||||
(analysis_id, entity_name, name_norm, entity_type, description, tenant_id),
|
|
||||||
)
|
|
||||||
if cursor.lastrowid:
|
|
||||||
entities.append({
|
|
||||||
"name": entity_name, "name_normalized": name_norm,
|
|
||||||
"type": entity_type, "description": description,
|
|
||||||
"aliases": [], "mention_count": 1, "db_id": cursor.lastrowid,
|
|
||||||
})
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}")
|
|
||||||
|
|
||||||
await db.commit()
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Phase 3: Finalisierung
|
# Phase 3: Finalisierung
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -800,8 +986,10 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
|||||||
"""Hauptfunktion: Entity-Extraktion + Beziehungsanalyse.
|
"""Hauptfunktion: Entity-Extraktion + Beziehungsanalyse.
|
||||||
|
|
||||||
Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches)
|
Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches)
|
||||||
|
Phase 2a: Entity-Dedup nach name_normalized (Code, kein API)
|
||||||
Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung
|
Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung
|
||||||
Phase 2b: Opus korrigiert Entitäten (Name-Fix, Merge, Add)
|
Phase 2c: Semantische Dedup via Opus (Cluster-weise)
|
||||||
|
Phase 2d: Cleanup (Self-Loops, Verwaiste, Duplikat-Relations)
|
||||||
Phase 3: Finalisierung (Zähler, Hash, Log)
|
Phase 3: Finalisierung (Zähler, Hash, Log)
|
||||||
"""
|
"""
|
||||||
from database import get_db
|
from database import get_db
|
||||||
@@ -879,7 +1067,7 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
|||||||
logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, "
|
logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, "
|
||||||
f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen")
|
f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen")
|
||||||
|
|
||||||
# Phase 1
|
# Phase 1: Entity-Extraktion
|
||||||
if not await _check_analysis_exists(db, analysis_id):
|
if not await _check_analysis_exists(db, analysis_id):
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -887,7 +1075,13 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
|||||||
db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
|
db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Phase 2
|
# Phase 2a: Entity-Deduplication
|
||||||
|
if not await _check_analysis_exists(db, analysis_id):
|
||||||
|
return
|
||||||
|
|
||||||
|
await _phase2a_deduplicate_entities(db, analysis_id, entities, ws_manager)
|
||||||
|
|
||||||
|
# Phase 2: Beziehungsextraktion
|
||||||
if not await _check_analysis_exists(db, analysis_id):
|
if not await _check_analysis_exists(db, analysis_id):
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -895,15 +1089,21 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
|||||||
db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
|
db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Phase 2b: Korrekturen
|
# Phase 2c: Semantische Deduplication
|
||||||
if not await _check_analysis_exists(db, analysis_id):
|
if not await _check_analysis_exists(db, analysis_id):
|
||||||
return
|
return
|
||||||
|
|
||||||
await _phase2b_corrections(
|
await _phase2c_semantic_dedup(
|
||||||
db, analysis_id, tenant_id, entities, len(relations), usage_acc, ws_manager,
|
db, analysis_id, tenant_id, entities, usage_acc, ws_manager,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Entity-Count nach Korrekturen aktualisieren
|
# Phase 2d: Cleanup
|
||||||
|
if not await _check_analysis_exists(db, analysis_id):
|
||||||
|
return
|
||||||
|
|
||||||
|
await _phase2d_cleanup(db, analysis_id, entities, ws_manager)
|
||||||
|
|
||||||
|
# Finale Zähler aus DB (nach allen Cleanups)
|
||||||
cursor = await db.execute(
|
cursor = await db.execute(
|
||||||
"SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
|
"SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
|
||||||
(analysis_id,),
|
(analysis_id,),
|
||||||
@@ -911,13 +1111,20 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
|||||||
row = await cursor.fetchone()
|
row = await cursor.fetchone()
|
||||||
final_entity_count = row["cnt"] if row else len(entities)
|
final_entity_count = row["cnt"] if row else len(entities)
|
||||||
|
|
||||||
# Phase 3
|
cursor = await db.execute(
|
||||||
|
"SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
|
||||||
|
(analysis_id,),
|
||||||
|
)
|
||||||
|
row = await cursor.fetchone()
|
||||||
|
final_relation_count = row["cnt"] if row else len(relations)
|
||||||
|
|
||||||
|
# Phase 3: Finalisierung
|
||||||
if not await _check_analysis_exists(db, analysis_id):
|
if not await _check_analysis_exists(db, analysis_id):
|
||||||
return
|
return
|
||||||
|
|
||||||
await _phase3_finalize(
|
await _phase3_finalize(
|
||||||
db, analysis_id, tenant_id,
|
db, analysis_id, tenant_id,
|
||||||
entity_count=final_entity_count, relation_count=len(relations),
|
entity_count=final_entity_count, relation_count=final_relation_count,
|
||||||
article_ids=article_ids, factcheck_ids=factcheck_ids,
|
article_ids=article_ids, factcheck_ids=factcheck_ids,
|
||||||
article_ts=article_ts, factcheck_ts=factcheck_ts,
|
article_ts=article_ts, factcheck_ts=factcheck_ts,
|
||||||
usage_acc=usage_acc, ws_manager=ws_manager,
|
usage_acc=usage_acc, ws_manager=ws_manager,
|
||||||
|
|||||||
@@ -1316,6 +1316,41 @@ class AgentOrchestrator:
|
|||||||
f"${usage_acc.total_cost_usd:.4f} ({usage_acc.call_count} Calls)"
|
f"${usage_acc.total_cost_usd:.4f} ({usage_acc.call_count} Calls)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Credits-Tracking: Monatliche Aggregation + Credits abziehen
|
||||||
|
if tenant_id and usage_acc.total_cost_usd > 0:
|
||||||
|
year_month = datetime.now(TIMEZONE).strftime('%Y-%m')
|
||||||
|
await db.execute("""
|
||||||
|
INSERT INTO token_usage_monthly
|
||||||
|
(organization_id, year_month, input_tokens, output_tokens,
|
||||||
|
cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls, refresh_count)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)
|
||||||
|
ON CONFLICT(organization_id, year_month) DO UPDATE SET
|
||||||
|
input_tokens = input_tokens + excluded.input_tokens,
|
||||||
|
output_tokens = output_tokens + excluded.output_tokens,
|
||||||
|
cache_creation_tokens = cache_creation_tokens + excluded.cache_creation_tokens,
|
||||||
|
cache_read_tokens = cache_read_tokens + excluded.cache_read_tokens,
|
||||||
|
total_cost_usd = total_cost_usd + excluded.total_cost_usd,
|
||||||
|
api_calls = api_calls + excluded.api_calls,
|
||||||
|
refresh_count = refresh_count + 1,
|
||||||
|
updated_at = CURRENT_TIMESTAMP
|
||||||
|
""", (tenant_id, year_month,
|
||||||
|
usage_acc.input_tokens, usage_acc.output_tokens,
|
||||||
|
usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
|
||||||
|
round(usage_acc.total_cost_usd, 7), usage_acc.call_count))
|
||||||
|
|
||||||
|
# Credits auf Lizenz abziehen
|
||||||
|
lic_cursor = await db.execute(
|
||||||
|
"SELECT cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
|
||||||
|
(tenant_id,))
|
||||||
|
lic = await lic_cursor.fetchone()
|
||||||
|
if lic and lic["cost_per_credit"] and lic["cost_per_credit"] > 0:
|
||||||
|
credits_consumed = usage_acc.total_cost_usd / lic["cost_per_credit"]
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE licenses SET credits_used = COALESCE(credits_used, 0) + ? WHERE organization_id = ? AND status = 'active'",
|
||||||
|
(round(credits_consumed, 2), tenant_id))
|
||||||
|
await db.commit()
|
||||||
|
logger.info(f"Credits: {round(credits_consumed, 1) if lic and lic['cost_per_credit'] else 0} abgezogen für Tenant {tenant_id}")
|
||||||
|
|
||||||
# Quellen-Discovery im Background starten
|
# Quellen-Discovery im Background starten
|
||||||
if unique_results:
|
if unique_results:
|
||||||
asyncio.create_task(_background_discover_sources(unique_results))
|
asyncio.create_task(_background_discover_sources(unique_results))
|
||||||
|
|||||||
@@ -582,6 +582,41 @@ async def init_db():
|
|||||||
await db.commit()
|
await db.commit()
|
||||||
logger.info("Migration: article_locations-Tabelle erstellt")
|
logger.info("Migration: article_locations-Tabelle erstellt")
|
||||||
|
|
||||||
|
|
||||||
|
# Migration: Credits-System fuer Lizenzen
|
||||||
|
cursor = await db.execute("PRAGMA table_info(licenses)")
|
||||||
|
columns = [row[1] for row in await cursor.fetchall()]
|
||||||
|
if "token_budget_usd" not in columns:
|
||||||
|
await db.execute("ALTER TABLE licenses ADD COLUMN token_budget_usd REAL")
|
||||||
|
await db.execute("ALTER TABLE licenses ADD COLUMN credits_total INTEGER")
|
||||||
|
await db.execute("ALTER TABLE licenses ADD COLUMN credits_used REAL DEFAULT 0")
|
||||||
|
await db.execute("ALTER TABLE licenses ADD COLUMN cost_per_credit REAL")
|
||||||
|
await db.execute("ALTER TABLE licenses ADD COLUMN budget_warning_percent INTEGER DEFAULT 80")
|
||||||
|
await db.commit()
|
||||||
|
logger.info("Migration: Credits-System zu Lizenzen hinzugefuegt")
|
||||||
|
|
||||||
|
# Migration: Token-Usage-Monatstabelle
|
||||||
|
cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='token_usage_monthly'")
|
||||||
|
if not await cursor.fetchone():
|
||||||
|
await db.execute("""
|
||||||
|
CREATE TABLE token_usage_monthly (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
organization_id INTEGER REFERENCES organizations(id),
|
||||||
|
year_month TEXT NOT NULL,
|
||||||
|
input_tokens INTEGER DEFAULT 0,
|
||||||
|
output_tokens INTEGER DEFAULT 0,
|
||||||
|
cache_creation_tokens INTEGER DEFAULT 0,
|
||||||
|
cache_read_tokens INTEGER DEFAULT 0,
|
||||||
|
total_cost_usd REAL DEFAULT 0.0,
|
||||||
|
api_calls INTEGER DEFAULT 0,
|
||||||
|
refresh_count INTEGER DEFAULT 0,
|
||||||
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
UNIQUE(organization_id, year_month)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
await db.commit()
|
||||||
|
logger.info("Migration: token_usage_monthly Tabelle erstellt")
|
||||||
|
|
||||||
# Verwaiste running-Eintraege beim Start als error markieren (aelter als 15 Min)
|
# Verwaiste running-Eintraege beim Start als error markieren (aelter als 15 Min)
|
||||||
await db.execute(
|
await db.execute(
|
||||||
"""UPDATE refresh_log SET status = 'error', error_message = 'Verwaist beim Neustart',
|
"""UPDATE refresh_log SET status = 'error', error_message = 'Verwaist beim Neustart',
|
||||||
|
|||||||
@@ -41,6 +41,9 @@ class UserMeResponse(BaseModel):
|
|||||||
license_status: str = "unknown"
|
license_status: str = "unknown"
|
||||||
license_type: str = ""
|
license_type: str = ""
|
||||||
read_only: bool = False
|
read_only: bool = False
|
||||||
|
credits_total: Optional[int] = None
|
||||||
|
credits_remaining: Optional[int] = None
|
||||||
|
credits_percent_used: Optional[float] = None
|
||||||
|
|
||||||
|
|
||||||
# Incidents (Lagen)
|
# Incidents (Lagen)
|
||||||
|
|||||||
@@ -261,10 +261,28 @@ async def get_me(
|
|||||||
from services.license_service import check_license
|
from services.license_service import check_license
|
||||||
license_info = await check_license(db, current_user["tenant_id"])
|
license_info = await check_license(db, current_user["tenant_id"])
|
||||||
|
|
||||||
|
# Credits-Daten laden
|
||||||
|
credits_total = None
|
||||||
|
credits_remaining = None
|
||||||
|
credits_percent_used = None
|
||||||
|
if current_user.get("tenant_id"):
|
||||||
|
lic_cursor = await db.execute(
|
||||||
|
"SELECT credits_total, credits_used, cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
|
||||||
|
(current_user["tenant_id"],))
|
||||||
|
lic_row = await lic_cursor.fetchone()
|
||||||
|
if lic_row and lic_row["credits_total"]:
|
||||||
|
credits_total = lic_row["credits_total"]
|
||||||
|
credits_used = lic_row["credits_used"] or 0
|
||||||
|
credits_remaining = max(0, int(credits_total - credits_used))
|
||||||
|
credits_percent_used = round(min(100, (credits_used / credits_total) * 100), 1) if credits_total > 0 else 0
|
||||||
|
|
||||||
return UserMeResponse(
|
return UserMeResponse(
|
||||||
id=current_user["id"],
|
id=current_user["id"],
|
||||||
username=current_user["username"],
|
username=current_user["username"],
|
||||||
email=current_user.get("email", ""),
|
email=current_user.get("email", ""),
|
||||||
|
credits_total=credits_total,
|
||||||
|
credits_remaining=credits_remaining,
|
||||||
|
credits_percent_used=credits_percent_used,
|
||||||
role=current_user["role"],
|
role=current_user["role"],
|
||||||
org_name=org_name,
|
org_name=org_name,
|
||||||
org_slug=current_user.get("org_slug", ""),
|
org_slug=current_user.get("org_slug", ""),
|
||||||
|
|||||||
@@ -5335,3 +5335,61 @@ body.tutorial-active .tutorial-cursor {
|
|||||||
color: var(--text-primary);
|
color: var(--text-primary);
|
||||||
box-shadow: 0 0 0 2px rgba(150, 121, 26, 0.25);
|
box-shadow: 0 0 0 2px rgba(150, 121, 26, 0.25);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ===== Credits-Anzeige im User-Dropdown ===== */
|
||||||
|
.credits-section {
|
||||||
|
padding: 8px 16px 12px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-divider {
|
||||||
|
height: 1px;
|
||||||
|
background: var(--border);
|
||||||
|
margin-bottom: 10px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-label {
|
||||||
|
font-size: 11px;
|
||||||
|
font-weight: 600;
|
||||||
|
text-transform: uppercase;
|
||||||
|
letter-spacing: 0.5px;
|
||||||
|
color: var(--text-secondary);
|
||||||
|
margin-bottom: 6px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-bar-container {
|
||||||
|
width: 100%;
|
||||||
|
height: 6px;
|
||||||
|
background: var(--bg-tertiary);
|
||||||
|
border-radius: 3px;
|
||||||
|
overflow: hidden;
|
||||||
|
margin-bottom: 4px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-bar {
|
||||||
|
height: 100%;
|
||||||
|
border-radius: 3px;
|
||||||
|
background: var(--accent);
|
||||||
|
transition: width 0.6s ease, background-color 0.3s ease;
|
||||||
|
min-width: 2px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-bar.warning {
|
||||||
|
background: #e67e22;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-bar.critical {
|
||||||
|
background: #e74c3c;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-info {
|
||||||
|
font-size: 12px;
|
||||||
|
color: var(--text-secondary);
|
||||||
|
display: flex;
|
||||||
|
justify-content: center;
|
||||||
|
gap: 4px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.credits-info span {
|
||||||
|
font-weight: 600;
|
||||||
|
color: var(--text-primary);
|
||||||
|
}
|
||||||
|
|||||||
@@ -50,6 +50,16 @@
|
|||||||
<span class="header-dropdown-label">Lizenz</span>
|
<span class="header-dropdown-label">Lizenz</span>
|
||||||
<span class="header-dropdown-value" id="header-license-info">-</span>
|
<span class="header-dropdown-value" id="header-license-info">-</span>
|
||||||
</div>
|
</div>
|
||||||
|
<div id="credits-section" class="credits-section" style="display: none;">
|
||||||
|
<div class="credits-divider"></div>
|
||||||
|
<div class="credits-label">Credits</div>
|
||||||
|
<div class="credits-bar-container">
|
||||||
|
<div id="credits-bar" class="credits-bar"></div>
|
||||||
|
</div>
|
||||||
|
<div class="credits-info">
|
||||||
|
<span id="credits-remaining">0</span> von <span id="credits-total">0</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<div class="header-license-warning" id="header-license-warning"></div>
|
<div class="header-license-warning" id="header-license-warning"></div>
|
||||||
|
|||||||
@@ -466,6 +466,32 @@ const App = {
|
|||||||
licInfoEl.textContent = label;
|
licInfoEl.textContent = label;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Credits-Anzeige im Dropdown
|
||||||
|
const creditsSection = document.getElementById('credits-section');
|
||||||
|
if (creditsSection && user.credits_total) {
|
||||||
|
creditsSection.style.display = 'block';
|
||||||
|
const bar = document.getElementById('credits-bar');
|
||||||
|
const remainingEl = document.getElementById('credits-remaining');
|
||||||
|
const totalEl = document.getElementById('credits-total');
|
||||||
|
|
||||||
|
const remaining = user.credits_remaining || 0;
|
||||||
|
const total = user.credits_total || 1;
|
||||||
|
const percentUsed = user.credits_percent_used || 0;
|
||||||
|
const percentRemaining = Math.max(0, 100 - percentUsed);
|
||||||
|
|
||||||
|
remainingEl.textContent = remaining.toLocaleString('de-DE');
|
||||||
|
totalEl.textContent = total.toLocaleString('de-DE');
|
||||||
|
bar.style.width = percentRemaining + '%';
|
||||||
|
|
||||||
|
// Farbwechsel je nach Verbrauch
|
||||||
|
bar.classList.remove('warning', 'critical');
|
||||||
|
if (percentUsed > 80) {
|
||||||
|
bar.classList.add('critical');
|
||||||
|
} else if (percentUsed > 50) {
|
||||||
|
bar.classList.add('warning');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Dropdown Toggle
|
// Dropdown Toggle
|
||||||
const userBtn = document.getElementById('header-user-btn');
|
const userBtn = document.getElementById('header-user-btn');
|
||||||
const userDropdown = document.getElementById('header-user-dropdown');
|
const userDropdown = document.getElementById('header-user-dropdown');
|
||||||
|
|||||||
In neuem Issue referenzieren
Einen Benutzer sperren