Compare commits

...

5 Commits

Author SHA1 Message Date
Claude Dev
6b4af4cf2a fix: restored justify-content: center everywhere + source duplicate check
- CSS: 24 incorrect flex-start values reverted to center (login, buttons, modals, badges, map, etc.)
- Sources: domain duplicate check when adding manually (one web_source per domain, domain extracted from the URL)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 00:13:36 +01:00
Claude Dev
17088e588f fix: credits dropdown left-aligned, bar track visible, percentage on the right, no bold text, more spacing 2026-03-18 00:08:20 +01:00
Claude Dev
97997724de fix: credits display left-aligned, bar background visible 2026-03-18 00:03:18 +01:00
Claude Dev
acb3c6a6cb feat: network analysis quality improvement, 3 new cleanup stages
Phase 1: entity map key is now name_normalized only (instead of name+type), type priority on conflicts
Phase 2a (new): code-based dedup by name_normalized, merges type duplicates
Phase 2c (new): semantic dedup via Opus, detects synonyms, abbreviations and language variants
Phase 2d (new): cleanup of self-loops, direction normalization, duplicate relations and orphaned entities
Shared _merge_entity_in_db helper for consistent entity merging
Phase 2b (Opus correction pass) removed, replaced by the more precise Phase 2c

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 23:53:28 +01:00
Claude Dev
7bfa1d29cf feat: credits system with usage display in the user dropdown
- DB migration: credits_total/credits_used/cost_per_credit on licenses, new token_usage_monthly table
- Orchestrator: monthly token aggregation + credits deduction after each refresh
- Auth: credits data in the /me endpoint + bugfix for a missing parenthesis in get()
- Frontend: credits bar in the user dropdown with usage-based color change

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 23:53:19 +01:00
10 changed files with 832 additions and 174 deletions

regenerate_relations.py (new file, 234 lines)

@@ -0,0 +1,234 @@
"""Regeneriert NUR die Beziehungen für eine bestehende Netzwerkanalyse.
Nutzt die vorhandenen Entitäten und führt Phase 2a + Phase 2 + Phase 2c + Phase 2d aus.
"""
import asyncio
import json
import sys
import os
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from database import get_db
from agents.entity_extractor import (
_phase2a_deduplicate_entities,
_phase2_analyze_relationships,
_phase2c_semantic_dedup,
_phase2d_cleanup,
_build_entity_name_map,
_compute_data_hash,
_broadcast,
logger,
)
from agents.claude_client import UsageAccumulator
from config import TIMEZONE
from datetime import datetime
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
async def regenerate_relations_only(analysis_id: int):
"""Löscht alte Relations und führt Phase 2a + 2 + 2c + 2d neu aus."""
db = await get_db()
usage_acc = UsageAccumulator()
try:
# Analyse prüfen
cursor = await db.execute(
"SELECT id, name, tenant_id, entity_count FROM network_analyses WHERE id = ?",
(analysis_id,),
)
analysis = await cursor.fetchone()
if not analysis:
print(f"Analyse {analysis_id} nicht gefunden!")
return
tenant_id = analysis["tenant_id"]
print(f"\nAnalyse: {analysis['name']} (ID={analysis_id})")
print(f"Vorhandene Entitäten: {analysis['entity_count']}")
# Status auf generating setzen
await db.execute(
"UPDATE network_analyses SET status = 'generating' WHERE id = ?",
(analysis_id,),
)
await db.commit()
# Entitäten aus DB laden (mit db_id!)
cursor = await db.execute(
"""SELECT id, name, name_normalized, entity_type, description, aliases, mention_count
FROM network_entities WHERE network_analysis_id = ?""",
(analysis_id,),
)
entity_rows = await cursor.fetchall()
entities = []
for r in entity_rows:
aliases = []
try:
aliases = json.loads(r["aliases"]) if r["aliases"] else []
except (json.JSONDecodeError, TypeError):
pass
entities.append({
"name": r["name"],
"name_normalized": r["name_normalized"],
"type": r["entity_type"],
"description": r["description"] or "",
"aliases": aliases,
"mention_count": r["mention_count"] or 1,
"db_id": r["id"],
})
print(f"Geladene Entitäten: {len(entities)}")
# Phase 2a: Entity-Deduplication (vor Relation-Löschung)
print(f"\n--- Phase 2a: Entity-Deduplication ---\n")
await _phase2a_deduplicate_entities(db, analysis_id, entities)
print(f"Entitäten nach Dedup: {len(entities)}")
# Alte Relations löschen
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
(analysis_id,),
)
old_count = (await cursor.fetchone())["cnt"]
print(f"\nLösche {old_count} alte Relations...")
await db.execute(
"DELETE FROM network_relations WHERE network_analysis_id = ?",
(analysis_id,),
)
await db.commit()
# Incident-IDs laden
cursor = await db.execute(
"SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
(analysis_id,),
)
incident_ids = [row["incident_id"] for row in await cursor.fetchall()]
print(f"Verknüpfte Lagen: {len(incident_ids)}")
# Artikel laden
placeholders = ",".join("?" * len(incident_ids))
cursor = await db.execute(
f"""SELECT id, incident_id, headline, headline_de, source, source_url,
content_original, content_de, collected_at
FROM articles WHERE incident_id IN ({placeholders})""",
incident_ids,
)
article_rows = await cursor.fetchall()
articles = []
article_ids = []
article_ts = []
for r in article_rows:
articles.append({
"id": r["id"], "incident_id": r["incident_id"],
"headline": r["headline"], "headline_de": r["headline_de"],
"source": r["source"], "source_url": r["source_url"],
"content_original": r["content_original"], "content_de": r["content_de"],
})
article_ids.append(r["id"])
article_ts.append(r["collected_at"] or "")
# Faktenchecks laden
cursor = await db.execute(
f"""SELECT id, incident_id, claim, status, evidence, checked_at
FROM fact_checks WHERE incident_id IN ({placeholders})""",
incident_ids,
)
fc_rows = await cursor.fetchall()
factchecks = []
factcheck_ids = []
factcheck_ts = []
for r in fc_rows:
factchecks.append({
"id": r["id"], "incident_id": r["incident_id"],
"claim": r["claim"], "status": r["status"], "evidence": r["evidence"],
})
factcheck_ids.append(r["id"])
factcheck_ts.append(r["checked_at"] or "")
print(f"Artikel: {len(articles)}, Faktenchecks: {len(factchecks)}")
# Phase 2: Beziehungsextraktion
print(f"\n--- Phase 2: Batched Beziehungsextraktion starten ---\n")
relations = await _phase2_analyze_relationships(
db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc,
)
# Phase 2c: Semantische Deduplication
print(f"\n--- Phase 2c: Semantische Deduplication (Opus) ---\n")
await _phase2c_semantic_dedup(
db, analysis_id, tenant_id, entities, usage_acc,
)
# Phase 2d: Cleanup
print(f"\n--- Phase 2d: Cleanup ---\n")
await _phase2d_cleanup(db, analysis_id, entities)
# Finale Zähler aus DB
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
(analysis_id,),
)
row = await cursor.fetchone()
final_entity_count = row["cnt"] if row else len(entities)
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
(analysis_id,),
)
row = await cursor.fetchone()
final_relation_count = row["cnt"] if row else len(relations)
# Finalisierung
data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
await db.execute(
"""UPDATE network_analyses
SET entity_count = ?, relation_count = ?, status = 'ready',
last_generated_at = ?, data_hash = ?
WHERE id = ?""",
(final_entity_count, final_relation_count, now, data_hash, analysis_id),
)
await db.execute(
"""INSERT INTO network_generation_log
(network_analysis_id, completed_at, status, input_tokens, output_tokens,
cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls,
entity_count, relation_count, tenant_id)
VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens,
usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
usage_acc.total_cost_usd, usage_acc.call_count,
final_entity_count, final_relation_count, tenant_id),
)
await db.commit()
print(f"\n{'='*60}")
print(f"FERTIG!")
print(f"Entitäten: {final_entity_count}")
print(f"Beziehungen: {final_relation_count}")
print(f"API-Calls: {usage_acc.call_count}")
print(f"Kosten: ${usage_acc.total_cost_usd:.4f}")
print(f"{'='*60}")
except Exception as e:
print(f"FEHLER: {e}")
import traceback
traceback.print_exc()
try:
await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
await db.commit()
except Exception:
pass
finally:
await db.close()
if __name__ == "__main__":
analysis_id = int(sys.argv[1]) if len(sys.argv) > 1 else 1
asyncio.run(regenerate_relations_only(analysis_id))
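Usage sketch: python regenerate_relations.py 7 regenerates the relations for analysis 7; without an argument the script falls back to analysis ID 1. The hard-coded sys.path entry points at /home/claude-dev/AegisSight-Monitor/src, so the imports only resolve in that environment.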


@@ -4,6 +4,7 @@ import hashlib
import json
import logging
import re
from collections import defaultdict
from datetime import datetime
from typing import Optional
@@ -12,6 +13,18 @@ from config import CLAUDE_MODEL_FAST, TIMEZONE
logger = logging.getLogger("osint.entity_extractor")
# ---------------------------------------------------------------------------
# Konstanten
# ---------------------------------------------------------------------------
TYPE_PRIORITY = {"location": 5, "organisation": 4, "military": 3, "event": 2, "person": 1}
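# Used by Phase 1 and Phase 2a when two entities share the same name_normalized but
# disagree on type: the higher-priority type wins (e.g. a "location" reading of the
# same name beats a "person" reading).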
_STOP_WORDS = frozenset({
"the", "of", "and", "for", "in", "on", "at", "to", "by",
"von", "der", "die", "das", "und", "für", "des", "den", "dem",
"ein", "eine", "zur", "zum", "bei", "mit", "aus", "nach",
})
# ---------------------------------------------------------------------------
# Prompts
# ---------------------------------------------------------------------------
@@ -97,26 +110,33 @@ AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
}}"""
CORRECTION_PROMPT = """Du bist ein Senior OSINT-Analyst. Prüfe die extrahierten Entitäten auf Fehler.
SEMANTIC_DEDUP_PROMPT = """Du bist ein OSINT-Analyst. Prüfe diese Entitäten auf Duplikate.
ENTITÄTEN:
{entities_json}
{entity_list}
AUFGABE: Prüfe auf folgende Fehler und gib NUR Korrekturen zurück:
1. "name_fix": Name ist falsch geschrieben oder unvollständig
2. "merge": Zwei Entitäten bezeichnen dasselbe -> zusammenführen (z.B. "USA" als Organisation und "USA" als Location)
3. "add": Wichtige Entität fehlt komplett (nur bei offensichtlichen Lücken)
AUFGABE: Welche Entitäten bezeichnen DASSELBE reale Objekt?
WICHTIG: Nur echte Fehler korrigieren. Im Zweifel KEINE Korrektur.
Typische Duplikate:
- Abkürzung vs. Vollname: "IRGC" = "Iranian Revolutionary Guard Corps"
- Sprachvarianten: "European Union" = "Europäische Union"
- Schreibvarianten: "Strait of Hormuz" = "Straße von Hormus"
REGELN:
- NUR echte Duplikate zusammenführen (gleiche reale Entität)
- NICHT zusammenführen: Unterorganisationen (IRGC ≠ IRGC Navy), verschiedene Personen
- "keep": Nummer der Haupt-Entität (bevorzuge: mehr Erwähnungen, vollständiger Name)
- "merge": Nummern der Duplikate die in die Haupt-Entität zusammengeführt werden
AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
{{
"corrections": [
{{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}},
{{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}},
{{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}}
"merges": [
{{"keep": 1, "merge": [3, 5]}},
{{"keep": 2, "merge": [4]}}
]
}}"""
}}
Falls KEINE Duplikate: {{"merges": []}}"""
# ---------------------------------------------------------------------------
@@ -174,6 +194,74 @@ def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) ->
return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
# ---------------------------------------------------------------------------
# Entity-Merge Helper
# ---------------------------------------------------------------------------
async def _merge_entity_in_db(
db, analysis_id: int, keep_ent: dict, merge_ent: dict, entities: list[dict],
):
"""Führt merge_ent in keep_ent zusammen (DB + In-Memory)."""
keep_id = keep_ent["db_id"]
merge_id = merge_ent["db_id"]
# Aliases vereinen
aliases = set(keep_ent.get("aliases", []))
aliases.add(merge_ent["name"])
for a in merge_ent.get("aliases", []):
if a and a.strip():
aliases.add(a.strip())
aliases.discard(keep_ent["name"])
keep_ent["aliases"] = list(aliases)
# Mention count addieren
keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)
# Längere Description behalten
if len(merge_ent.get("description", "")) > len(keep_ent.get("description", "")):
keep_ent["description"] = merge_ent["description"]
# Entity-Mentions umhängen
await db.execute(
"UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
(keep_id, merge_id),
)
# Relations umhängen
await db.execute(
"UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
(keep_id, merge_id, analysis_id),
)
await db.execute(
"UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
(keep_id, merge_id, analysis_id),
)
# Self-Loops entfernen die durch den Merge entstanden sein könnten
await db.execute(
"DELETE FROM network_relations WHERE source_entity_id = ? AND target_entity_id = ? AND network_analysis_id = ?",
(keep_id, keep_id, analysis_id),
)
# Keep-Entity in DB aktualisieren
await db.execute(
"""UPDATE network_entities
SET aliases = ?, mention_count = ?, description = ?
WHERE id = ?""",
(json.dumps(keep_ent["aliases"], ensure_ascii=False),
keep_ent["mention_count"], keep_ent.get("description", ""), keep_id),
)
# Merge-Entity löschen
await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_id,))
# Aus entities-Liste entfernen
try:
entities.remove(merge_ent)
except ValueError:
pass
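# Illustrative call (entity dicts are assumed to carry their db_id from network_entities):
# merging a duplicate "EU" row into "European Union" via
#     await _merge_entity_in_db(db, analysis_id, keep_ent, merge_ent, entities)
# leaves keep_ent with the combined aliases, summed mention_count and the longer
# description, while merge_ent's mentions and relations are re-pointed to keep_ent
# and its network_entities row is deleted.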
# ---------------------------------------------------------------------------
# Phase 1: Entity-Extraktion (Haiku)
# ---------------------------------------------------------------------------
@@ -209,7 +297,7 @@ async def _phase1_extract_entities(
batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")
entity_map: dict[tuple[str, str], dict] = {}
entity_map: dict[str, dict] = {}
for batch_idx, batch in enumerate(batches):
articles_text = "\n\n---\n\n".join(batch)
@@ -244,7 +332,7 @@ async def _phase1_extract_entities(
if entity_type not in valid_types:
entity_type = "organisation"
key = (name_normalized, entity_type)
key = name_normalized
if key in entity_map:
existing = entity_map[key]
@@ -259,6 +347,9 @@ async def _phase1_extract_entities(
new_desc = ent.get("description", "")
if len(new_desc) > len(existing.get("description", "")):
existing["description"] = new_desc
# Typ-Priorität: höherwertigen Typ behalten
if TYPE_PRIORITY.get(entity_type, 0) > TYPE_PRIORITY.get(existing["type"], 0):
existing["type"] = entity_type
else:
entity_map[key] = {
"name": name,
@@ -303,6 +394,55 @@ async def _phase1_extract_entities(
return all_entities
# ---------------------------------------------------------------------------
# Phase 2a: Entity-Deduplication nach name_normalized
# ---------------------------------------------------------------------------
async def _phase2a_deduplicate_entities(
db, analysis_id: int, entities: list[dict], ws_manager=None,
) -> None:
"""Dedupliziert Entities mit gleichem name_normalized (unabhängig vom Typ)."""
logger.info(f"Phase 2a: Prüfe {len(entities)} Entitäten auf name_normalized-Duplikate")
groups = defaultdict(list)
for ent in entities:
if ent.get("db_id"):
groups[ent["name_normalized"]].append(ent)
merge_count = 0
merged_ids = set()
for nn, group in groups.items():
if len(group) < 2:
continue
# Sortierung: höchste Typ-Priorität, dann meiste Erwähnungen
group.sort(
key=lambda e: (TYPE_PRIORITY.get(e["type"], 0), e.get("mention_count", 0)),
reverse=True,
)
keep = group[0]
for merge in group[1:]:
if merge["db_id"] in merged_ids:
continue
await _merge_entity_in_db(db, analysis_id, keep, merge, entities)
merged_ids.add(merge["db_id"])
merge_count += 1
if merge_count > 0:
await db.commit()
logger.info(
f"Phase 2a abgeschlossen: {merge_count} Duplikate zusammengeführt, "
f"{len(entities)} Entitäten verbleiben"
)
await _broadcast(ws_manager, "network_status", {
"analysis_id": analysis_id,
"phase": "entity_dedup",
"progress": 100,
})
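# Example: two rows normalized to "kyiv", one typed "location" with 3 mentions and
# one typed "person" with 9 mentions, collapse into the "location" row, because the
# sort ranks TYPE_PRIORITY before mention_count; mentions only break ties within a type.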
# ---------------------------------------------------------------------------
# Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch)
# ---------------------------------------------------------------------------
@@ -566,186 +706,232 @@ async def _phase2_analyze_relationships(
# ---------------------------------------------------------------------------
# Phase 2b: Korrekturen (Opus)
# Phase 2c: Semantische Deduplication (Opus)
# ---------------------------------------------------------------------------
async def _phase2b_corrections(
async def _phase2c_semantic_dedup(
db, analysis_id: int, tenant_id: int,
entities: list[dict], relation_count: int,
usage_acc: UsageAccumulator, ws_manager=None,
entities: list[dict], usage_acc: UsageAccumulator, ws_manager=None,
) -> None:
"""Opus prüft die Entitäten auf Fehler (Merge, Name-Fix, Add)."""
"""Semantische Deduplizierung via Opus — erkennt Synonyme, Abkürzungen, Sprachvarianten."""
if len(entities) < 10:
return
logger.info(f"Phase 2b: Opus-Korrekturpass für {len(entities)} Entitäten")
logger.info(f"Phase 2c: Semantische Dedup für {len(entities)} Entitäten")
await _broadcast(ws_manager, "network_status", {
"analysis_id": analysis_id,
"phase": "correction",
"phase": "semantic_dedup",
"progress": 0,
})
entities_for_prompt = []
for e in entities:
entities_for_prompt.append({
"name": e["name"],
"type": e["type"],
"description": e.get("description", ""),
"aliases": e.get("aliases", []),
})
# --- Clustering: Token-basiert + Abbreviation-Matching ---
token_to_ids = defaultdict(set)
db_id_map = {e["db_id"]: e for e in entities if e.get("db_id")}
batch_size = 500
entity_batches = [entities_for_prompt[i:i + batch_size]
for i in range(0, len(entities_for_prompt), batch_size)]
for ent in entities:
db_id = ent.get("db_id")
if not db_id:
continue
all_corrections = []
for bi, eb in enumerate(entity_batches):
entities_json = json.dumps(eb, ensure_ascii=False, indent=1)
prompt = CORRECTION_PROMPT.format(entities_json=entities_json)
all_names = [ent["name_normalized"]]
for a in ent.get("aliases", []):
if a:
all_names.append(a.lower())
# Token-basiertes Clustering
for name in all_names:
for word in re.split(r'[\s\-_/(),.;:]+', name):
word = word.strip()
if len(word) >= 3 and word not in _STOP_WORDS:
token_to_ids[word].add(db_id)
# Abbreviation-Matching
all_display_names = [ent["name"]] + [a for a in ent.get("aliases", []) if a]
for name in all_display_names:
words = [w for w in re.split(r'[\s\-_/(),.;:]+', name) if w and len(w) >= 2]
if len(words) >= 2:
abbr = "".join(w[0] for w in words).lower()
if len(abbr) >= 2:
token_to_ids[f"_abbr_{abbr}"].add(db_id)
# Kurze Namen als potenzielle Abkürzungen
name_clean = re.sub(r'[^a-zA-Z]', '', name).lower()
if 2 <= len(name_clean) <= 6:
token_to_ids[f"_abbr_{name_clean}"].add(db_id)
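# Example: "Iranian Revolutionary Guard Corps" contributes the initials key
# "_abbr_irgc", and a short entity named "IRGC" (2-6 letters) lands in the same
# "_abbr_irgc" bucket, so both end up in one candidate cluster for Opus to judge.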
# Eindeutige Cluster filtern (≥ 2 Entities, ≤ 40 Entities)
seen_clusters = set()
candidate_clusters = []
for token, db_ids in sorted(token_to_ids.items(), key=lambda x: len(x[1])):
if len(db_ids) < 2 or len(db_ids) > 40:
continue
key = frozenset(db_ids)
if key in seen_clusters:
continue
seen_clusters.add(key)
candidate_clusters.append(list(db_ids))
logger.info(f"Phase 2c: {len(candidate_clusters)} Kandidaten-Cluster gefunden")
# --- Opus-Calls für jeden Cluster ---
merged_away = set()
total_merges = 0
opus_calls = 0
max_calls = 50
for ci, cluster_ids in enumerate(candidate_clusters):
if opus_calls >= max_calls:
logger.warning(f"Phase 2c: Max {max_calls} Opus-Calls erreicht, stoppe")
break
# Bereits gemergte Entities filtern
active_ids = [did for did in cluster_ids if did not in merged_away and did in db_id_map]
if len(active_ids) < 2:
continue
active_ents = [db_id_map[did] for did in active_ids]
# Prompt bauen
lines = []
for i, ent in enumerate(active_ents, 1):
aliases_str = ", ".join((ent.get("aliases") or [])[:5])
line = f"{i}. {ent['name']} ({ent['type']}, {ent.get('mention_count', 0)} Erwähnungen)"
if aliases_str:
line += f" [Aliases: {aliases_str}]"
lines.append(line)
prompt = SEMANTIC_DEDUP_PROMPT.format(entity_list="\n".join(lines))
try:
result_text, usage = await call_claude(prompt, tools=None, model=None)
usage_acc.add(usage)
opus_calls += 1
except Exception as e:
logger.error(f"Opus Korrektur-Batch {bi + 1} fehlgeschlagen: {e}")
logger.error(f"Phase 2c Opus-Call {opus_calls + 1} fehlgeschlagen: {e}")
continue
parsed = _parse_json_response(result_text)
if not parsed:
continue
corrections = parsed.get("corrections", [])
if isinstance(corrections, list):
all_corrections.extend(corrections)
for merge_group in parsed.get("merges", []):
keep_idx = merge_group.get("keep")
merge_indices = merge_group.get("merge", [])
if keep_idx is None or not merge_indices:
continue
if not isinstance(merge_indices, list):
merge_indices = [merge_indices]
logger.info(f"Korrektur-Batch {bi + 1}/{len(entity_batches)}: {len(corrections)} Korrekturen")
keep_idx -= 1 # 1-indexed → 0-indexed
if keep_idx < 0 or keep_idx >= len(active_ents):
continue
keep_ent = active_ents[keep_idx]
if keep_ent["db_id"] in merged_away:
continue
if all_corrections:
await _apply_corrections(db, analysis_id, tenant_id, entities, all_corrections)
logger.info(f"Phase 2b abgeschlossen: {len(all_corrections)} Korrekturen angewendet")
for mi in merge_indices:
mi -= 1
if mi < 0 or mi >= len(active_ents) or mi == keep_idx:
continue
merge_ent = active_ents[mi]
if merge_ent["db_id"] in merged_away:
continue
logger.info(f"Semantic Merge: '{merge_ent['name']}''{keep_ent['name']}'")
await _merge_entity_in_db(db, analysis_id, keep_ent, merge_ent, entities)
merged_away.add(merge_ent["db_id"])
db_id_map.pop(merge_ent["db_id"], None)
total_merges += 1
# Progress
progress = int((ci + 1) / len(candidate_clusters) * 100) if candidate_clusters else 100
await _broadcast(ws_manager, "network_status", {
"analysis_id": analysis_id,
"phase": "semantic_dedup",
"progress": min(progress, 100),
})
if total_merges > 0:
await db.commit()
logger.info(
f"Phase 2c abgeschlossen: {total_merges} Merges, {opus_calls} Opus-Calls, "
f"{len(entities)} Entitäten verbleiben"
)
# ---------------------------------------------------------------------------
# Phase 2d: Cleanup
# ---------------------------------------------------------------------------
async def _phase2d_cleanup(
db, analysis_id: int, entities: list[dict], ws_manager=None,
) -> None:
"""Cleanup: Self-Loops, Richtungsnormalisierung, Duplikat-Relations, verwaiste Entities."""
logger.info("Phase 2d: Cleanup")
# 1. Self-Loops entfernen
cursor = await db.execute(
"DELETE FROM network_relations WHERE network_analysis_id = ? AND source_entity_id = target_entity_id",
(analysis_id,),
)
self_loops = cursor.rowcount
# 2. Richtungsnormalisierung (nach Merges können Richtungen inkonsistent sein)
await db.execute(
"""UPDATE network_relations
SET source_entity_id = target_entity_id, target_entity_id = source_entity_id
WHERE source_entity_id > target_entity_id AND network_analysis_id = ?""",
(analysis_id,),
)
# 3. Duplikat-Relations entfernen (gleiche source+target+category nach Normalisierung)
cursor = await db.execute(
"""DELETE FROM network_relations
WHERE network_analysis_id = ? AND id NOT IN (
SELECT MIN(id) FROM network_relations
WHERE network_analysis_id = ?
GROUP BY source_entity_id, target_entity_id, category
)""",
(analysis_id, analysis_id),
)
dup_relations = cursor.rowcount
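# Example: a relation stored as (source 42 -> target 17) is flipped to (17 -> 42) by
# step 2; if two rows then share source 17, target 42 and the same category, step 3
# keeps only the one with the smallest id.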
# 4. Verwaiste Entities entfernen (keine Verbindungen)
cursor = await db.execute(
"""DELETE FROM network_entities
WHERE network_analysis_id = ? AND id NOT IN (
SELECT source_entity_id FROM network_relations WHERE network_analysis_id = ?
UNION
SELECT target_entity_id FROM network_relations WHERE network_analysis_id = ?
)""",
(analysis_id, analysis_id, analysis_id),
)
orphans = cursor.rowcount
# Entities-Liste aktualisieren
remaining_ids = set()
cursor = await db.execute(
"SELECT id FROM network_entities WHERE network_analysis_id = ?", (analysis_id,)
)
for row in await cursor.fetchall():
remaining_ids.add(row["id"])
entities[:] = [e for e in entities if e.get("db_id") in remaining_ids]
await db.commit()
logger.info(
f"Phase 2d abgeschlossen: {self_loops} Self-Loops, {dup_relations} Duplikat-Relations, "
f"{orphans} verwaiste Entities entfernt, {len(entities)} Entitäten verbleiben"
)
await _broadcast(ws_manager, "network_status", {
"analysis_id": analysis_id,
"phase": "correction",
"phase": "cleanup",
"progress": 100,
})
async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
"""Wendet Opus-Korrekturen auf Entitäten an."""
for corr in corrections:
if not isinstance(corr, dict):
continue
corr_type = corr.get("type", "")
try:
if corr_type == "name_fix":
entity_name = (corr.get("entity_name") or "").strip()
corrected_name = (corr.get("corrected_name") or "").strip()
if not entity_name or not corrected_name:
continue
for ent in entities:
if ent["name"].lower() == entity_name.lower() or \
ent["name_normalized"] == entity_name.lower():
old_name = ent["name"]
ent["name"] = corrected_name
ent["name_normalized"] = corrected_name.lower().strip()
ent.setdefault("aliases", [])
if old_name not in ent["aliases"]:
ent["aliases"].append(old_name)
if ent.get("db_id"):
await db.execute(
"""UPDATE network_entities
SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1
WHERE id = ?""",
(corrected_name, corrected_name.lower().strip(),
json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]),
)
break
elif corr_type == "merge":
keep_name = (corr.get("entity_name") or "").strip()
merge_name = (corr.get("merge_with") or "").strip()
if not keep_name or not merge_name:
continue
keep_ent = merge_ent = None
for ent in entities:
nl = ent["name"].lower()
nn = ent["name_normalized"]
if nl == keep_name.lower() or nn == keep_name.lower():
keep_ent = ent
elif nl == merge_name.lower() or nn == merge_name.lower():
merge_ent = ent
if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"):
aliases = set(keep_ent.get("aliases", []))
aliases.add(merge_ent["name"])
for a in merge_ent.get("aliases", []):
if a and a.strip():
aliases.add(a.strip())
keep_ent["aliases"] = list(aliases)
keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)
await db.execute(
"UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
(keep_ent["db_id"], merge_ent["db_id"]),
)
await db.execute(
"UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
(keep_ent["db_id"], merge_ent["db_id"], analysis_id),
)
await db.execute(
"UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
(keep_ent["db_id"], merge_ent["db_id"], analysis_id),
)
await db.execute(
"""UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1
WHERE id = ?""",
(json.dumps(keep_ent["aliases"], ensure_ascii=False),
keep_ent["mention_count"], keep_ent["db_id"]),
)
await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],))
entities.remove(merge_ent)
elif corr_type == "add":
entity_name = (corr.get("entity_name") or "").strip()
entity_type = (corr.get("entity_type") or "organisation").lower().strip()
description = corr.get("description", "")
if not entity_name:
continue
valid_types = {"person", "organisation", "location", "event", "military"}
if entity_type not in valid_types:
entity_type = "organisation"
name_norm = entity_name.lower().strip()
if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities):
continue
cursor = await db.execute(
"""INSERT OR IGNORE INTO network_entities
(network_analysis_id, name, name_normalized, entity_type,
description, aliases, mention_count, corrected_by_opus, tenant_id)
VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""",
(analysis_id, entity_name, name_norm, entity_type, description, tenant_id),
)
if cursor.lastrowid:
entities.append({
"name": entity_name, "name_normalized": name_norm,
"type": entity_type, "description": description,
"aliases": [], "mention_count": 1, "db_id": cursor.lastrowid,
})
except Exception as e:
logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}")
await db.commit()
# ---------------------------------------------------------------------------
# Phase 3: Finalisierung
# ---------------------------------------------------------------------------
@@ -799,10 +985,12 @@ async def _phase3_finalize(
async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None):
"""Hauptfunktion: Entity-Extraktion + Beziehungsanalyse.
Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches)
Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung
Phase 2b: Opus korrigiert Entitäten (Name-Fix, Merge, Add)
Phase 3: Finalisierung (Zähler, Hash, Log)
Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches)
Phase 2a: Entity-Dedup nach name_normalized (Code, kein API)
Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung
Phase 2c: Semantische Dedup via Opus (Cluster-weise)
Phase 2d: Cleanup (Self-Loops, Verwaiste, Duplikat-Relations)
Phase 3: Finalisierung (Zähler, Hash, Log)
"""
from database import get_db
@@ -879,7 +1067,7 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, "
f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen")
# Phase 1
# Phase 1: Entity-Extraktion
if not await _check_analysis_exists(db, analysis_id):
return
@@ -887,7 +1075,13 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
)
# Phase 2
# Phase 2a: Entity-Deduplication
if not await _check_analysis_exists(db, analysis_id):
return
await _phase2a_deduplicate_entities(db, analysis_id, entities, ws_manager)
# Phase 2: Beziehungsextraktion
if not await _check_analysis_exists(db, analysis_id):
return
@@ -895,15 +1089,21 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
)
# Phase 2b: Korrekturen
# Phase 2c: Semantische Deduplication
if not await _check_analysis_exists(db, analysis_id):
return
await _phase2b_corrections(
db, analysis_id, tenant_id, entities, len(relations), usage_acc, ws_manager,
await _phase2c_semantic_dedup(
db, analysis_id, tenant_id, entities, usage_acc, ws_manager,
)
# Entity-Count nach Korrekturen aktualisieren
# Phase 2d: Cleanup
if not await _check_analysis_exists(db, analysis_id):
return
await _phase2d_cleanup(db, analysis_id, entities, ws_manager)
# Finale Zähler aus DB (nach allen Cleanups)
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
(analysis_id,),
@@ -911,13 +1111,20 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
row = await cursor.fetchone()
final_entity_count = row["cnt"] if row else len(entities)
# Phase 3
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
(analysis_id,),
)
row = await cursor.fetchone()
final_relation_count = row["cnt"] if row else len(relations)
# Phase 3: Finalisierung
if not await _check_analysis_exists(db, analysis_id):
return
await _phase3_finalize(
db, analysis_id, tenant_id,
entity_count=final_entity_count, relation_count=len(relations),
entity_count=final_entity_count, relation_count=final_relation_count,
article_ids=article_ids, factcheck_ids=factcheck_ids,
article_ts=article_ts, factcheck_ts=factcheck_ts,
usage_acc=usage_acc, ws_manager=ws_manager,


@@ -1316,6 +1316,41 @@ class AgentOrchestrator:
f"${usage_acc.total_cost_usd:.4f} ({usage_acc.call_count} Calls)"
)
# Credits-Tracking: Monatliche Aggregation + Credits abziehen
if tenant_id and usage_acc.total_cost_usd > 0:
year_month = datetime.now(TIMEZONE).strftime('%Y-%m')
await db.execute("""
INSERT INTO token_usage_monthly
(organization_id, year_month, input_tokens, output_tokens,
cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls, refresh_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)
ON CONFLICT(organization_id, year_month) DO UPDATE SET
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cache_creation_tokens = cache_creation_tokens + excluded.cache_creation_tokens,
cache_read_tokens = cache_read_tokens + excluded.cache_read_tokens,
total_cost_usd = total_cost_usd + excluded.total_cost_usd,
api_calls = api_calls + excluded.api_calls,
refresh_count = refresh_count + 1,
updated_at = CURRENT_TIMESTAMP
""", (tenant_id, year_month,
usage_acc.input_tokens, usage_acc.output_tokens,
usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
round(usage_acc.total_cost_usd, 7), usage_acc.call_count))
# Credits auf Lizenz abziehen
lic_cursor = await db.execute(
"SELECT cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
(tenant_id,))
lic = await lic_cursor.fetchone()
if lic and lic["cost_per_credit"] and lic["cost_per_credit"] > 0:
credits_consumed = usage_acc.total_cost_usd / lic["cost_per_credit"]
await db.execute(
"UPDATE licenses SET credits_used = COALESCE(credits_used, 0) + ? WHERE organization_id = ? AND status = 'active'",
(round(credits_consumed, 2), tenant_id))
await db.commit()
logger.info(f"Credits: {round(credits_consumed, 1) if lic and lic['cost_per_credit'] else 0} abgezogen für Tenant {tenant_id}")
# Quellen-Discovery im Background starten
if unique_results:
asyncio.create_task(_background_discover_sources(unique_results))


@@ -582,7 +582,42 @@ async def init_db():
await db.commit()
logger.info("Migration: article_locations-Tabelle erstellt")
# Verwaiste running-Eintraege beim Start als error markieren (aelter als 15 Min)
# Migration: Credits-System fuer Lizenzen
cursor = await db.execute("PRAGMA table_info(licenses)")
columns = [row[1] for row in await cursor.fetchall()]
if "token_budget_usd" not in columns:
await db.execute("ALTER TABLE licenses ADD COLUMN token_budget_usd REAL")
await db.execute("ALTER TABLE licenses ADD COLUMN credits_total INTEGER")
await db.execute("ALTER TABLE licenses ADD COLUMN credits_used REAL DEFAULT 0")
await db.execute("ALTER TABLE licenses ADD COLUMN cost_per_credit REAL")
await db.execute("ALTER TABLE licenses ADD COLUMN budget_warning_percent INTEGER DEFAULT 80")
await db.commit()
logger.info("Migration: Credits-System zu Lizenzen hinzugefuegt")
# Migration: Token-Usage-Monatstabelle
cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='token_usage_monthly'")
if not await cursor.fetchone():
await db.execute("""
CREATE TABLE token_usage_monthly (
id INTEGER PRIMARY KEY AUTOINCREMENT,
organization_id INTEGER REFERENCES organizations(id),
year_month TEXT NOT NULL,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cache_creation_tokens INTEGER DEFAULT 0,
cache_read_tokens INTEGER DEFAULT 0,
total_cost_usd REAL DEFAULT 0.0,
api_calls INTEGER DEFAULT 0,
refresh_count INTEGER DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(organization_id, year_month)
)
""")
await db.commit()
logger.info("Migration: token_usage_monthly Tabelle erstellt")
# Verwaiste running-Eintraege beim Start als error markieren (aelter als 15 Min)
await db.execute(
"""UPDATE refresh_log SET status = 'error', error_message = 'Verwaist beim Neustart',
completed_at = CURRENT_TIMESTAMP


@@ -41,6 +41,9 @@ class UserMeResponse(BaseModel):
license_status: str = "unknown"
license_type: str = ""
read_only: bool = False
credits_total: Optional[int] = None
credits_remaining: Optional[int] = None
credits_percent_used: Optional[float] = None
# Incidents (Lagen)


@@ -261,10 +261,28 @@ async def get_me(
from services.license_service import check_license
license_info = await check_license(db, current_user["tenant_id"])
# Credits-Daten laden
credits_total = None
credits_remaining = None
credits_percent_used = None
if current_user.get("tenant_id"):
lic_cursor = await db.execute(
"SELECT credits_total, credits_used, cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
(current_user["tenant_id"],))
lic_row = await lic_cursor.fetchone()
if lic_row and lic_row["credits_total"]:
credits_total = lic_row["credits_total"]
credits_used = lic_row["credits_used"] or 0
credits_remaining = max(0, int(credits_total - credits_used))
credits_percent_used = round(min(100, (credits_used / credits_total) * 100), 1) if credits_total > 0 else 0
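# Example: credits_total = 1000 and credits_used = 246.4 yield
# credits_remaining = 753 and credits_percent_used = 24.6.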
return UserMeResponse(
id=current_user["id"],
username=current_user["username"],
email=current_user.get("email", ""),
credits_total=credits_total,
credits_remaining=credits_remaining,
credits_percent_used=credits_percent_used,
role=current_user["role"],
org_name=org_name,
org_slug=current_user.get("org_slug", ""),


@@ -415,12 +415,14 @@ async def create_source(
"""Neue Quelle hinzufuegen (org-spezifisch)."""
tenant_id = current_user.get("tenant_id")
# Domain normalisieren (Subdomain-Aliase auflösen)
# Domain normalisieren (Subdomain-Aliase auflösen, aus URL extrahieren)
domain = data.domain
if not domain and data.url:
domain = _extract_domain(data.url)
if domain:
domain = _DOMAIN_ALIASES.get(domain.lower(), domain.lower())
# Duplikat-Prüfung: gleiche URL bereits vorhanden?
# Duplikat-Prüfung 1: gleiche URL bereits vorhanden? (tenant-übergreifend)
if data.url:
cursor = await db.execute(
"SELECT id, name FROM sources WHERE url = ? AND status = 'active'",
@@ -433,6 +435,25 @@ async def create_source(
detail=f"Feed-URL bereits vorhanden: {existing['name']} (ID {existing['id']})",
)
# Duplikat-Prüfung 2: Domain bereits vorhanden? (tenant-übergreifend)
if domain:
cursor = await db.execute(
"SELECT id, name, source_type FROM sources WHERE LOWER(domain) = ? AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?) LIMIT 1",
(domain.lower(), tenant_id),
)
domain_existing = await cursor.fetchone()
if domain_existing:
if data.source_type == "web_source":
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Web-Quelle für '{domain}' bereits vorhanden: {domain_existing['name']}",
)
if not data.url:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Domain '{domain}' bereits als Quelle vorhanden: {domain_existing['name']}. Für einen neuen RSS-Feed bitte die Feed-URL angeben.",
)
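# Example: if an active source for the domain "reuters.com" (illustrative) already
# exists, adding another web_source for that domain returns 409; an RSS feed for the
# same domain is still accepted as long as a distinct feed URL is provided (exact URL
# duplicates are caught by the check above).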
cursor = await db.execute(
"""INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",


@@ -5335,3 +5335,69 @@ body.tutorial-active .tutorial-cursor {
color: var(--text-primary);
box-shadow: 0 0 0 2px rgba(150, 121, 26, 0.25);
}
/* ===== Credits-Anzeige im User-Dropdown ===== */
.credits-section {
padding: 0;
text-align: left;
}
.credits-divider {
height: 1px;
background: var(--border);
margin: 8px 0;
}
.credits-label {
font-size: 11px;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.5px;
color: var(--text-tertiary);
margin-bottom: 8px;
text-align: left;
}
.credits-bar-container {
width: 100%;
height: 8px;
background: rgba(255,255,255,0.08);
border: 1px solid rgba(255,255,255,0.12);
border-radius: 4px;
overflow: hidden;
margin-bottom: 10px;
}
.credits-bar {
height: 100%;
border-radius: 4px;
background: var(--accent);
transition: width 0.6s ease, background-color 0.3s ease;
min-width: 2px;
}
.credits-bar.warning {
background: #e67e22;
}
.credits-bar.critical {
background: #e74c3c;
}
.credits-info {
font-size: 12px;
color: var(--text-tertiary);
display: flex;
justify-content: space-between;
align-items: center;
}
.credits-info span {
font-weight: 400;
color: var(--text-secondary);
}
.credits-percent {
font-size: 11px;
color: var(--text-tertiary);
}


@@ -50,6 +50,17 @@
<span class="header-dropdown-label">Lizenz</span>
<span class="header-dropdown-value" id="header-license-info">-</span>
</div>
<div id="credits-section" class="credits-section" style="display: none;">
<div class="credits-divider"></div>
<div class="credits-label">Credits</div>
<div class="credits-bar-container">
<div id="credits-bar" class="credits-bar"></div>
</div>
<div class="credits-info">
<span><span id="credits-remaining">0</span> von <span id="credits-total">0</span></span>
<span class="credits-percent" id="credits-percent"></span>
</div>
</div>
</div>
</div>
<div class="header-license-warning" id="header-license-warning"></div>


@@ -466,6 +466,34 @@ const App = {
licInfoEl.textContent = label;
}
// Credits-Anzeige im Dropdown
const creditsSection = document.getElementById('credits-section');
if (creditsSection && user.credits_total) {
creditsSection.style.display = 'block';
const bar = document.getElementById('credits-bar');
const remainingEl = document.getElementById('credits-remaining');
const totalEl = document.getElementById('credits-total');
const remaining = user.credits_remaining || 0;
const total = user.credits_total || 1;
const percentUsed = user.credits_percent_used || 0;
const percentRemaining = Math.max(0, 100 - percentUsed);
remainingEl.textContent = remaining.toLocaleString('de-DE');
totalEl.textContent = total.toLocaleString('de-DE');
bar.style.width = percentRemaining + '%';
// Farbwechsel je nach Verbrauch
bar.classList.remove('warning', 'critical');
if (percentUsed > 80) {
bar.classList.add('critical');
} else if (percentUsed > 50) {
bar.classList.add('warning');
}
const percentEl = document.getElementById("credits-percent");
if (percentEl) percentEl.textContent = percentRemaining.toFixed(0) + "% verbleibend";
}
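// Example: credits_percent_used = 62 gives a 38% wide bar, the "warning" class
// (applied above 50% consumption) and the label "38% verbleibend".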
// Dropdown Toggle
const userBtn = document.getElementById('header-user-btn');
const userDropdown = document.getElementById('header-user-dropdown');