Commits vergleichen
5 Commits
4d6d022bee
...
main
| Autor | SHA1 | Datum | |
|---|---|---|---|
|
|
6b4af4cf2a | ||
|
|
17088e588f | ||
|
|
97997724de | ||
|
|
acb3c6a6cb | ||
|
|
7bfa1d29cf |
234
regenerate_relations.py
Normale Datei
234
regenerate_relations.py
Normale Datei
@@ -0,0 +1,234 @@
|
||||
"""Regeneriert NUR die Beziehungen für eine bestehende Netzwerkanalyse.
|
||||
Nutzt die vorhandenen Entitäten und führt Phase 2a + Phase 2 + Phase 2c + Phase 2d aus.
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
|
||||
|
||||
from database import get_db
|
||||
from agents.entity_extractor import (
|
||||
_phase2a_deduplicate_entities,
|
||||
_phase2_analyze_relationships,
|
||||
_phase2c_semantic_dedup,
|
||||
_phase2d_cleanup,
|
||||
_build_entity_name_map,
|
||||
_compute_data_hash,
|
||||
_broadcast,
|
||||
logger,
|
||||
)
|
||||
from agents.claude_client import UsageAccumulator
|
||||
from config import TIMEZONE
|
||||
from datetime import datetime
|
||||
import logging
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
||||
)
|
||||
|
||||
|
||||
async def regenerate_relations_only(analysis_id: int):
    """Delete the stored relations of an analysis and rebuild them.

    Reuses the entities already persisted for the analysis and re-runs
    phases 2a (entity dedup), 2 (relationship extraction), 2c (semantic
    dedup) and 2d (cleanup), then refreshes counters, data hash and the
    generation log. On any error the analysis status is set to 'error'.
    """
    db = await get_db()
    usage_acc = UsageAccumulator()

    try:
        # Make sure the analysis exists before touching anything.
        cursor = await db.execute(
            "SELECT id, name, tenant_id, entity_count FROM network_analyses WHERE id = ?",
            (analysis_id,),
        )
        analysis = await cursor.fetchone()
        if not analysis:
            print(f"Analyse {analysis_id} nicht gefunden!")
            return

        tenant_id = analysis["tenant_id"]
        print(f"\nAnalyse: {analysis['name']} (ID={analysis_id})")
        print(f"Vorhandene Entitäten: {analysis['entity_count']}")

        # Mark as generating so the UI reflects the rebuild.
        await db.execute(
            "UPDATE network_analyses SET status = 'generating' WHERE id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Load the persisted entities (db_id is required by the merge helpers).
        cursor = await db.execute(
            """SELECT id, name, name_normalized, entity_type, description, aliases, mention_count
               FROM network_entities WHERE network_analysis_id = ?""",
            (analysis_id,),
        )
        entity_rows = await cursor.fetchall()
        entities = []
        for r in entity_rows:
            try:
                aliases = json.loads(r["aliases"]) if r["aliases"] else []
            except (json.JSONDecodeError, TypeError):
                aliases = []  # malformed alias JSON -> treat as no aliases
            entities.append({
                "name": r["name"],
                "name_normalized": r["name_normalized"],
                "type": r["entity_type"],
                "description": r["description"] or "",
                "aliases": aliases,
                "mention_count": r["mention_count"] or 1,
                "db_id": r["id"],
            })

        print(f"Geladene Entitäten: {len(entities)}")

        # Phase 2a: entity deduplication (before relations are removed).
        print(f"\n--- Phase 2a: Entity-Deduplication ---\n")
        await _phase2a_deduplicate_entities(db, analysis_id, entities)
        print(f"Entitäten nach Dedup: {len(entities)}")

        # Drop the old relations.
        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        old_count = (await cursor.fetchone())["cnt"]
        print(f"\nLösche {old_count} alte Relations...")
        await db.execute(
            "DELETE FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        await db.commit()

        # Collect the incidents linked to this analysis.
        cursor = await db.execute(
            "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        incident_ids = [row["incident_id"] for row in await cursor.fetchall()]
        print(f"Verknüpfte Lagen: {len(incident_ids)}")

        articles = []
        article_ids = []
        article_ts = []
        factchecks = []
        factcheck_ids = []
        factcheck_ts = []

        # BUGFIX: with no linked incidents the original code built an
        # "IN ()" clause, which is a SQL syntax error in SQLite. Only
        # query source material when at least one incident exists.
        if incident_ids:
            placeholders = ",".join("?" * len(incident_ids))

            # Load the articles.
            cursor = await db.execute(
                f"""SELECT id, incident_id, headline, headline_de, source, source_url,
                           content_original, content_de, collected_at
                    FROM articles WHERE incident_id IN ({placeholders})""",
                incident_ids,
            )
            for r in await cursor.fetchall():
                articles.append({
                    "id": r["id"], "incident_id": r["incident_id"],
                    "headline": r["headline"], "headline_de": r["headline_de"],
                    "source": r["source"], "source_url": r["source_url"],
                    "content_original": r["content_original"], "content_de": r["content_de"],
                })
                article_ids.append(r["id"])
                article_ts.append(r["collected_at"] or "")

            # Load the fact checks.
            cursor = await db.execute(
                f"""SELECT id, incident_id, claim, status, evidence, checked_at
                    FROM fact_checks WHERE incident_id IN ({placeholders})""",
                incident_ids,
            )
            for r in await cursor.fetchall():
                factchecks.append({
                    "id": r["id"], "incident_id": r["incident_id"],
                    "claim": r["claim"], "status": r["status"], "evidence": r["evidence"],
                })
                factcheck_ids.append(r["id"])
                factcheck_ts.append(r["checked_at"] or "")

        print(f"Artikel: {len(articles)}, Faktenchecks: {len(factchecks)}")

        # Phase 2: relationship extraction.
        print(f"\n--- Phase 2: Batched Beziehungsextraktion starten ---\n")
        relations = await _phase2_analyze_relationships(
            db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc,
        )

        # Phase 2c: semantic deduplication.
        print(f"\n--- Phase 2c: Semantische Deduplication (Opus) ---\n")
        await _phase2c_semantic_dedup(
            db, analysis_id, tenant_id, entities, usage_acc,
        )

        # Phase 2d: cleanup.
        print(f"\n--- Phase 2d: Cleanup ---\n")
        await _phase2d_cleanup(db, analysis_id, entities)

        # Final counters straight from the DB (the phases mutate rows).
        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        row = await cursor.fetchone()
        final_entity_count = row["cnt"] if row else len(entities)

        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
            (analysis_id,),
        )
        row = await cursor.fetchone()
        final_relation_count = row["cnt"] if row else len(relations)

        # Finalize: data hash, status, generation log entry.
        data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts)
        now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")

        await db.execute(
            """UPDATE network_analyses
               SET entity_count = ?, relation_count = ?, status = 'ready',
                   last_generated_at = ?, data_hash = ?
               WHERE id = ?""",
            (final_entity_count, final_relation_count, now, data_hash, analysis_id),
        )

        await db.execute(
            """INSERT INTO network_generation_log
               (network_analysis_id, completed_at, status, input_tokens, output_tokens,
                cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls,
                entity_count, relation_count, tenant_id)
               VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens,
             usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
             usage_acc.total_cost_usd, usage_acc.call_count,
             final_entity_count, final_relation_count, tenant_id),
        )

        await db.commit()

        print(f"\n{'='*60}")
        print("FERTIG!")
        print(f"Entitäten: {final_entity_count}")
        print(f"Beziehungen: {final_relation_count}")
        print(f"API-Calls: {usage_acc.call_count}")
        print(f"Kosten: ${usage_acc.total_cost_usd:.4f}")
        print(f"{'='*60}")

    except Exception as e:
        print(f"FEHLER: {e}")
        import traceback
        traceback.print_exc()
        try:
            await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,))
            await db.commit()
        except Exception:
            pass  # best effort: the original error is what matters
    finally:
        await db.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Analysis id comes from argv[1]; default to 1 when omitted.
    args = sys.argv[1:]
    target_id = int(args[0]) if args else 1
    asyncio.run(regenerate_relations_only(target_id))
|
||||
@@ -4,6 +4,7 @@ import hashlib
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
@@ -12,6 +13,18 @@ from config import CLAUDE_MODEL_FAST, TIMEZONE
|
||||
|
||||
logger = logging.getLogger("osint.entity_extractor")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Konstanten
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
TYPE_PRIORITY = {"location": 5, "organisation": 4, "military": 3, "event": 2, "person": 1}
|
||||
|
||||
_STOP_WORDS = frozenset({
|
||||
"the", "of", "and", "for", "in", "on", "at", "to", "by",
|
||||
"von", "der", "die", "das", "und", "für", "des", "den", "dem",
|
||||
"ein", "eine", "zur", "zum", "bei", "mit", "aus", "nach",
|
||||
})
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Prompts
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -97,26 +110,33 @@ AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
|
||||
}}"""
|
||||
|
||||
|
||||
CORRECTION_PROMPT = """Du bist ein Senior OSINT-Analyst. Prüfe die extrahierten Entitäten auf Fehler.
|
||||
SEMANTIC_DEDUP_PROMPT = """Du bist ein OSINT-Analyst. Prüfe diese Entitäten auf Duplikate.
|
||||
|
||||
ENTITÄTEN:
|
||||
{entities_json}
|
||||
{entity_list}
|
||||
|
||||
AUFGABE: Prüfe auf folgende Fehler und gib NUR Korrekturen zurück:
|
||||
1. "name_fix": Name ist falsch geschrieben oder unvollständig
|
||||
2. "merge": Zwei Entitäten bezeichnen dasselbe -> zusammenführen (z.B. "USA" als Organisation und "USA" als Location)
|
||||
3. "add": Wichtige Entität fehlt komplett (nur bei offensichtlichen Lücken)
|
||||
AUFGABE: Welche Entitäten bezeichnen DASSELBE reale Objekt?
|
||||
|
||||
WICHTIG: Nur echte Fehler korrigieren. Im Zweifel KEINE Korrektur.
|
||||
Typische Duplikate:
|
||||
- Abkürzung vs. Vollname: "IRGC" = "Iranian Revolutionary Guard Corps"
|
||||
- Sprachvarianten: "European Union" = "Europäische Union"
|
||||
- Schreibvarianten: "Strait of Hormuz" = "Straße von Hormus"
|
||||
|
||||
REGELN:
|
||||
- NUR echte Duplikate zusammenführen (gleiche reale Entität)
|
||||
- NICHT zusammenführen: Unterorganisationen (IRGC ≠ IRGC Navy), verschiedene Personen
|
||||
- "keep": Nummer der Haupt-Entität (bevorzuge: mehr Erwähnungen, vollständiger Name)
|
||||
- "merge": Nummern der Duplikate die in die Haupt-Entität zusammengeführt werden
|
||||
|
||||
AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON:
|
||||
{{
|
||||
"corrections": [
|
||||
{{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}},
|
||||
{{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}},
|
||||
{{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}}
|
||||
"merges": [
|
||||
{{"keep": 1, "merge": [3, 5]}},
|
||||
{{"keep": 2, "merge": [4]}}
|
||||
]
|
||||
}}"""
|
||||
}}
|
||||
|
||||
Falls KEINE Duplikate: {{"merges": []}}"""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -174,6 +194,74 @@ def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) ->
|
||||
return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entity-Merge Helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _merge_entity_in_db(
    db, analysis_id: int, keep_ent: dict, merge_ent: dict, entities: list[dict],
):
    """Merge *merge_ent* into *keep_ent*, updating both the DB and the in-memory list.

    The duplicate's display name and aliases become aliases of the kept
    entity, mention counts are summed, the longer description wins, and
    all mentions/relations are re-pointed before the duplicate row is
    deleted. Does not commit; callers batch-commit.
    """
    surviving_id = keep_ent["db_id"]
    duplicate_id = merge_ent["db_id"]

    # Union of aliases: the duplicate's name plus its own non-empty aliases,
    # minus the surviving entity's display name.
    alias_pool = set(keep_ent.get("aliases", []))
    alias_pool.add(merge_ent["name"])
    alias_pool.update(a.strip() for a in merge_ent.get("aliases", []) if a and a.strip())
    alias_pool.discard(keep_ent["name"])
    keep_ent["aliases"] = list(alias_pool)

    # Combine mention counts.
    keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)

    # Keep whichever description is longer.
    if len(merge_ent.get("description", "")) > len(keep_ent.get("description", "")):
        keep_ent["description"] = merge_ent["description"]

    # Re-point entity mentions from the duplicate to the survivor.
    await db.execute(
        "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
        (surviving_id, duplicate_id),
    )

    # Re-point relations (both endpoints).
    await db.execute(
        "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
        (surviving_id, duplicate_id, analysis_id),
    )
    await db.execute(
        "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
        (surviving_id, duplicate_id, analysis_id),
    )

    # Drop self-loops the re-pointing may have produced.
    await db.execute(
        "DELETE FROM network_relations WHERE source_entity_id = ? AND target_entity_id = ? AND network_analysis_id = ?",
        (surviving_id, surviving_id, analysis_id),
    )

    # Persist the survivor's merged fields.
    await db.execute(
        """UPDATE network_entities
           SET aliases = ?, mention_count = ?, description = ?
           WHERE id = ?""",
        (json.dumps(keep_ent["aliases"], ensure_ascii=False),
         keep_ent["mention_count"], keep_ent.get("description", ""), surviving_id),
    )

    # Delete the duplicate row and forget it in memory.
    await db.execute("DELETE FROM network_entities WHERE id = ?", (duplicate_id,))
    try:
        entities.remove(merge_ent)
    except ValueError:
        pass  # already removed by an earlier merge
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 1: Entity-Extraktion (Haiku)
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -209,7 +297,7 @@ async def _phase1_extract_entities(
|
||||
batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
|
||||
logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches")
|
||||
|
||||
entity_map: dict[tuple[str, str], dict] = {}
|
||||
entity_map: dict[str, dict] = {}
|
||||
|
||||
for batch_idx, batch in enumerate(batches):
|
||||
articles_text = "\n\n---\n\n".join(batch)
|
||||
@@ -244,7 +332,7 @@ async def _phase1_extract_entities(
|
||||
if entity_type not in valid_types:
|
||||
entity_type = "organisation"
|
||||
|
||||
key = (name_normalized, entity_type)
|
||||
key = name_normalized
|
||||
|
||||
if key in entity_map:
|
||||
existing = entity_map[key]
|
||||
@@ -259,6 +347,9 @@ async def _phase1_extract_entities(
|
||||
new_desc = ent.get("description", "")
|
||||
if len(new_desc) > len(existing.get("description", "")):
|
||||
existing["description"] = new_desc
|
||||
# Typ-Priorität: höherwertigen Typ behalten
|
||||
if TYPE_PRIORITY.get(entity_type, 0) > TYPE_PRIORITY.get(existing["type"], 0):
|
||||
existing["type"] = entity_type
|
||||
else:
|
||||
entity_map[key] = {
|
||||
"name": name,
|
||||
@@ -303,6 +394,55 @@ async def _phase1_extract_entities(
|
||||
return all_entities
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 2a: Entity-Deduplication nach name_normalized
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _phase2a_deduplicate_entities(
    db, analysis_id: int, entities: list[dict], ws_manager=None,
) -> None:
    """Collapse entities that share the same name_normalized, regardless of type."""
    logger.info(f"Phase 2a: Prüfe {len(entities)} Entitäten auf name_normalized-Duplikate")

    # Bucket persisted entities (those with a db_id) by normalized name.
    buckets = defaultdict(list)
    for candidate in entities:
        if candidate.get("db_id"):
            buckets[candidate["name_normalized"]].append(candidate)

    merged_total = 0
    already_merged = set()

    for bucket in buckets.values():
        if len(bucket) < 2:
            continue
        # Winner: highest type priority first, then most mentions.
        ranked = sorted(
            bucket,
            key=lambda e: (TYPE_PRIORITY.get(e["type"], 0), e.get("mention_count", 0)),
            reverse=True,
        )
        winner = ranked[0]
        for loser in ranked[1:]:
            if loser["db_id"] in already_merged:
                continue
            await _merge_entity_in_db(db, analysis_id, winner, loser, entities)
            already_merged.add(loser["db_id"])
            merged_total += 1

    if merged_total > 0:
        await db.commit()

    logger.info(
        f"Phase 2a abgeschlossen: {merged_total} Duplikate zusammengeführt, "
        f"{len(entities)} Entitäten verbleiben"
    )

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "entity_dedup",
        "progress": 100,
    })
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 2: Beziehungsanalyse (Batched — pro Artikel-Batch)
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -566,186 +706,232 @@ async def _phase2_analyze_relationships(
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 2b: Korrekturen (Opus)
|
||||
# Phase 2c: Semantische Deduplication (Opus)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _phase2c_semantic_dedup(
    db, analysis_id: int, tenant_id: int,
    entities: list[dict], usage_acc: UsageAccumulator, ws_manager=None,
) -> None:
    """Semantic deduplication via Opus — detects synonyms, abbreviations, language variants.

    Candidate clusters are built cheaply in code (shared tokens and
    abbreviation matches); only clusters are sent to the model, capped at
    a fixed number of calls.
    """
    if len(entities) < 10:
        return

    logger.info(f"Phase 2c: Semantische Dedup für {len(entities)} Entitäten")

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "semantic_dedup",
        "progress": 0,
    })

    # --- Clustering: token-based + abbreviation matching ---
    token_index = defaultdict(set)
    by_db_id = {e["db_id"]: e for e in entities if e.get("db_id")}

    for ent in entities:
        db_id = ent.get("db_id")
        if not db_id:
            continue

        lowered_names = [ent["name_normalized"]]
        for alias in ent.get("aliases", []):
            if alias:
                lowered_names.append(alias.lower())

        # Token-based clustering: any shared significant word links entities.
        for lname in lowered_names:
            for word in re.split(r'[\s\-_/(),.;:]+', lname):
                word = word.strip()
                if len(word) >= 3 and word not in _STOP_WORDS:
                    token_index[word].add(db_id)

        # Abbreviation matching: initials of multi-word names, plus short
        # names treated as potential abbreviations themselves.
        display_names = [ent["name"]] + [a for a in ent.get("aliases", []) if a]
        for dname in display_names:
            words = [w for w in re.split(r'[\s\-_/(),.;:]+', dname) if w and len(w) >= 2]
            if len(words) >= 2:
                abbr = "".join(w[0] for w in words).lower()
                if len(abbr) >= 2:
                    token_index[f"_abbr_{abbr}"].add(db_id)
            name_clean = re.sub(r'[^a-zA-Z]', '', dname).lower()
            if 2 <= len(name_clean) <= 6:
                token_index[f"_abbr_{name_clean}"].add(db_id)

    # Keep unique clusters with 2..40 members, smallest first.
    seen_clusters = set()
    candidate_clusters = []
    for _token, member_ids in sorted(token_index.items(), key=lambda x: len(x[1])):
        if len(member_ids) < 2 or len(member_ids) > 40:
            continue
        fingerprint = frozenset(member_ids)
        if fingerprint in seen_clusters:
            continue
        seen_clusters.add(fingerprint)
        candidate_clusters.append(list(member_ids))

    logger.info(f"Phase 2c: {len(candidate_clusters)} Kandidaten-Cluster gefunden")

    # --- One Opus call per cluster, hard-capped ---
    merged_away = set()
    total_merges = 0
    opus_calls = 0
    max_calls = 50

    for cluster_idx, cluster_ids in enumerate(candidate_clusters):
        if opus_calls >= max_calls:
            logger.warning(f"Phase 2c: Max {max_calls} Opus-Calls erreicht, stoppe")
            break

        # Skip members that a previous cluster already merged away.
        active_ids = [did for did in cluster_ids if did not in merged_away and did in by_db_id]
        if len(active_ids) < 2:
            continue

        active_ents = [by_db_id[did] for did in active_ids]

        # Build the numbered entity list for the prompt (1-indexed).
        prompt_lines = []
        for pos, ent in enumerate(active_ents, 1):
            alias_preview = ", ".join((ent.get("aliases") or [])[:5])
            entry = f"{pos}. {ent['name']} ({ent['type']}, {ent.get('mention_count', 0)} Erwähnungen)"
            if alias_preview:
                entry += f" [Aliases: {alias_preview}]"
            prompt_lines.append(entry)

        prompt = SEMANTIC_DEDUP_PROMPT.format(entity_list="\n".join(prompt_lines))

        try:
            result_text, usage = await call_claude(prompt, tools=None, model=None)
            usage_acc.add(usage)
            opus_calls += 1
        except Exception as e:
            logger.error(f"Phase 2c Opus-Call {opus_calls + 1} fehlgeschlagen: {e}")
            continue

        parsed = _parse_json_response(result_text)
        if not parsed:
            continue

        for merge_group in parsed.get("merges", []):
            keep_idx = merge_group.get("keep")
            merge_indices = merge_group.get("merge", [])
            if keep_idx is None or not merge_indices:
                continue
            if not isinstance(merge_indices, list):
                merge_indices = [merge_indices]

            keep_idx -= 1  # 1-indexed in the prompt → 0-indexed here
            if keep_idx < 0 or keep_idx >= len(active_ents):
                continue
            keep_ent = active_ents[keep_idx]
            if keep_ent["db_id"] in merged_away:
                continue

            for mi in merge_indices:
                mi -= 1
                if mi < 0 or mi >= len(active_ents) or mi == keep_idx:
                    continue
                merge_ent = active_ents[mi]
                if merge_ent["db_id"] in merged_away:
                    continue

                logger.info(f"Semantic Merge: '{merge_ent['name']}' → '{keep_ent['name']}'")
                await _merge_entity_in_db(db, analysis_id, keep_ent, merge_ent, entities)
                merged_away.add(merge_ent["db_id"])
                by_db_id.pop(merge_ent["db_id"], None)
                total_merges += 1

        # Progress update after each processed cluster.
        progress = int((cluster_idx + 1) / len(candidate_clusters) * 100) if candidate_clusters else 100
        await _broadcast(ws_manager, "network_status", {
            "analysis_id": analysis_id,
            "phase": "semantic_dedup",
            "progress": min(progress, 100),
        })

    if total_merges > 0:
        await db.commit()

    logger.info(
        f"Phase 2c abgeschlossen: {total_merges} Merges, {opus_calls} Opus-Calls, "
        f"{len(entities)} Entitäten verbleiben"
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 2d: Cleanup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _phase2d_cleanup(
    db, analysis_id: int, entities: list[dict], ws_manager=None,
) -> None:
    """Cleanup pass: self-loops, direction normalization, duplicate relations, orphaned entities."""
    logger.info("Phase 2d: Cleanup")

    # 1. Remove self-loops.
    cursor = await db.execute(
        "DELETE FROM network_relations WHERE network_analysis_id = ? AND source_entity_id = target_entity_id",
        (analysis_id,),
    )
    removed_self_loops = cursor.rowcount

    # 2. Normalize direction (merges can leave inconsistent directions);
    #    canonical order is source id < target id.
    await db.execute(
        """UPDATE network_relations
           SET source_entity_id = target_entity_id, target_entity_id = source_entity_id
           WHERE source_entity_id > target_entity_id AND network_analysis_id = ?""",
        (analysis_id,),
    )

    # 3. Remove duplicate relations (same source+target+category after
    #    normalization), keeping the lowest id per group.
    cursor = await db.execute(
        """DELETE FROM network_relations
           WHERE network_analysis_id = ? AND id NOT IN (
               SELECT MIN(id) FROM network_relations
               WHERE network_analysis_id = ?
               GROUP BY source_entity_id, target_entity_id, category
           )""",
        (analysis_id, analysis_id),
    )
    removed_duplicates = cursor.rowcount

    # 4. Remove orphaned entities (no remaining relation endpoint).
    cursor = await db.execute(
        """DELETE FROM network_entities
           WHERE network_analysis_id = ? AND id NOT IN (
               SELECT source_entity_id FROM network_relations WHERE network_analysis_id = ?
               UNION
               SELECT target_entity_id FROM network_relations WHERE network_analysis_id = ?
           )""",
        (analysis_id, analysis_id, analysis_id),
    )
    removed_orphans = cursor.rowcount

    # Sync the in-memory list with what survived in the DB.
    cursor = await db.execute(
        "SELECT id FROM network_entities WHERE network_analysis_id = ?", (analysis_id,)
    )
    surviving_ids = {row["id"] for row in await cursor.fetchall()}
    entities[:] = [e for e in entities if e.get("db_id") in surviving_ids]

    await db.commit()

    logger.info(
        f"Phase 2d abgeschlossen: {removed_self_loops} Self-Loops, {removed_duplicates} Duplikat-Relations, "
        f"{removed_orphans} verwaiste Entities entfernt, {len(entities)} Entitäten verbleiben"
    )

    await _broadcast(ws_manager, "network_status", {
        "analysis_id": analysis_id,
        "phase": "cleanup",
        "progress": 100,
    })
|
||||
|
||||
|
||||
async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections):
    """Apply Opus-proposed corrections (name_fix / merge / add) to the entities.

    Each correction is applied best-effort: a failure is logged and the
    remaining corrections are still processed. Commits once at the end.
    """
    for correction in corrections:
        if not isinstance(correction, dict):
            continue
        corr_type = correction.get("type", "")

        try:
            if corr_type == "name_fix":
                wrong_name = (correction.get("entity_name") or "").strip()
                fixed_name = (correction.get("corrected_name") or "").strip()
                if not wrong_name or not fixed_name:
                    continue

                # Rename the first entity matching by display or normalized name.
                for ent in entities:
                    hit = (ent["name"].lower() == wrong_name.lower()
                           or ent["name_normalized"] == wrong_name.lower())
                    if not hit:
                        continue
                    previous = ent["name"]
                    ent["name"] = fixed_name
                    ent["name_normalized"] = fixed_name.lower().strip()
                    ent.setdefault("aliases", [])
                    if previous not in ent["aliases"]:
                        ent["aliases"].append(previous)

                    if ent.get("db_id"):
                        await db.execute(
                            """UPDATE network_entities
                               SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1
                               WHERE id = ?""",
                            (fixed_name, fixed_name.lower().strip(),
                             json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]),
                        )
                    break

            elif corr_type == "merge":
                keep_name = (correction.get("entity_name") or "").strip()
                merge_name = (correction.get("merge_with") or "").strip()
                if not keep_name or not merge_name:
                    continue

                # Locate both parties by display or normalized name.
                keep_ent = merge_ent = None
                for ent in entities:
                    lowered = ent["name"].lower()
                    normalized = ent["name_normalized"]
                    if lowered == keep_name.lower() or normalized == keep_name.lower():
                        keep_ent = ent
                    elif lowered == merge_name.lower() or normalized == merge_name.lower():
                        merge_ent = ent

                if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"):
                    # Pool aliases, sum mention counts.
                    pooled = set(keep_ent.get("aliases", []))
                    pooled.add(merge_ent["name"])
                    for alias in merge_ent.get("aliases", []):
                        if alias and alias.strip():
                            pooled.add(alias.strip())
                    keep_ent["aliases"] = list(pooled)
                    keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0)

                    # Re-point mentions and both relation endpoints, then
                    # persist the survivor and drop the duplicate.
                    await db.execute(
                        "UPDATE network_entity_mentions SET entity_id = ? WHERE entity_id = ?",
                        (keep_ent["db_id"], merge_ent["db_id"]),
                    )
                    await db.execute(
                        "UPDATE network_relations SET source_entity_id = ? WHERE source_entity_id = ? AND network_analysis_id = ?",
                        (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
                    )
                    await db.execute(
                        "UPDATE network_relations SET target_entity_id = ? WHERE target_entity_id = ? AND network_analysis_id = ?",
                        (keep_ent["db_id"], merge_ent["db_id"], analysis_id),
                    )
                    await db.execute(
                        """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1
                           WHERE id = ?""",
                        (json.dumps(keep_ent["aliases"], ensure_ascii=False),
                         keep_ent["mention_count"], keep_ent["db_id"]),
                    )
                    await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],))
                    entities.remove(merge_ent)

            elif corr_type == "add":
                new_name = (correction.get("entity_name") or "").strip()
                new_type = (correction.get("entity_type") or "organisation").lower().strip()
                description = correction.get("description", "")
                if not new_name:
                    continue

                valid_types = {"person", "organisation", "location", "event", "military"}
                if new_type not in valid_types:
                    new_type = "organisation"

                # Skip if an equivalent entity already exists.
                name_norm = new_name.lower().strip()
                if any(e["name_normalized"] == name_norm and e["type"] == new_type for e in entities):
                    continue

                cursor = await db.execute(
                    """INSERT OR IGNORE INTO network_entities
                       (network_analysis_id, name, name_normalized, entity_type,
                        description, aliases, mention_count, corrected_by_opus, tenant_id)
                       VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""",
                    (analysis_id, new_name, name_norm, new_type, description, tenant_id),
                )
                if cursor.lastrowid:
                    entities.append({
                        "name": new_name, "name_normalized": name_norm,
                        "type": new_type, "description": description,
                        "aliases": [], "mention_count": 1, "db_id": cursor.lastrowid,
                    })

        except Exception as e:
            logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}")

    await db.commit()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase 3: Finalisierung
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -800,8 +986,10 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
||||
"""Hauptfunktion: Entity-Extraktion + Beziehungsanalyse.
|
||||
|
||||
Phase 1: Haiku extrahiert Entitäten aus Artikeln (in Batches)
|
||||
Phase 2a: Entity-Dedup nach name_normalized (Code, kein API)
|
||||
Phase 2: Haiku extrahiert Beziehungen pro Batch, dann Merge + Deduplizierung
|
||||
Phase 2b: Opus korrigiert Entitäten (Name-Fix, Merge, Add)
|
||||
Phase 2c: Semantische Dedup via Opus (Cluster-weise)
|
||||
Phase 2d: Cleanup (Self-Loops, Verwaiste, Duplikat-Relations)
|
||||
Phase 3: Finalisierung (Zähler, Hash, Log)
|
||||
"""
|
||||
from database import get_db
|
||||
@@ -879,7 +1067,7 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
||||
logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, "
|
||||
f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen")
|
||||
|
||||
# Phase 1
|
||||
# Phase 1: Entity-Extraktion
|
||||
if not await _check_analysis_exists(db, analysis_id):
|
||||
return
|
||||
|
||||
@@ -887,7 +1075,13 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
||||
db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager,
|
||||
)
|
||||
|
||||
# Phase 2
|
||||
# Phase 2a: Entity-Deduplication
|
||||
if not await _check_analysis_exists(db, analysis_id):
|
||||
return
|
||||
|
||||
await _phase2a_deduplicate_entities(db, analysis_id, entities, ws_manager)
|
||||
|
||||
# Phase 2: Beziehungsextraktion
|
||||
if not await _check_analysis_exists(db, analysis_id):
|
||||
return
|
||||
|
||||
@@ -895,15 +1089,21 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
||||
db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager,
|
||||
)
|
||||
|
||||
# Phase 2b: Korrekturen
|
||||
# Phase 2c: Semantische Deduplication
|
||||
if not await _check_analysis_exists(db, analysis_id):
|
||||
return
|
||||
|
||||
await _phase2b_corrections(
|
||||
db, analysis_id, tenant_id, entities, len(relations), usage_acc, ws_manager,
|
||||
await _phase2c_semantic_dedup(
|
||||
db, analysis_id, tenant_id, entities, usage_acc, ws_manager,
|
||||
)
|
||||
|
||||
# Entity-Count nach Korrekturen aktualisieren
|
||||
# Phase 2d: Cleanup
|
||||
if not await _check_analysis_exists(db, analysis_id):
|
||||
return
|
||||
|
||||
await _phase2d_cleanup(db, analysis_id, entities, ws_manager)
|
||||
|
||||
# Finale Zähler aus DB (nach allen Cleanups)
|
||||
cursor = await db.execute(
|
||||
"SELECT COUNT(*) as cnt FROM network_entities WHERE network_analysis_id = ?",
|
||||
(analysis_id,),
|
||||
@@ -911,13 +1111,20 @@ async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manag
|
||||
row = await cursor.fetchone()
|
||||
final_entity_count = row["cnt"] if row else len(entities)
|
||||
|
||||
# Phase 3
|
||||
cursor = await db.execute(
|
||||
"SELECT COUNT(*) as cnt FROM network_relations WHERE network_analysis_id = ?",
|
||||
(analysis_id,),
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
final_relation_count = row["cnt"] if row else len(relations)
|
||||
|
||||
# Phase 3: Finalisierung
|
||||
if not await _check_analysis_exists(db, analysis_id):
|
||||
return
|
||||
|
||||
await _phase3_finalize(
|
||||
db, analysis_id, tenant_id,
|
||||
entity_count=final_entity_count, relation_count=len(relations),
|
||||
entity_count=final_entity_count, relation_count=final_relation_count,
|
||||
article_ids=article_ids, factcheck_ids=factcheck_ids,
|
||||
article_ts=article_ts, factcheck_ts=factcheck_ts,
|
||||
usage_acc=usage_acc, ws_manager=ws_manager,
|
||||
|
||||
@@ -1316,6 +1316,41 @@ class AgentOrchestrator:
|
||||
f"${usage_acc.total_cost_usd:.4f} ({usage_acc.call_count} Calls)"
|
||||
)
|
||||
|
||||
# Credits-Tracking: Monatliche Aggregation + Credits abziehen
|
||||
if tenant_id and usage_acc.total_cost_usd > 0:
|
||||
year_month = datetime.now(TIMEZONE).strftime('%Y-%m')
|
||||
await db.execute("""
|
||||
INSERT INTO token_usage_monthly
|
||||
(organization_id, year_month, input_tokens, output_tokens,
|
||||
cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls, refresh_count)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)
|
||||
ON CONFLICT(organization_id, year_month) DO UPDATE SET
|
||||
input_tokens = input_tokens + excluded.input_tokens,
|
||||
output_tokens = output_tokens + excluded.output_tokens,
|
||||
cache_creation_tokens = cache_creation_tokens + excluded.cache_creation_tokens,
|
||||
cache_read_tokens = cache_read_tokens + excluded.cache_read_tokens,
|
||||
total_cost_usd = total_cost_usd + excluded.total_cost_usd,
|
||||
api_calls = api_calls + excluded.api_calls,
|
||||
refresh_count = refresh_count + 1,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
""", (tenant_id, year_month,
|
||||
usage_acc.input_tokens, usage_acc.output_tokens,
|
||||
usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
|
||||
round(usage_acc.total_cost_usd, 7), usage_acc.call_count))
|
||||
|
||||
# Credits auf Lizenz abziehen
|
||||
lic_cursor = await db.execute(
|
||||
"SELECT cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
|
||||
(tenant_id,))
|
||||
lic = await lic_cursor.fetchone()
|
||||
if lic and lic["cost_per_credit"] and lic["cost_per_credit"] > 0:
|
||||
credits_consumed = usage_acc.total_cost_usd / lic["cost_per_credit"]
|
||||
await db.execute(
|
||||
"UPDATE licenses SET credits_used = COALESCE(credits_used, 0) + ? WHERE organization_id = ? AND status = 'active'",
|
||||
(round(credits_consumed, 2), tenant_id))
|
||||
await db.commit()
|
||||
logger.info(f"Credits: {round(credits_consumed, 1) if lic and lic['cost_per_credit'] else 0} abgezogen für Tenant {tenant_id}")
|
||||
|
||||
# Quellen-Discovery im Background starten
|
||||
if unique_results:
|
||||
asyncio.create_task(_background_discover_sources(unique_results))
|
||||
|
||||
@@ -582,6 +582,41 @@ async def init_db():
|
||||
await db.commit()
|
||||
logger.info("Migration: article_locations-Tabelle erstellt")
|
||||
|
||||
|
||||
# Migration: Credits-System fuer Lizenzen
|
||||
cursor = await db.execute("PRAGMA table_info(licenses)")
|
||||
columns = [row[1] for row in await cursor.fetchall()]
|
||||
if "token_budget_usd" not in columns:
|
||||
await db.execute("ALTER TABLE licenses ADD COLUMN token_budget_usd REAL")
|
||||
await db.execute("ALTER TABLE licenses ADD COLUMN credits_total INTEGER")
|
||||
await db.execute("ALTER TABLE licenses ADD COLUMN credits_used REAL DEFAULT 0")
|
||||
await db.execute("ALTER TABLE licenses ADD COLUMN cost_per_credit REAL")
|
||||
await db.execute("ALTER TABLE licenses ADD COLUMN budget_warning_percent INTEGER DEFAULT 80")
|
||||
await db.commit()
|
||||
logger.info("Migration: Credits-System zu Lizenzen hinzugefuegt")
|
||||
|
||||
# Migration: Token-Usage-Monatstabelle
|
||||
cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='token_usage_monthly'")
|
||||
if not await cursor.fetchone():
|
||||
await db.execute("""
|
||||
CREATE TABLE token_usage_monthly (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
organization_id INTEGER REFERENCES organizations(id),
|
||||
year_month TEXT NOT NULL,
|
||||
input_tokens INTEGER DEFAULT 0,
|
||||
output_tokens INTEGER DEFAULT 0,
|
||||
cache_creation_tokens INTEGER DEFAULT 0,
|
||||
cache_read_tokens INTEGER DEFAULT 0,
|
||||
total_cost_usd REAL DEFAULT 0.0,
|
||||
api_calls INTEGER DEFAULT 0,
|
||||
refresh_count INTEGER DEFAULT 0,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE(organization_id, year_month)
|
||||
)
|
||||
""")
|
||||
await db.commit()
|
||||
logger.info("Migration: token_usage_monthly Tabelle erstellt")
|
||||
|
||||
# Verwaiste running-Eintraege beim Start als error markieren (aelter als 15 Min)
|
||||
await db.execute(
|
||||
"""UPDATE refresh_log SET status = 'error', error_message = 'Verwaist beim Neustart',
|
||||
|
||||
@@ -41,6 +41,9 @@ class UserMeResponse(BaseModel):
|
||||
license_status: str = "unknown"
|
||||
license_type: str = ""
|
||||
read_only: bool = False
|
||||
credits_total: Optional[int] = None
|
||||
credits_remaining: Optional[int] = None
|
||||
credits_percent_used: Optional[float] = None
|
||||
|
||||
|
||||
# Incidents (Lagen)
|
||||
|
||||
@@ -261,10 +261,28 @@ async def get_me(
|
||||
from services.license_service import check_license
|
||||
license_info = await check_license(db, current_user["tenant_id"])
|
||||
|
||||
# Credits-Daten laden
|
||||
credits_total = None
|
||||
credits_remaining = None
|
||||
credits_percent_used = None
|
||||
if current_user.get("tenant_id"):
|
||||
lic_cursor = await db.execute(
|
||||
"SELECT credits_total, credits_used, cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
|
||||
(current_user["tenant_id"],))
|
||||
lic_row = await lic_cursor.fetchone()
|
||||
if lic_row and lic_row["credits_total"]:
|
||||
credits_total = lic_row["credits_total"]
|
||||
credits_used = lic_row["credits_used"] or 0
|
||||
credits_remaining = max(0, int(credits_total - credits_used))
|
||||
credits_percent_used = round(min(100, (credits_used / credits_total) * 100), 1) if credits_total > 0 else 0
|
||||
|
||||
return UserMeResponse(
|
||||
id=current_user["id"],
|
||||
username=current_user["username"],
|
||||
email=current_user.get("email", ""),
|
||||
credits_total=credits_total,
|
||||
credits_remaining=credits_remaining,
|
||||
credits_percent_used=credits_percent_used,
|
||||
role=current_user["role"],
|
||||
org_name=org_name,
|
||||
org_slug=current_user.get("org_slug", ""),
|
||||
|
||||
@@ -415,12 +415,14 @@ async def create_source(
|
||||
"""Neue Quelle hinzufuegen (org-spezifisch)."""
|
||||
tenant_id = current_user.get("tenant_id")
|
||||
|
||||
# Domain normalisieren (Subdomain-Aliase auflösen)
|
||||
# Domain normalisieren (Subdomain-Aliase auflösen, aus URL extrahieren)
|
||||
domain = data.domain
|
||||
if not domain and data.url:
|
||||
domain = _extract_domain(data.url)
|
||||
if domain:
|
||||
domain = _DOMAIN_ALIASES.get(domain.lower(), domain.lower())
|
||||
|
||||
# Duplikat-Prüfung: gleiche URL bereits vorhanden?
|
||||
# Duplikat-Prüfung 1: gleiche URL bereits vorhanden? (tenant-übergreifend)
|
||||
if data.url:
|
||||
cursor = await db.execute(
|
||||
"SELECT id, name FROM sources WHERE url = ? AND status = 'active'",
|
||||
@@ -433,6 +435,25 @@ async def create_source(
|
||||
detail=f"Feed-URL bereits vorhanden: {existing['name']} (ID {existing['id']})",
|
||||
)
|
||||
|
||||
# Duplikat-Prüfung 2: Domain bereits vorhanden? (tenant-übergreifend)
|
||||
if domain:
|
||||
cursor = await db.execute(
|
||||
"SELECT id, name, source_type FROM sources WHERE LOWER(domain) = ? AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?) LIMIT 1",
|
||||
(domain.lower(), tenant_id),
|
||||
)
|
||||
domain_existing = await cursor.fetchone()
|
||||
if domain_existing:
|
||||
if data.source_type == "web_source":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail=f"Web-Quelle für '{domain}' bereits vorhanden: {domain_existing['name']}",
|
||||
)
|
||||
if not data.url:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
detail=f"Domain '{domain}' bereits als Quelle vorhanden: {domain_existing['name']}. Für einen neuen RSS-Feed bitte die Feed-URL angeben.",
|
||||
)
|
||||
|
||||
cursor = await db.execute(
|
||||
"""INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
|
||||
@@ -5335,3 +5335,69 @@ body.tutorial-active .tutorial-cursor {
|
||||
color: var(--text-primary);
|
||||
box-shadow: 0 0 0 2px rgba(150, 121, 26, 0.25);
|
||||
}
|
||||
|
||||
/* ===== Credits-Anzeige im User-Dropdown ===== */
|
||||
.credits-section {
|
||||
padding: 0;
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
.credits-divider {
|
||||
height: 1px;
|
||||
background: var(--border);
|
||||
margin: 8px 0;
|
||||
}
|
||||
|
||||
.credits-label {
|
||||
font-size: 11px;
|
||||
font-weight: 600;
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.5px;
|
||||
color: var(--text-tertiary);
|
||||
margin-bottom: 8px;
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
.credits-bar-container {
|
||||
width: 100%;
|
||||
height: 8px;
|
||||
background: rgba(255,255,255,0.08);
|
||||
border: 1px solid rgba(255,255,255,0.12);
|
||||
border-radius: 4px;
|
||||
overflow: hidden;
|
||||
margin-bottom: 10px;
|
||||
}
|
||||
|
||||
.credits-bar {
|
||||
height: 100%;
|
||||
border-radius: 4px;
|
||||
background: var(--accent);
|
||||
transition: width 0.6s ease, background-color 0.3s ease;
|
||||
min-width: 2px;
|
||||
}
|
||||
|
||||
.credits-bar.warning {
|
||||
background: #e67e22;
|
||||
}
|
||||
|
||||
.credits-bar.critical {
|
||||
background: #e74c3c;
|
||||
}
|
||||
|
||||
.credits-info {
|
||||
font-size: 12px;
|
||||
color: var(--text-tertiary);
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.credits-info span {
|
||||
font-weight: 400;
|
||||
color: var(--text-secondary);
|
||||
}
|
||||
|
||||
.credits-percent {
|
||||
font-size: 11px;
|
||||
color: var(--text-tertiary);
|
||||
}
|
||||
|
||||
@@ -50,6 +50,17 @@
|
||||
<span class="header-dropdown-label">Lizenz</span>
|
||||
<span class="header-dropdown-value" id="header-license-info">-</span>
|
||||
</div>
|
||||
<div id="credits-section" class="credits-section" style="display: none;">
|
||||
<div class="credits-divider"></div>
|
||||
<div class="credits-label">Credits</div>
|
||||
<div class="credits-bar-container">
|
||||
<div id="credits-bar" class="credits-bar"></div>
|
||||
</div>
|
||||
<div class="credits-info">
|
||||
<span><span id="credits-remaining">0</span> von <span id="credits-total">0</span></span>
|
||||
<span class="credits-percent" id="credits-percent"></span>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="header-license-warning" id="header-license-warning"></div>
|
||||
|
||||
@@ -466,6 +466,34 @@ const App = {
|
||||
licInfoEl.textContent = label;
|
||||
}
|
||||
|
||||
// Credits-Anzeige im Dropdown
|
||||
const creditsSection = document.getElementById('credits-section');
|
||||
if (creditsSection && user.credits_total) {
|
||||
creditsSection.style.display = 'block';
|
||||
const bar = document.getElementById('credits-bar');
|
||||
const remainingEl = document.getElementById('credits-remaining');
|
||||
const totalEl = document.getElementById('credits-total');
|
||||
|
||||
const remaining = user.credits_remaining || 0;
|
||||
const total = user.credits_total || 1;
|
||||
const percentUsed = user.credits_percent_used || 0;
|
||||
const percentRemaining = Math.max(0, 100 - percentUsed);
|
||||
|
||||
remainingEl.textContent = remaining.toLocaleString('de-DE');
|
||||
totalEl.textContent = total.toLocaleString('de-DE');
|
||||
bar.style.width = percentRemaining + '%';
|
||||
|
||||
// Farbwechsel je nach Verbrauch
|
||||
bar.classList.remove('warning', 'critical');
|
||||
if (percentUsed > 80) {
|
||||
bar.classList.add('critical');
|
||||
} else if (percentUsed > 50) {
|
||||
bar.classList.add('warning');
|
||||
}
|
||||
const percentEl = document.getElementById("credits-percent");
|
||||
if (percentEl) percentEl.textContent = percentRemaining.toFixed(0) + "% verbleibend";
|
||||
}
|
||||
|
||||
// Dropdown Toggle
|
||||
const userBtn = document.getElementById('header-user-btn');
|
||||
const userDropdown = document.getElementById('header-user-dropdown');
|
||||
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren