diff --git a/src/agents/entity_extractor.py b/src/agents/entity_extractor.py new file mode 100644 index 0000000..41aedb2 --- /dev/null +++ b/src/agents/entity_extractor.py @@ -0,0 +1,746 @@ +"""Netzwerkanalyse: Entity-Extraktion (Haiku) + Beziehungsanalyse (Opus).""" +import asyncio +import hashlib +import json +import logging +import re +from datetime import datetime +from typing import Optional + +from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator +from config import CLAUDE_MODEL_FAST, TIMEZONE + +logger = logging.getLogger("osint.entity_extractor") + +# --------------------------------------------------------------------------- +# Prompts +# --------------------------------------------------------------------------- + +ENTITY_EXTRACTION_PROMPT = """Du bist ein OSINT-Analyst für ein Lagemonitoring-System. +AUFGABE: Extrahiere ALLE relevanten Entitäten aus den folgenden Nachrichtenartikeln. + +ARTIKEL: +{articles_text} + +REGELN: +- Extrahiere JEDE genannte Person, Organisation, Ort, Ereignis und militärische Einheit +- Normalisiere Namen: "Wladimir Putin", "Putin", "V. Putin" -> eine Entität +- Aliase erfassen: Alle Namensvarianten einer Entität als aliases[] +- mention_count: Wie oft wird die Entität insgesamt in allen Artikeln erwähnt? +- Beschreibung: Kurze Einordnung, wer/was die Entität ist (1 Satz) +- KEINE Duplikate: Gleiche Entitäten zusammenfassen + +ENTITY-TYPEN: +- "person": Individuelle Personen (Politiker, Militärs, Journalisten etc.) +- "organisation": Organisationen, Parteien, Behörden, Unternehmen, NGOs +- "location": Länder, Städte, Regionen, Gebiete +- "event": Konkrete Ereignisse (Wahlen, Anschläge, Konferenzen etc.) 
+- "military": Militärische Einheiten, Waffensysteme, Operationen + +AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: +{{ + "entities": [ + {{ + "name": "Vollständiger Name", + "name_normalized": "vollständiger name", + "type": "person|organisation|location|event|military", + "description": "Kurze Einordnung (1 Satz)", + "aliases": ["Alias1", "Alias2"], + "mention_count": 5 + }} + ] +}}""" + + +RELATIONSHIP_ANALYSIS_PROMPT = """Du bist ein Senior OSINT-Analyst für ein Lagemonitoring-System. +AUFGABE: Analysiere die Beziehungen zwischen den extrahierten Entitäten und korrigiere ggf. Fehler. + +EXTRAHIERTE ENTITÄTEN: +{entities_json} + +QUELLMATERIAL: +{source_texts} + +TEIL 1 — KORREKTUREN (optional): +Prüfe die extrahierten Entitäten auf Fehler: +- "name_fix": Name ist falsch geschrieben oder unvollständig +- "merge": Zwei Entitäten sind dieselbe -> zusammenführen +- "add": Wichtige Entität fehlt komplett + +TEIL 2 — BEZIEHUNGEN: +Identifiziere ALLE relevanten Beziehungen zwischen den Entitäten. 
+ +BEZIEHUNGS-KATEGORIEN: +- "alliance": Bündnis, Kooperation, Unterstützung, Partnerschaft +- "conflict": Konflikt, Krieg, Feindschaft, Sanktionen, Opposition +- "diplomacy": Diplomatische Beziehungen, Verhandlungen, Abkommen +- "economic": Wirtschaftsbeziehungen, Handel, Investitionen +- "legal": Rechtliche Beziehungen, Klagen, Verurteilungen +- "neutral": Sonstige Beziehung, Erwähnung, Verbindung + +REGELN: +- weight: 1 (schwach/indirekt) bis 5 (stark/direkt) +- evidence[]: Stichpunkte aus dem Quellmaterial die die Beziehung belegen +- status: "active" (aktuell), "historical" (vergangen), "emerging" (sich entwickelnd) +- source und target: Exakt die Namen aus der Entitäten-Liste verwenden + +AUSGABEFORMAT — Antworte AUSSCHLIESSLICH mit diesem JSON: +{{ + "corrections": [ + {{"type": "name_fix", "entity_name": "Falscher Name", "corrected_name": "Korrekter Name"}}, + {{"type": "merge", "entity_name": "Behalten", "merge_with": "Duplikat löschen"}}, + {{"type": "add", "entity_name": "Neuer Name", "entity_type": "person", "description": "Einordnung"}} + ], + "relations": [ + {{ + "source": "Entität A", + "target": "Entität B", + "category": "alliance|conflict|diplomacy|economic|legal|neutral", + "label": "Kurzes Label (2-4 Wörter)", + "description": "Beschreibung (1-2 Sätze)", + "weight": 3, + "status": "active|historical|emerging", + "evidence": ["Beleg 1", "Beleg 2"] + }} + ] +}}""" + + +# --------------------------------------------------------------------------- +# Hilfsfunktionen +# --------------------------------------------------------------------------- + +async def _broadcast(ws_manager, msg_type: str, data: dict): + """Sendet eine WebSocket-Nachricht, falls ws_manager vorhanden.""" + if ws_manager: + try: + await ws_manager.broadcast({"type": msg_type, **data}) + except Exception: + pass + + +def _parse_json_response(text: str) -> Optional[dict]: + """Parst JSON aus Claude-Antwort. 
Handhabt Markdown-Fences.""" + if not text: + return None + # Direkt + try: + return json.loads(text) + except json.JSONDecodeError: + pass + # Markdown-Fences + fence_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', text, re.DOTALL) + if fence_match: + try: + return json.loads(fence_match.group(1)) + except json.JSONDecodeError: + pass + # Erstes JSON-Objekt + obj_match = re.search(r'\{.*\}', text, re.DOTALL) + if obj_match: + try: + return json.loads(obj_match.group()) + except json.JSONDecodeError: + pass + logger.warning("JSON-Parse fehlgeschlagen") + return None + + +async def _check_analysis_exists(db, analysis_id: int) -> bool: + """Prüft ob die Analyse noch existiert.""" + cursor = await db.execute( + "SELECT id FROM network_analyses WHERE id = ?", (analysis_id,) + ) + return await cursor.fetchone() is not None + + +def _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) -> str: + """SHA256-Hash über sortierte IDs und Timestamps.""" + parts = [] + for aid, ats in sorted(zip(article_ids, article_ts)): + parts.append(f"a:{aid}:{ats}") + for fid, fts in sorted(zip(factcheck_ids, factcheck_ts)): + parts.append(f"f:{fid}:{fts}") + return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest() + + +# --------------------------------------------------------------------------- +# Phase 1: Entity-Extraktion (Haiku) +# --------------------------------------------------------------------------- + +async def _phase1_extract_entities( + db, analysis_id: int, tenant_id: int, + articles: list[dict], factchecks: list[dict], + usage_acc: UsageAccumulator, ws_manager=None, +) -> list[dict]: + """Extrahiert Entitäten aus Artikeln via Haiku in Batches.""" + logger.info(f"Phase 1: {len(articles)} Artikel, {len(factchecks)} Faktenchecks") + + all_texts = [] + for art in articles: + headline = art.get("headline_de") or art.get("headline") or "" + content = art.get("content_de") or art.get("content_original") or "" + source = art.get("source") or "" 
+ # Inhalt kürzen falls sehr lang + if len(content) > 2000: + content = content[:2000] + "..." + all_texts.append(f"[{source}] {headline}\n{content}") + + for fc in factchecks: + claim = fc.get("claim") or "" + evidence = fc.get("evidence") or "" + status = fc.get("status") or "" + all_texts.append(f"[Faktencheck] {claim} (Status: {status})\n{evidence}") + + if not all_texts: + logger.warning(f"Analyse {analysis_id}: Keine Texte vorhanden") + return [] + + # Batching + batch_size = 30 + batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)] + logger.info(f"{len(all_texts)} Texte in {len(batches)} Batches") + + entity_map: dict[tuple[str, str], dict] = {} + + for batch_idx, batch in enumerate(batches): + articles_text = "\n\n---\n\n".join(batch) + prompt = ENTITY_EXTRACTION_PROMPT.format(articles_text=articles_text) + + try: + result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) + usage_acc.add(usage) + except Exception as e: + logger.error(f"Haiku Batch {batch_idx + 1}/{len(batches)} fehlgeschlagen: {e}") + continue + + parsed = _parse_json_response(result_text) + if not parsed or "entities" not in parsed: + logger.warning(f"Batch {batch_idx + 1}: Kein gültiges JSON") + continue + + entities = parsed["entities"] + if not isinstance(entities, list): + continue + + for ent in entities: + if not isinstance(ent, dict): + continue + name = (ent.get("name") or "").strip() + if not name: + continue + + name_normalized = (ent.get("name_normalized") or name.lower()).strip().lower() + entity_type = (ent.get("type") or "organisation").lower().strip() + valid_types = {"person", "organisation", "location", "event", "military"} + if entity_type not in valid_types: + entity_type = "organisation" + + key = (name_normalized, entity_type) + + if key in entity_map: + existing = entity_map[key] + aliases = set(existing.get("aliases", [])) + for alias in ent.get("aliases", []): + if alias and alias.strip(): + 
aliases.add(alias.strip()) + if name != existing["name"]: + aliases.add(name) + existing["aliases"] = list(aliases) + existing["mention_count"] = existing.get("mention_count", 1) + ent.get("mention_count", 1) + new_desc = ent.get("description", "") + if len(new_desc) > len(existing.get("description", "")): + existing["description"] = new_desc + else: + entity_map[key] = { + "name": name, + "name_normalized": name_normalized, + "type": entity_type, + "description": ent.get("description", ""), + "aliases": [a.strip() for a in ent.get("aliases", []) if a and a.strip()], + "mention_count": ent.get("mention_count", 1), + } + + logger.info(f"Batch {batch_idx + 1}/{len(batches)}: {len(entity_map)} Entitäten gesamt") + + await _broadcast(ws_manager, "network_status", { + "analysis_id": analysis_id, + "phase": "entity_extraction", + "progress": int((batch_idx + 1) / len(batches) * 100), + }) + + # In DB speichern + all_entities = list(entity_map.values()) + + for ent in all_entities: + try: + cursor = await db.execute( + """INSERT OR IGNORE INTO network_entities + (network_analysis_id, name, name_normalized, entity_type, + description, aliases, mention_count, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + ( + analysis_id, ent["name"], ent["name_normalized"], ent["type"], + ent.get("description", ""), + json.dumps(ent.get("aliases", []), ensure_ascii=False), + ent.get("mention_count", 1), + tenant_id, + ), + ) + ent["db_id"] = cursor.lastrowid + except Exception as e: + logger.warning(f"Entity speichern fehlgeschlagen '{ent['name']}': {e}") + + await db.commit() + logger.info(f"Phase 1 abgeschlossen: {len(all_entities)} Entitäten gespeichert") + return all_entities + + +# --------------------------------------------------------------------------- +# Phase 2: Beziehungsanalyse (Opus) +# --------------------------------------------------------------------------- + +async def _phase2_analyze_relationships( + db, analysis_id: int, tenant_id: int, + entities: list[dict], 
articles: list[dict], factchecks: list[dict], + usage_acc: UsageAccumulator, ws_manager=None, +) -> list[dict]: + """Analysiert Beziehungen via Opus und wendet Korrekturen an.""" + if not entities: + return [] + + logger.info(f"Phase 2: {len(entities)} Entitäten, Beziehungsanalyse") + + await _broadcast(ws_manager, "network_status", { + "analysis_id": analysis_id, + "phase": "relationship_extraction", + "progress": 0, + }) + + # Entitäten für Prompt + entities_for_prompt = [ + {"name": e["name"], "type": e["type"], + "description": e.get("description", ""), "aliases": e.get("aliases", [])} + for e in entities + ] + entities_json = json.dumps(entities_for_prompt, ensure_ascii=False, indent=2) + + # Quelltexte + source_parts = [] + for art in articles: + headline = art.get("headline_de") or art.get("headline") or "" + content = art.get("content_de") or art.get("content_original") or "" + source = art.get("source") or "" + source_parts.append(f"[{source}] {headline}\n{content}") + + for fc in factchecks: + claim = fc.get("claim") or "" + evidence = fc.get("evidence") or "" + source_parts.append(f"[Faktencheck] {claim}\n{evidence}") + + source_texts = "\n\n---\n\n".join(source_parts) + + # Kürzen falls zu lang + if len(source_texts) > 150_000: + logger.info(f"Quelltexte zu lang ({len(source_texts)} Zeichen), kürze") + short_parts = [] + for art in articles: + headline = art.get("headline_de") or art.get("headline") or "" + content = art.get("content_de") or art.get("content_original") or "" + short = content[:500] + "..." 
if len(content) > 500 else content + short_parts.append(f"[{art.get('source', '')}] {headline}: {short}") + for fc in factchecks: + short_parts.append(f"[Faktencheck] {fc.get('claim', '')} (Status: {fc.get('status', '')})") + source_texts = "\n\n".join(short_parts) + + prompt = RELATIONSHIP_ANALYSIS_PROMPT.format( + entities_json=entities_json, source_texts=source_texts, + ) + + try: + result_text, usage = await call_claude(prompt, tools=None, model=None) + usage_acc.add(usage) + except Exception as e: + logger.error(f"Opus Beziehungsanalyse fehlgeschlagen: {e}") + return [] + + parsed = _parse_json_response(result_text) + if not parsed: + logger.warning("Kein gültiges JSON von Opus") + return [] + + # Korrekturen anwenden + corrections = parsed.get("corrections", []) + if corrections and isinstance(corrections, list): + await _broadcast(ws_manager, "network_status", { + "analysis_id": analysis_id, + "phase": "correction", + "progress": 0, + }) + await _apply_corrections(db, analysis_id, tenant_id, entities, corrections) + + # Beziehungen speichern + relations = parsed.get("relations", []) + if not isinstance(relations, list): + return [] + + name_to_id = _build_entity_name_map(entities) + valid_categories = {"alliance", "conflict", "diplomacy", "economic", "legal", "neutral"} + saved_relations = [] + + for rel in relations: + if not isinstance(rel, dict): + continue + + source_name = (rel.get("source") or "").strip() + target_name = (rel.get("target") or "").strip() + if not source_name or not target_name: + continue + + source_id = name_to_id.get(source_name.lower()) + target_id = name_to_id.get(target_name.lower()) + if not source_id or not target_id or source_id == target_id: + continue + + category = (rel.get("category") or "neutral").lower().strip() + if category not in valid_categories: + category = "neutral" + + weight = rel.get("weight", 3) + try: + weight = max(1, min(5, int(weight))) + except (ValueError, TypeError): + weight = 3 + + status = 
(rel.get("status") or "active").lower().strip() + if status not in {"active", "historical", "emerging"}: + status = "active" + + evidence = rel.get("evidence", []) + if not isinstance(evidence, list): + evidence = [] + + try: + cursor = await db.execute( + """INSERT INTO network_relations + (network_analysis_id, source_entity_id, target_entity_id, + category, label, description, weight, status, evidence, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + analysis_id, source_id, target_id, category, + rel.get("label", ""), rel.get("description", ""), + weight, status, + json.dumps(evidence, ensure_ascii=False), + tenant_id, + ), + ) + saved_relations.append({"id": cursor.lastrowid}) + except Exception as e: + logger.warning(f"Beziehung speichern fehlgeschlagen: {e}") + + await db.commit() + logger.info(f"Phase 2 abgeschlossen: {len(corrections)} Korrekturen, {len(saved_relations)} Beziehungen") + + await _broadcast(ws_manager, "network_status", { + "analysis_id": analysis_id, + "phase": "relationship_extraction", + "progress": 100, + }) + + return saved_relations + + +def _build_entity_name_map(entities: list[dict]) -> dict[str, int]: + """Mapping: normalisierter Name/Alias -> DB-ID.""" + name_to_id: dict[str, int] = {} + for ent in entities: + db_id = ent.get("db_id") + if not db_id: + continue + name_to_id[ent["name"].lower()] = db_id + name_to_id[ent["name_normalized"]] = db_id + for alias in ent.get("aliases", []): + if alias and alias.strip(): + name_to_id[alias.strip().lower()] = db_id + return name_to_id + + +async def _apply_corrections(db, analysis_id, tenant_id, entities, corrections): + """Wendet Opus-Korrekturen auf Entitäten an.""" + for corr in corrections: + if not isinstance(corr, dict): + continue + corr_type = corr.get("type", "") + + try: + if corr_type == "name_fix": + entity_name = (corr.get("entity_name") or "").strip() + corrected_name = (corr.get("corrected_name") or "").strip() + if not entity_name or not corrected_name: + continue 
+ + for ent in entities: + if ent["name"].lower() == entity_name.lower() or \ + ent["name_normalized"] == entity_name.lower(): + old_name = ent["name"] + ent["name"] = corrected_name + ent["name_normalized"] = corrected_name.lower().strip() + ent.setdefault("aliases", []) + if old_name not in ent["aliases"]: + ent["aliases"].append(old_name) + + if ent.get("db_id"): + await db.execute( + """UPDATE network_entities + SET name = ?, name_normalized = ?, aliases = ?, corrected_by_opus = 1 + WHERE id = ?""", + (corrected_name, corrected_name.lower().strip(), + json.dumps(ent["aliases"], ensure_ascii=False), ent["db_id"]), + ) + break + + elif corr_type == "merge": + keep_name = (corr.get("entity_name") or "").strip() + merge_name = (corr.get("merge_with") or "").strip() + if not keep_name or not merge_name: + continue + + keep_ent = merge_ent = None + for ent in entities: + nl = ent["name"].lower() + nn = ent["name_normalized"] + if nl == keep_name.lower() or nn == keep_name.lower(): + keep_ent = ent + elif nl == merge_name.lower() or nn == merge_name.lower(): + merge_ent = ent + + if keep_ent and merge_ent and keep_ent.get("db_id") and merge_ent.get("db_id"): + aliases = set(keep_ent.get("aliases", [])) + aliases.add(merge_ent["name"]) + for a in merge_ent.get("aliases", []): + if a and a.strip(): + aliases.add(a.strip()) + keep_ent["aliases"] = list(aliases) + keep_ent["mention_count"] = keep_ent.get("mention_count", 0) + merge_ent.get("mention_count", 0) + + # Mentions übertragen + await db.execute( + "UPDATE network_entity_mentions SET entity_id = ? 
WHERE entity_id = ?", + (keep_ent["db_id"], merge_ent["db_id"]), + ) + await db.execute( + """UPDATE network_entities SET aliases = ?, mention_count = ?, corrected_by_opus = 1 + WHERE id = ?""", + (json.dumps(keep_ent["aliases"], ensure_ascii=False), + keep_ent["mention_count"], keep_ent["db_id"]), + ) + await db.execute("DELETE FROM network_entities WHERE id = ?", (merge_ent["db_id"],)) + entities.remove(merge_ent) + + elif corr_type == "add": + entity_name = (corr.get("entity_name") or "").strip() + entity_type = (corr.get("entity_type") or "organisation").lower().strip() + description = corr.get("description", "") + if not entity_name: + continue + + valid_types = {"person", "organisation", "location", "event", "military"} + if entity_type not in valid_types: + entity_type = "organisation" + + name_norm = entity_name.lower().strip() + if any(e["name_normalized"] == name_norm and e["type"] == entity_type for e in entities): + continue + + cursor = await db.execute( + """INSERT OR IGNORE INTO network_entities + (network_analysis_id, name, name_normalized, entity_type, + description, aliases, mention_count, corrected_by_opus, tenant_id) + VALUES (?, ?, ?, ?, ?, '[]', 1, 1, ?)""", + (analysis_id, entity_name, name_norm, entity_type, description, tenant_id), + ) + if cursor.lastrowid: + entities.append({ + "name": entity_name, "name_normalized": name_norm, + "type": entity_type, "description": description, + "aliases": [], "mention_count": 1, "db_id": cursor.lastrowid, + }) + + except Exception as e: + logger.warning(f"Korrektur fehlgeschlagen ({corr_type}): {e}") + + await db.commit() + + +# --------------------------------------------------------------------------- +# Phase 3: Finalisierung +# --------------------------------------------------------------------------- + +async def _phase3_finalize( + db, analysis_id, tenant_id, entity_count, relation_count, + article_ids, factcheck_ids, article_ts, factcheck_ts, + usage_acc, ws_manager=None, +): + """Finalisiert: 
Zähler, Hash, Log, Status.""" + data_hash = _compute_data_hash(article_ids, factcheck_ids, article_ts, factcheck_ts) + now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S") + + await db.execute( + """UPDATE network_analyses + SET entity_count = ?, relation_count = ?, status = 'ready', + last_generated_at = ?, data_hash = ? + WHERE id = ?""", + (entity_count, relation_count, now, data_hash, analysis_id), + ) + + await db.execute( + """INSERT INTO network_generation_log + (network_analysis_id, completed_at, status, input_tokens, output_tokens, + cache_creation_tokens, cache_read_tokens, total_cost_usd, api_calls, + entity_count, relation_count, tenant_id) + VALUES (?, ?, 'completed', ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (analysis_id, now, usage_acc.input_tokens, usage_acc.output_tokens, + usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens, + usage_acc.total_cost_usd, usage_acc.call_count, + entity_count, relation_count, tenant_id), + ) + + await db.commit() + + logger.info(f"Analyse {analysis_id} finalisiert: {entity_count} Entitäten, " + f"{relation_count} Beziehungen, ${usage_acc.total_cost_usd:.4f}") + + await _broadcast(ws_manager, "network_complete", { + "analysis_id": analysis_id, + "entity_count": entity_count, + "relation_count": relation_count, + "cost_usd": round(usage_acc.total_cost_usd, 4), + }) + + +# --------------------------------------------------------------------------- +# Hauptfunktion +# --------------------------------------------------------------------------- + +async def extract_and_relate_entities(analysis_id: int, tenant_id: int, ws_manager=None): + """Hauptfunktion: Entity-Extraktion + Beziehungsanalyse. 
+ + Phase 1: Haiku extrahiert Entitäten aus Artikeln + Phase 2: Opus analysiert Beziehungen und korrigiert Haiku-Fehler + Phase 3: Finalisierung (Zähler, Hash, Log) + """ + from database import get_db + + db = await get_db() + usage_acc = UsageAccumulator() + + try: + if not await _check_analysis_exists(db, analysis_id): + logger.warning(f"Analyse {analysis_id} existiert nicht") + return + + await db.execute( + "UPDATE network_analyses SET status = 'generating' WHERE id = ?", + (analysis_id,), + ) + await db.commit() + + # Incident-IDs laden + cursor = await db.execute( + "SELECT incident_id FROM network_analysis_incidents WHERE network_analysis_id = ?", + (analysis_id,), + ) + incident_ids = [row["incident_id"] for row in await cursor.fetchall()] + + if not incident_ids: + logger.warning(f"Analyse {analysis_id}: Keine Lagen verknüpft") + await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,)) + await db.commit() + await _broadcast(ws_manager, "network_error", { + "analysis_id": analysis_id, "error": "Keine Lagen verknüpft", + }) + return + + # Artikel laden + placeholders = ",".join("?" 
* len(incident_ids)) + cursor = await db.execute( + f"""SELECT id, incident_id, headline, headline_de, source, source_url, + content_original, content_de, collected_at + FROM articles WHERE incident_id IN ({placeholders})""", + incident_ids, + ) + article_rows = await cursor.fetchall() + articles = [] + article_ids = [] + article_ts = [] + for r in article_rows: + articles.append({ + "id": r["id"], "incident_id": r["incident_id"], + "headline": r["headline"], "headline_de": r["headline_de"], + "source": r["source"], "source_url": r["source_url"], + "content_original": r["content_original"], "content_de": r["content_de"], + }) + article_ids.append(r["id"]) + article_ts.append(r["collected_at"] or "") + + # Faktenchecks laden + cursor = await db.execute( + f"""SELECT id, incident_id, claim, status, evidence, checked_at + FROM fact_checks WHERE incident_id IN ({placeholders})""", + incident_ids, + ) + fc_rows = await cursor.fetchall() + factchecks = [] + factcheck_ids = [] + factcheck_ts = [] + for r in fc_rows: + factchecks.append({ + "id": r["id"], "incident_id": r["incident_id"], + "claim": r["claim"], "status": r["status"], "evidence": r["evidence"], + }) + factcheck_ids.append(r["id"]) + factcheck_ts.append(r["checked_at"] or "") + + logger.info(f"Analyse {analysis_id}: {len(articles)} Artikel, " + f"{len(factchecks)} Faktenchecks aus {len(incident_ids)} Lagen") + + # Phase 1 + if not await _check_analysis_exists(db, analysis_id): + return + + entities = await _phase1_extract_entities( + db, analysis_id, tenant_id, articles, factchecks, usage_acc, ws_manager, + ) + + # Phase 2 + if not await _check_analysis_exists(db, analysis_id): + return + + relations = await _phase2_analyze_relationships( + db, analysis_id, tenant_id, entities, articles, factchecks, usage_acc, ws_manager, + ) + + # Phase 3 + if not await _check_analysis_exists(db, analysis_id): + return + + await _phase3_finalize( + db, analysis_id, tenant_id, + entity_count=len(entities), 
relation_count=len(relations), + article_ids=article_ids, factcheck_ids=factcheck_ids, + article_ts=article_ts, factcheck_ts=factcheck_ts, + usage_acc=usage_acc, ws_manager=ws_manager, + ) + + except Exception as e: + logger.error(f"Entity-Extraktion fehlgeschlagen (Analyse {analysis_id}): {e}", exc_info=True) + try: + await db.execute("UPDATE network_analyses SET status = 'error' WHERE id = ?", (analysis_id,)) + await db.commit() + except Exception: + pass + await _broadcast(ws_manager, "network_error", { + "analysis_id": analysis_id, "error": str(e), + }) + finally: + await db.close() diff --git a/src/database.py b/src/database.py index 9944eb7..b9d7a01 100644 --- a/src/database.py +++ b/src/database.py @@ -1,603 +1,685 @@ -"""SQLite Datenbank-Setup und Zugriff.""" -import aiosqlite -import logging -import os -from config import DB_PATH, DATA_DIR - -logger = logging.getLogger("osint.database") - -SCHEMA = """ -CREATE TABLE IF NOT EXISTS organizations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - slug TEXT UNIQUE NOT NULL, - is_active INTEGER DEFAULT 1, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); - -CREATE TABLE IF NOT EXISTS licenses ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - organization_id INTEGER NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, - license_type TEXT NOT NULL DEFAULT 'trial', - max_users INTEGER NOT NULL DEFAULT 5, - valid_from TIMESTAMP NOT NULL, - valid_until TIMESTAMP, - status TEXT NOT NULL DEFAULT 'active', - notes TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); - -CREATE TABLE IF NOT EXISTS users ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - email TEXT UNIQUE NOT NULL, - username TEXT NOT NULL, - password_hash TEXT, - organization_id INTEGER NOT NULL REFERENCES organizations(id), - role TEXT NOT NULL DEFAULT 'member', - is_active INTEGER DEFAULT 1, - last_login_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); -CREATE TABLE IF 
NOT EXISTS magic_links ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - email TEXT NOT NULL, - token TEXT UNIQUE NOT NULL, - code TEXT NOT NULL, - purpose TEXT NOT NULL DEFAULT 'login', - user_id INTEGER REFERENCES users(id), - is_used INTEGER DEFAULT 0, - expires_at TIMESTAMP NOT NULL, - ip_address TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); - --- Hinweis: portal_admins wird von der Verwaltungs-App (Admin-Portal) genutzt, die dieselbe DB teilt. -CREATE TABLE IF NOT EXISTS portal_admins ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - username TEXT UNIQUE NOT NULL, - password_hash TEXT NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); - -CREATE TABLE IF NOT EXISTS incidents ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - title TEXT NOT NULL, - description TEXT, - status TEXT DEFAULT 'active', - type TEXT DEFAULT 'adhoc', - refresh_mode TEXT DEFAULT 'manual', - refresh_interval INTEGER DEFAULT 15, - retention_days INTEGER DEFAULT 0, - visibility TEXT DEFAULT 'public', - summary TEXT, - sources_json TEXT, - international_sources INTEGER DEFAULT 1, - category_labels TEXT, - tenant_id INTEGER REFERENCES organizations(id), - created_by INTEGER REFERENCES users(id), - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); -CREATE TABLE IF NOT EXISTS articles ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, - headline TEXT NOT NULL, - headline_de TEXT, - source TEXT NOT NULL, - source_url TEXT, - content_original TEXT, - content_de TEXT, - language TEXT DEFAULT 'de', - published_at TIMESTAMP, - collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - verification_status TEXT DEFAULT 'unverified', - tenant_id INTEGER REFERENCES organizations(id) -); -CREATE TABLE IF NOT EXISTS fact_checks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, - claim TEXT NOT NULL, - status TEXT DEFAULT 'developing', - 
sources_count INTEGER DEFAULT 0, - evidence TEXT, - is_notification INTEGER DEFAULT 0, - checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - tenant_id INTEGER REFERENCES organizations(id) -); - -CREATE TABLE IF NOT EXISTS refresh_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, - started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - completed_at TIMESTAMP, - articles_found INTEGER DEFAULT 0, - status TEXT DEFAULT 'running', - tenant_id INTEGER REFERENCES organizations(id) -); - -CREATE TABLE IF NOT EXISTS incident_snapshots ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, - summary TEXT, - sources_json TEXT, - article_count INTEGER DEFAULT 0, - fact_check_count INTEGER DEFAULT 0, - refresh_log_id INTEGER REFERENCES refresh_log(id), - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - tenant_id INTEGER REFERENCES organizations(id) -); - -CREATE TABLE IF NOT EXISTS sources ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - url TEXT, - domain TEXT, - source_type TEXT NOT NULL DEFAULT 'rss_feed', - category TEXT NOT NULL DEFAULT 'sonstige', - status TEXT NOT NULL DEFAULT 'active', - notes TEXT, - added_by TEXT, - article_count INTEGER DEFAULT 0, - last_seen_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - tenant_id INTEGER REFERENCES organizations(id) -); - -CREATE TABLE IF NOT EXISTS notifications ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, - incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, - type TEXT NOT NULL DEFAULT 'refresh_summary', - title TEXT NOT NULL, - text TEXT NOT NULL, - icon TEXT DEFAULT 'info', - is_read INTEGER DEFAULT 0, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - tenant_id INTEGER REFERENCES organizations(id) -); -CREATE INDEX IF NOT EXISTS idx_notifications_user_read ON notifications(user_id, is_read); - -CREATE TABLE 
IF NOT EXISTS incident_subscriptions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, - incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE CASCADE, - notify_email_summary INTEGER DEFAULT 0, - notify_email_new_articles INTEGER DEFAULT 0, - notify_email_status_change INTEGER DEFAULT 0, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - UNIQUE(user_id, incident_id) -); - -CREATE TABLE IF NOT EXISTS article_locations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE, - incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, - location_name TEXT NOT NULL, - location_name_normalized TEXT, - country_code TEXT, - latitude REAL NOT NULL, - longitude REAL NOT NULL, - confidence REAL DEFAULT 0.0, - source_text TEXT, - tenant_id INTEGER REFERENCES organizations(id) -); -CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id); -CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id); - - -CREATE TABLE IF NOT EXISTS source_health_checks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, - check_type TEXT NOT NULL, - status TEXT NOT NULL, - message TEXT, - details TEXT, - checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); -CREATE INDEX IF NOT EXISTS idx_source_health_source ON source_health_checks(source_id); - -CREATE TABLE IF NOT EXISTS source_suggestions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - suggestion_type TEXT NOT NULL, - title TEXT NOT NULL, - description TEXT, - source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, - suggested_data TEXT, - priority TEXT DEFAULT 'medium', - status TEXT DEFAULT 'pending', - reviewed_at TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); -CREATE TABLE IF NOT EXISTS user_excluded_domains ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL 
async def get_db() -> aiosqlite.Connection:
    """Create and return a new database connection.

    Every caller gets its own connection. WAL journal mode plus a 5 s busy
    timeout lets concurrent readers proceed while writers briefly retry on
    lock contention; foreign-key enforcement must be switched on per
    connection because SQLite disables it by default.
    """
    os.makedirs(DATA_DIR, exist_ok=True)  # ensure data dir exists before connecting
    db = await aiosqlite.connect(DB_PATH)
    db.row_factory = aiosqlite.Row  # rows addressable by column name
    await db.execute("PRAGMA journal_mode=WAL")
    await db.execute("PRAGMA foreign_keys=ON")
    await db.execute("PRAGMA busy_timeout=5000")
    return db


async def init_db():
    """Initialize the database with the schema and run in-place migrations.

    First applies SCHEMA (idempotent CREATE ... IF NOT EXISTS statements),
    then inspects existing tables via PRAGMA table_info and adds any
    columns, tables, and indexes that older databases are missing.
    NOTE: step order matters — indexes are created only after the columns
    they cover have been migrated in.
    """
    db = await get_db()
    try:
        await db.executescript(SCHEMA)
        await db.commit()

        # --- Migrations for existing databases ---

        # Check incidents columns
        cursor = await db.execute("PRAGMA table_info(incidents)")
        columns = [row[1] for row in await cursor.fetchall()]

        if "type" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN type TEXT DEFAULT 'adhoc'")
            await db.commit()

        if "sources_json" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN sources_json TEXT")
            await db.commit()

        if "international_sources" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN international_sources INTEGER DEFAULT 1")
            await db.commit()

        if "visibility" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN visibility TEXT DEFAULT 'public'")
            await db.commit()

        if "include_telegram" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN include_telegram INTEGER DEFAULT 0")
            await db.commit()
            logger.info("Migration: include_telegram zu incidents hinzugefuegt")

        if "telegram_categories" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN telegram_categories TEXT DEFAULT NULL")
            await db.commit()
            logger.info("Migration: telegram_categories zu incidents hinzugefuegt")

        if "category_labels" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN category_labels TEXT")
            await db.commit()
            logger.info("Migration: category_labels zu incidents hinzugefuegt")

        if "tenant_id" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()
            logger.info("Migration: tenant_id zu incidents hinzugefuegt")

        # Migration: token accounting columns for refresh_log
        cursor = await db.execute("PRAGMA table_info(refresh_log)")
        rl_columns = [row[1] for row in await cursor.fetchall()]
        if "input_tokens" not in rl_columns:
            # These six columns shipped together, so probing one suffices.
            await db.execute("ALTER TABLE refresh_log ADD COLUMN input_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN output_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN cache_creation_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN cache_read_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN total_cost_usd REAL DEFAULT 0.0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN api_calls INTEGER DEFAULT 0")
            await db.commit()

        if "trigger_type" not in rl_columns:
            await db.execute("ALTER TABLE refresh_log ADD COLUMN trigger_type TEXT DEFAULT 'manual'")
            await db.commit()

        if "retry_count" not in rl_columns:
            await db.execute("ALTER TABLE refresh_log ADD COLUMN retry_count INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN error_message TEXT")
            await db.commit()

        if "tenant_id" not in rl_columns:
            await db.execute("ALTER TABLE refresh_log ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: notifications table (for existing DBs)
        cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='notifications'")
        if not await cursor.fetchone():
            await db.executescript("""
                CREATE TABLE IF NOT EXISTS notifications (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
                    type TEXT NOT NULL DEFAULT 'refresh_summary',
                    title TEXT NOT NULL,
                    text TEXT NOT NULL,
                    icon TEXT DEFAULT 'info',
                    is_read INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    tenant_id INTEGER REFERENCES organizations(id)
                );
                CREATE INDEX IF NOT EXISTS idx_notifications_user_read ON notifications(user_id, is_read);
            """)
            await db.commit()

        # Migration: incident_subscriptions table (for existing DBs)
        cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='incident_subscriptions'")
        if not await cursor.fetchone():
            await db.executescript("""
                CREATE TABLE IF NOT EXISTS incident_subscriptions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                    incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE CASCADE,
                    notify_email_summary INTEGER DEFAULT 0,
                    notify_email_new_articles INTEGER DEFAULT 0,
                    notify_email_status_change INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(user_id, incident_id)
                );
            """)
            await db.commit()
            logger.info("Migration: incident_subscriptions-Tabelle erstellt")
        else:
            # Migration: rename column contradiction -> new_articles
            cursor = await db.execute("PRAGMA table_info(incident_subscriptions)")
            sub_columns = [row[1] for row in await cursor.fetchall()]
            if "notify_email_contradiction" in sub_columns:
                await db.execute("ALTER TABLE incident_subscriptions RENAME COLUMN notify_email_contradiction TO notify_email_new_articles")
                await db.commit()
                logger.info("Migration: notify_email_contradiction -> notify_email_new_articles umbenannt")

        # Migration: role column for users
        cursor = await db.execute("PRAGMA table_info(users)")
        user_columns = [row[1] for row in await cursor.fetchall()]
        if "role" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN role TEXT DEFAULT 'member'")
            # Pre-role installs were single-tenant; promote all existing users.
            await db.execute("UPDATE users SET role = 'org_admin'")
            await db.commit()
            logger.info("Migration: role-Spalte zu users hinzugefuegt")

        # Migration: email, organization_id, is_active, last_login_at for users
        if "email" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN email TEXT")
            await db.commit()
            logger.info("Migration: email zu users hinzugefuegt")

        if "organization_id" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN organization_id INTEGER REFERENCES organizations(id)")
            await db.commit()
            logger.info("Migration: organization_id zu users hinzugefuegt")

        # Create the index only after the column migration above
        try:
            await db.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_org_username ON users(organization_id, username)")
            await db.commit()
        except Exception:
            pass  # index already exists or the column is still missing

        if "is_active" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN is_active INTEGER DEFAULT 1")
            await db.commit()

        if "last_login_at" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN last_login_at TIMESTAMP")
            await db.commit()

        # Migration: tenant_id for articles
        cursor = await db.execute("PRAGMA table_info(articles)")
        art_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in art_columns:
            await db.execute("ALTER TABLE articles ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: tenant_id for fact_checks
        cursor = await db.execute("PRAGMA table_info(fact_checks)")
        fc_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in fc_columns:
            await db.execute("ALTER TABLE fact_checks ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: status_history for fact_checks (fact-check history)
        if "status_history" not in fc_columns:
            await db.execute("ALTER TABLE fact_checks ADD COLUMN status_history TEXT DEFAULT '[]'")
            # Seed existing rows with a one-entry history from their current status.
            cursor2 = await db.execute("SELECT id, status, checked_at FROM fact_checks")
            for row2 in await cursor2.fetchall():
                # NOTE(review): import inside the loop — harmless (cached after
                # the first iteration) but could be hoisted above the loop.
                import json as _json
                initial_history = _json.dumps([{"status": row2[1], "at": str(row2[2])}])
                await db.execute("UPDATE fact_checks SET status_history = ? WHERE id = ?", (initial_history, row2[0]))
            await db.commit()
            logger.info("Migration: status_history zu fact_checks hinzugefuegt")

        # Migration: category for article_locations (marker classification)
        cursor = await db.execute("PRAGMA table_info(article_locations)")
        al_columns = [row[1] for row in await cursor.fetchall()]
        if "category" not in al_columns:
            await db.execute("ALTER TABLE article_locations ADD COLUMN category TEXT DEFAULT 'mentioned'")
            await db.commit()
            logger.info("Migration: category zu article_locations hinzugefuegt")

        # Migration: rename old category values to the new keys
        try:
            await db.execute(
                "UPDATE article_locations SET category = 'primary' WHERE category = 'target'"
            )
            await db.execute(
                "UPDATE article_locations SET category = 'secondary' WHERE category IN ('response', 'retaliation')"
            )
            await db.execute(
                "UPDATE article_locations SET category = 'tertiary' WHERE category IN ('actor', 'context')"
            )
            changed = db.total_changes
            await db.commit()
            if changed > 0:
                logger.info("Migration: article_locations Kategorien umbenannt (target->primary, response/retaliation->secondary, actor->tertiary)")
        except Exception:
            pass  # already migrated or no data

        # Migration: tenant_id for incident_snapshots
        cursor = await db.execute("PRAGMA table_info(incident_snapshots)")
        snap_columns2 = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in snap_columns2:
            await db.execute("ALTER TABLE incident_snapshots ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: tenant_id for sources
        cursor = await db.execute("PRAGMA table_info(sources)")
        src_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in src_columns:
            await db.execute("ALTER TABLE sources ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: tenant_id for notifications
        cursor = await db.execute("PRAGMA table_info(notifications)")
        notif_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in notif_columns:
            await db.execute("ALTER TABLE notifications ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Create indexes (after the column migrations)
        for idx_sql in [
            "CREATE INDEX IF NOT EXISTS idx_incidents_tenant_status ON incidents(tenant_id, status)",
            "CREATE INDEX IF NOT EXISTS idx_articles_tenant_incident ON articles(tenant_id, incident_id)",
        ]:
            try:
                await db.execute(idx_sql)
                await db.commit()
            except Exception:
                pass

        # Migration: article_locations table (for existing DBs)
        cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='article_locations'")
        if not await cursor.fetchone():
            await db.executescript("""
                CREATE TABLE IF NOT EXISTS article_locations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE,
                    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
                    location_name TEXT NOT NULL,
                    location_name_normalized TEXT,
                    country_code TEXT,
                    latitude REAL NOT NULL,
                    longitude REAL NOT NULL,
                    confidence REAL DEFAULT 0.0,
                    source_text TEXT,
                    tenant_id INTEGER REFERENCES organizations(id)
                );
                CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id);
                CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id);
            """)
            await db.commit()
            logger.info("Migration: article_locations-Tabelle erstellt")

        # Mark orphaned 'running' entries as errors at startup (older than 15 min)
        await db.execute(
            """UPDATE refresh_log SET status = 'error', error_message = 'Verwaist beim Neustart',
               completed_at = CURRENT_TIMESTAMP
               WHERE status = 'running'
               AND started_at < datetime('now', '-15 minutes')"""
        )
        await db.commit()

        # Seed the sources table (only when empty)
        cursor = await db.execute("SELECT COUNT(*) as cnt FROM sources")
        row = await cursor.fetchone()
        if row["cnt"] == 0:
            await _seed_sources(db)

    finally:
        await db.close()


async def _seed_sources(db: aiosqlite.Connection):
    """Populate the sources table from the config.py configuration.

    Inserts every RSS feed from RSS_FEEDS plus the EXCLUDED_SOURCES list,
    then recomputes the per-source article counters.
    """
    from config import RSS_FEEDS, EXCLUDED_SOURCES

    # Maps known feed names to an editorial category; anything else
    # falls back to 'sonstige' below.
    category_map = {
        "tagesschau": "oeffentlich-rechtlich",
        "ZDF heute": "oeffentlich-rechtlich",
        "Deutsche Welle": "oeffentlich-rechtlich",
        "Spiegel": "qualitaetszeitung",
        "Zeit": "qualitaetszeitung",
        "FAZ": "qualitaetszeitung",
        "Süddeutsche": "qualitaetszeitung",
        "NZZ": "qualitaetszeitung",
        "Reuters": "nachrichtenagentur",
        "AP News": "nachrichtenagentur",
        "BBC World": "international",
        "Al Jazeera": "international",
        "France24": "international",
        "BMI": "behoerde",
        "Europol": "behoerde",
    }

    for _rss_category, feeds in RSS_FEEDS.items():
        for feed in feeds:
            name = feed["name"]
            url = feed["url"]
            try:
                from urllib.parse import urlparse
                domain = urlparse(url).netloc.lower().replace("www.", "")
            except Exception:
                domain = ""  # best effort; domain stays empty on parse failure

            category = category_map.get(name, "sonstige")
            await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                   VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""",
                (name, url, domain, category),
            )

    for excl in EXCLUDED_SOURCES:
        await db.execute(
            """INSERT INTO sources (name, domain, source_type, category, status, added_by, tenant_id)
               VALUES (?, ?, 'excluded', 'sonstige', 'active', 'system', NULL)""",
            (excl, excl),
        )

    await db.commit()
    await refresh_source_counts(db)

    logger.info(f"Sources-Tabelle geseeded: {len(RSS_FEEDS.get('deutsch', []))+len(RSS_FEEDS.get('international', []))+len(RSS_FEEDS.get('behoerden', []))} RSS-Feeds, {len(EXCLUDED_SOURCES)} ausgeschlossene Quellen")


async def refresh_source_counts(db: aiosqlite.Connection):
    """Recompute article counters and last_seen_at for all sources.

    Matches articles either by exact source name or — when a domain is
    known — by a LIKE match on the article URL.
    """
    cursor = await db.execute("SELECT id, name, domain FROM sources WHERE source_type != 'excluded'")
    sources = await cursor.fetchall()

    for source in sources:
        sid = source["id"]
        name = source["name"]
        domain = source["domain"] or ""

        if domain:
            cursor = await db.execute(
                """SELECT COUNT(*) as cnt, MAX(collected_at) as last_seen
                   FROM articles WHERE source = ? OR source_url LIKE ?""",
                (name, f"%{domain}%"),
            )
        else:
            cursor = await db.execute(
                "SELECT COUNT(*) as cnt, MAX(collected_at) as last_seen FROM articles WHERE source = ?",
                (name,),
            )
        row = await cursor.fetchone()
        await db.execute(
            "UPDATE sources SET article_count = ?, last_seen_at = ? WHERE id = ?",
            (row["cnt"], row["last_seen"], sid),
        )

    await db.commit()
async def db_dependency():
    """FastAPI dependency that yields a database connection.

    Opens a fresh connection per request and guarantees it is closed
    when the request scope ends.
    """
    db = await get_db()
    try:
        yield db
    finally:
        await db.close()


"""SQLite database setup and access."""
import aiosqlite
import logging
import os
from config import DB_PATH, DATA_DIR

logger = logging.getLogger("osint.database")

# Full DDL for a fresh database. Every statement is idempotent
# (CREATE ... IF NOT EXISTS), so executescript(SCHEMA) is safe to run on
# each startup; init_db() performs the column-level migrations separately.
SCHEMA = """
CREATE TABLE IF NOT EXISTS organizations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    slug TEXT UNIQUE NOT NULL,
    is_active INTEGER DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS licenses (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    organization_id INTEGER NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    license_type TEXT NOT NULL DEFAULT 'trial',
    max_users INTEGER NOT NULL DEFAULT 5,
    valid_from TIMESTAMP NOT NULL,
    valid_until TIMESTAMP,
    status TEXT NOT NULL DEFAULT 'active',
    notes TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    email TEXT UNIQUE NOT NULL,
    username TEXT NOT NULL,
    password_hash TEXT,
    organization_id INTEGER NOT NULL REFERENCES organizations(id),
    role TEXT NOT NULL DEFAULT 'member',
    is_active INTEGER DEFAULT 1,
    last_login_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS magic_links (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    email TEXT NOT NULL,
    token TEXT UNIQUE NOT NULL,
    code TEXT NOT NULL,
    purpose TEXT NOT NULL DEFAULT 'login',
    user_id INTEGER REFERENCES users(id),
    is_used INTEGER DEFAULT 0,
    expires_at TIMESTAMP NOT NULL,
    ip_address TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Hinweis: portal_admins wird von der Verwaltungs-App (Admin-Portal) genutzt, die dieselbe DB teilt.
CREATE TABLE IF NOT EXISTS portal_admins (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    password_hash TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS incidents (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    description TEXT,
    status TEXT DEFAULT 'active',
    type TEXT DEFAULT 'adhoc',
    refresh_mode TEXT DEFAULT 'manual',
    refresh_interval INTEGER DEFAULT 15,
    retention_days INTEGER DEFAULT 0,
    visibility TEXT DEFAULT 'public',
    summary TEXT,
    sources_json TEXT,
    international_sources INTEGER DEFAULT 1,
    category_labels TEXT,
    tenant_id INTEGER REFERENCES organizations(id),
    created_by INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS articles (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
    headline TEXT NOT NULL,
    headline_de TEXT,
    source TEXT NOT NULL,
    source_url TEXT,
    content_original TEXT,
    content_de TEXT,
    language TEXT DEFAULT 'de',
    published_at TIMESTAMP,
    collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    verification_status TEXT DEFAULT 'unverified',
    tenant_id INTEGER REFERENCES organizations(id)
);
CREATE TABLE IF NOT EXISTS fact_checks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
    claim TEXT NOT NULL,
    status TEXT DEFAULT 'developing',
    sources_count INTEGER DEFAULT 0,
    evidence TEXT,
    is_notification INTEGER DEFAULT 0,
    checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    tenant_id INTEGER REFERENCES organizations(id)
);

CREATE TABLE IF NOT EXISTS refresh_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    articles_found INTEGER DEFAULT 0,
    status TEXT DEFAULT 'running',
    tenant_id INTEGER REFERENCES organizations(id)
);

CREATE TABLE IF NOT EXISTS incident_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
    summary TEXT,
    sources_json TEXT,
    article_count INTEGER DEFAULT 0,
    fact_check_count INTEGER DEFAULT 0,
    refresh_log_id INTEGER REFERENCES refresh_log(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    tenant_id INTEGER REFERENCES organizations(id)
);

CREATE TABLE IF NOT EXISTS sources (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    url TEXT,
    domain TEXT,
    source_type TEXT NOT NULL DEFAULT 'rss_feed',
    category TEXT NOT NULL DEFAULT 'sonstige',
    status TEXT NOT NULL DEFAULT 'active',
    notes TEXT,
    added_by TEXT,
    article_count INTEGER DEFAULT 0,
    last_seen_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    tenant_id INTEGER REFERENCES organizations(id)
);

CREATE TABLE IF NOT EXISTS notifications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
    type TEXT NOT NULL DEFAULT 'refresh_summary',
    title TEXT NOT NULL,
    text TEXT NOT NULL,
    icon TEXT DEFAULT 'info',
    is_read INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    tenant_id INTEGER REFERENCES organizations(id)
);
CREATE INDEX IF NOT EXISTS idx_notifications_user_read ON notifications(user_id, is_read);

CREATE TABLE IF NOT EXISTS incident_subscriptions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE CASCADE,
    notify_email_summary INTEGER DEFAULT 0,
    notify_email_new_articles INTEGER DEFAULT 0,
    notify_email_status_change INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(user_id, incident_id)
);

CREATE TABLE IF NOT EXISTS article_locations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE,
    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
    location_name TEXT NOT NULL,
    location_name_normalized TEXT,
    country_code TEXT,
    latitude REAL NOT NULL,
    longitude REAL NOT NULL,
    confidence REAL DEFAULT 0.0,
    source_text TEXT,
    tenant_id INTEGER REFERENCES organizations(id)
);
CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id);
CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id);


CREATE TABLE IF NOT EXISTS source_health_checks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
    check_type TEXT NOT NULL,
    status TEXT NOT NULL,
    message TEXT,
    details TEXT,
    checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_source_health_source ON source_health_checks(source_id);

CREATE TABLE IF NOT EXISTS source_suggestions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    suggestion_type TEXT NOT NULL,
    title TEXT NOT NULL,
    description TEXT,
    source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
    suggested_data TEXT,
    priority TEXT DEFAULT 'medium',
    status TEXT DEFAULT 'pending',
    reviewed_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_excluded_domains (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    domain TEXT NOT NULL,
    notes TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(user_id, domain)
);

CREATE TABLE IF NOT EXISTS network_analyses (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    entity_count INTEGER DEFAULT 0,
    relation_count INTEGER DEFAULT 0,
    data_hash TEXT,
    last_generated_at TIMESTAMP,
    tenant_id INTEGER REFERENCES organizations(id),
    created_by INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS network_analysis_incidents (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
    incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE CASCADE,
    UNIQUE(network_analysis_id, incident_id)
);
CREATE INDEX IF NOT EXISTS idx_network_analysis_incidents_analysis ON network_analysis_incidents(network_analysis_id);

CREATE TABLE IF NOT EXISTS network_entities (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    name_normalized TEXT NOT NULL,
    entity_type TEXT NOT NULL,
    description TEXT DEFAULT '',
    aliases TEXT DEFAULT '[]',
    metadata TEXT DEFAULT '{}',
    mention_count INTEGER DEFAULT 0,
    corrected_by_opus INTEGER DEFAULT 0,
    tenant_id INTEGER REFERENCES organizations(id),
    UNIQUE(network_analysis_id, name_normalized, entity_type)
);
CREATE INDEX IF NOT EXISTS idx_network_entities_analysis ON network_entities(network_analysis_id);

CREATE TABLE IF NOT EXISTS network_entity_mentions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    entity_id INTEGER NOT NULL REFERENCES network_entities(id) ON DELETE CASCADE,
    article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE,
    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
    source_text TEXT,
    tenant_id INTEGER REFERENCES organizations(id)
);
CREATE INDEX IF NOT EXISTS idx_network_entity_mentions_entity ON network_entity_mentions(entity_id);

CREATE TABLE IF NOT EXISTS network_relations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
    source_entity_id INTEGER NOT NULL REFERENCES network_entities(id) ON DELETE CASCADE,
    target_entity_id INTEGER NOT NULL REFERENCES network_entities(id) ON DELETE CASCADE,
    category TEXT NOT NULL,
    label TEXT NOT NULL,
    description TEXT DEFAULT '',
    weight INTEGER DEFAULT 1,
    status TEXT DEFAULT '',
    evidence TEXT DEFAULT '[]',
    tenant_id INTEGER REFERENCES organizations(id)
);
CREATE INDEX IF NOT EXISTS idx_network_relations_analysis ON network_relations(network_analysis_id);
CREATE INDEX IF NOT EXISTS idx_network_relations_source ON network_relations(source_entity_id);
CREATE INDEX IF NOT EXISTS idx_network_relations_target ON network_relations(target_entity_id);

CREATE TABLE IF NOT EXISTS network_generation_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    status TEXT DEFAULT 'running',
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cache_creation_tokens INTEGER DEFAULT 0,
    cache_read_tokens INTEGER DEFAULT 0,
    total_cost_usd REAL DEFAULT 0.0,
    api_calls INTEGER DEFAULT 0,
    entity_count INTEGER DEFAULT 0,
    relation_count INTEGER DEFAULT 0,
    error_message TEXT,
    tenant_id INTEGER REFERENCES organizations(id)
);
"""


async def get_db() -> aiosqlite.Connection:
    """Create and return a new database connection.

    WAL journal mode plus a 5 s busy timeout allows concurrent readers
    while writers retry briefly; foreign-key enforcement is enabled per
    connection because SQLite leaves it off by default.
    """
    os.makedirs(DATA_DIR, exist_ok=True)  # ensure data dir exists before connecting
    db = await aiosqlite.connect(DB_PATH)
    db.row_factory = aiosqlite.Row  # rows addressable by column name
    await db.execute("PRAGMA journal_mode=WAL")
    await db.execute("PRAGMA foreign_keys=ON")
    await db.execute("PRAGMA busy_timeout=5000")
    return db
incidents ADD COLUMN type TEXT DEFAULT 'adhoc'") + await db.commit() + + if "sources_json" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN sources_json TEXT") + await db.commit() + + if "international_sources" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN international_sources INTEGER DEFAULT 1") + await db.commit() + + if "visibility" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN visibility TEXT DEFAULT 'public'") + await db.commit() + + if "include_telegram" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN include_telegram INTEGER DEFAULT 0") + await db.commit() + logger.info("Migration: include_telegram zu incidents hinzugefuegt") + + if "telegram_categories" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN telegram_categories TEXT DEFAULT NULL") + await db.commit() + logger.info("Migration: telegram_categories zu incidents hinzugefuegt") + + if "category_labels" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN category_labels TEXT") + await db.commit() + logger.info("Migration: category_labels zu incidents hinzugefuegt") + + if "tenant_id" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") + await db.commit() + logger.info("Migration: tenant_id zu incidents hinzugefuegt") + + # Migration: Token-Spalten fuer refresh_log + cursor = await db.execute("PRAGMA table_info(refresh_log)") + rl_columns = [row[1] for row in await cursor.fetchall()] + if "input_tokens" not in rl_columns: + await db.execute("ALTER TABLE refresh_log ADD COLUMN input_tokens INTEGER DEFAULT 0") + await db.execute("ALTER TABLE refresh_log ADD COLUMN output_tokens INTEGER DEFAULT 0") + await db.execute("ALTER TABLE refresh_log ADD COLUMN cache_creation_tokens INTEGER DEFAULT 0") + await db.execute("ALTER TABLE refresh_log ADD COLUMN cache_read_tokens INTEGER DEFAULT 0") + await db.execute("ALTER TABLE 
refresh_log ADD COLUMN total_cost_usd REAL DEFAULT 0.0") + await db.execute("ALTER TABLE refresh_log ADD COLUMN api_calls INTEGER DEFAULT 0") + await db.commit() + + if "trigger_type" not in rl_columns: + await db.execute("ALTER TABLE refresh_log ADD COLUMN trigger_type TEXT DEFAULT 'manual'") + await db.commit() + + if "retry_count" not in rl_columns: + await db.execute("ALTER TABLE refresh_log ADD COLUMN retry_count INTEGER DEFAULT 0") + await db.execute("ALTER TABLE refresh_log ADD COLUMN error_message TEXT") + await db.commit() + + if "tenant_id" not in rl_columns: + await db.execute("ALTER TABLE refresh_log ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") + await db.commit() + + # Migration: notifications-Tabelle (fuer bestehende DBs) + cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='notifications'") + if not await cursor.fetchone(): + await db.executescript(""" + CREATE TABLE IF NOT EXISTS notifications ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, + type TEXT NOT NULL DEFAULT 'refresh_summary', + title TEXT NOT NULL, + text TEXT NOT NULL, + icon TEXT DEFAULT 'info', + is_read INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + tenant_id INTEGER REFERENCES organizations(id) + ); + CREATE INDEX IF NOT EXISTS idx_notifications_user_read ON notifications(user_id, is_read); + """) + await db.commit() + + # Migration: incident_subscriptions-Tabelle (fuer bestehende DBs) + cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='incident_subscriptions'") + if not await cursor.fetchone(): + await db.executescript(""" + CREATE TABLE IF NOT EXISTS incident_subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE 
CASCADE, + notify_email_summary INTEGER DEFAULT 0, + notify_email_new_articles INTEGER DEFAULT 0, + notify_email_status_change INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(user_id, incident_id) + ); + """) + await db.commit() + logger.info("Migration: incident_subscriptions-Tabelle erstellt") + else: + # Migration: Spalte umbenennen contradiction -> new_articles + cursor = await db.execute("PRAGMA table_info(incident_subscriptions)") + sub_columns = [row[1] for row in await cursor.fetchall()] + if "notify_email_contradiction" in sub_columns: + await db.execute("ALTER TABLE incident_subscriptions RENAME COLUMN notify_email_contradiction TO notify_email_new_articles") + await db.commit() + logger.info("Migration: notify_email_contradiction -> notify_email_new_articles umbenannt") + + # Migration: role-Spalte fuer users + cursor = await db.execute("PRAGMA table_info(users)") + user_columns = [row[1] for row in await cursor.fetchall()] + if "role" not in user_columns: + await db.execute("ALTER TABLE users ADD COLUMN role TEXT DEFAULT 'member'") + await db.execute("UPDATE users SET role = 'org_admin'") + await db.commit() + logger.info("Migration: role-Spalte zu users hinzugefuegt") + + # Migration: email, organization_id, is_active, last_login_at fuer users + if "email" not in user_columns: + await db.execute("ALTER TABLE users ADD COLUMN email TEXT") + await db.commit() + logger.info("Migration: email zu users hinzugefuegt") + + if "organization_id" not in user_columns: + await db.execute("ALTER TABLE users ADD COLUMN organization_id INTEGER REFERENCES organizations(id)") + await db.commit() + logger.info("Migration: organization_id zu users hinzugefuegt") + + # Index erst nach Spalten-Migration erstellen + try: + await db.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_org_username ON users(organization_id, username)") + await db.commit() + except Exception: + pass # Index existiert bereits oder Spalte fehlt noch + + if 
"is_active" not in user_columns: + await db.execute("ALTER TABLE users ADD COLUMN is_active INTEGER DEFAULT 1") + await db.commit() + + if "last_login_at" not in user_columns: + await db.execute("ALTER TABLE users ADD COLUMN last_login_at TIMESTAMP") + await db.commit() + + # Migration: tenant_id fuer articles + cursor = await db.execute("PRAGMA table_info(articles)") + art_columns = [row[1] for row in await cursor.fetchall()] + if "tenant_id" not in art_columns: + await db.execute("ALTER TABLE articles ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") + await db.commit() + + # Migration: tenant_id fuer fact_checks + cursor = await db.execute("PRAGMA table_info(fact_checks)") + fc_columns = [row[1] for row in await cursor.fetchall()] + if "tenant_id" not in fc_columns: + await db.execute("ALTER TABLE fact_checks ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") + await db.commit() + + # Migration: status_history fuer fact_checks (Faktencheck-Verlauf) + if "status_history" not in fc_columns: + await db.execute("ALTER TABLE fact_checks ADD COLUMN status_history TEXT DEFAULT '[]'") + # Bestehende Eintraege initialisieren + cursor2 = await db.execute("SELECT id, status, checked_at FROM fact_checks") + for row2 in await cursor2.fetchall(): + import json as _json + initial_history = _json.dumps([{"status": row2[1], "at": str(row2[2])}]) + await db.execute("UPDATE fact_checks SET status_history = ? 
WHERE id = ?", (initial_history, row2[0])) + await db.commit() + logger.info("Migration: status_history zu fact_checks hinzugefuegt") + + # Migration: category fuer article_locations (Marker-Klassifizierung) + cursor = await db.execute("PRAGMA table_info(article_locations)") + al_columns = [row[1] for row in await cursor.fetchall()] + if "category" not in al_columns: + await db.execute("ALTER TABLE article_locations ADD COLUMN category TEXT DEFAULT 'mentioned'") + await db.commit() + logger.info("Migration: category zu article_locations hinzugefuegt") + + # Migration: Alte Kategorie-Werte auf neue Keys umbenennen + try: + await db.execute( + "UPDATE article_locations SET category = 'primary' WHERE category = 'target'" + ) + await db.execute( + "UPDATE article_locations SET category = 'secondary' WHERE category IN ('response', 'retaliation')" + ) + await db.execute( + "UPDATE article_locations SET category = 'tertiary' WHERE category IN ('actor', 'context')" + ) + changed = db.total_changes + await db.commit() + if changed > 0: + logger.info("Migration: article_locations Kategorien umbenannt (target->primary, response/retaliation->secondary, actor->tertiary)") + except Exception: + pass # Bereits migriert oder keine Daten + + # Migration: tenant_id fuer incident_snapshots + cursor = await db.execute("PRAGMA table_info(incident_snapshots)") + snap_columns2 = [row[1] for row in await cursor.fetchall()] + if "tenant_id" not in snap_columns2: + await db.execute("ALTER TABLE incident_snapshots ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") + await db.commit() + + # Migration: tenant_id fuer sources + cursor = await db.execute("PRAGMA table_info(sources)") + src_columns = [row[1] for row in await cursor.fetchall()] + if "tenant_id" not in src_columns: + await db.execute("ALTER TABLE sources ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") + await db.commit() + + # Migration: tenant_id fuer notifications + cursor = await db.execute("PRAGMA 
table_info(notifications)") + notif_columns = [row[1] for row in await cursor.fetchall()] + if "tenant_id" not in notif_columns: + await db.execute("ALTER TABLE notifications ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") + await db.commit() + + # Indexes erstellen (nach Spalten-Migrationen) + for idx_sql in [ + "CREATE INDEX IF NOT EXISTS idx_incidents_tenant_status ON incidents(tenant_id, status)", + "CREATE INDEX IF NOT EXISTS idx_articles_tenant_incident ON articles(tenant_id, incident_id)", + ]: + try: + await db.execute(idx_sql) + await db.commit() + except Exception: + pass + + # Migration: article_locations-Tabelle (fuer bestehende DBs) + cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='article_locations'") + if not await cursor.fetchone(): + await db.executescript(""" + CREATE TABLE IF NOT EXISTS article_locations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE, + incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, + location_name TEXT NOT NULL, + location_name_normalized TEXT, + country_code TEXT, + latitude REAL NOT NULL, + longitude REAL NOT NULL, + confidence REAL DEFAULT 0.0, + source_text TEXT, + tenant_id INTEGER REFERENCES organizations(id) + ); + CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id); + CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id); + """) + await db.commit() + logger.info("Migration: article_locations-Tabelle erstellt") + + # Verwaiste running-Eintraege beim Start als error markieren (aelter als 15 Min) + await db.execute( + """UPDATE refresh_log SET status = 'error', error_message = 'Verwaist beim Neustart', + completed_at = CURRENT_TIMESTAMP + WHERE status = 'running' + AND started_at < datetime('now', '-15 minutes')""" + ) + await db.commit() + + # Sources-Tabelle seeden (nur wenn leer) + cursor = await db.execute("SELECT 
async def _seed_sources(db: aiosqlite.Connection):
    """Seed the sources table from the config.py feed configuration.

    Inserts one row per RSS feed in RSS_FEEDS and one 'excluded' row per
    entry in EXCLUDED_SOURCES, then refreshes the per-source article
    counters.  Only invoked when the sources table is empty.
    """
    # Local imports: config and urlparse are only needed during seeding.
    from urllib.parse import urlparse

    from config import RSS_FEEDS, EXCLUDED_SOURCES

    # Editorial category per known feed name; anything not listed here
    # falls back to 'sonstige' below.
    category_map = {
        "tagesschau": "oeffentlich-rechtlich",
        "ZDF heute": "oeffentlich-rechtlich",
        "Deutsche Welle": "oeffentlich-rechtlich",
        "Spiegel": "qualitaetszeitung",
        "Zeit": "qualitaetszeitung",
        "FAZ": "qualitaetszeitung",
        "Süddeutsche": "qualitaetszeitung",
        "NZZ": "qualitaetszeitung",
        "Reuters": "nachrichtenagentur",
        "AP News": "nachrichtenagentur",
        "BBC World": "international",
        "Al Jazeera": "international",
        "France24": "international",
        "BMI": "behoerde",
        "Europol": "behoerde",
    }

    feed_count = 0
    for _rss_category, feeds in RSS_FEEDS.items():
        for feed in feeds:
            name = feed["name"]
            url = feed["url"]
            try:
                domain = urlparse(url).netloc.lower().replace("www.", "")
            except Exception:
                domain = ""  # malformed URL: store the feed without a domain

            category = category_map.get(name, "sonstige")
            await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                   VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""",
                (name, url, domain, category),
            )
            feed_count += 1

    # Excluded sources are stored with source_type='excluded'; name doubles
    # as the domain because config only provides a domain string.
    for excl in EXCLUDED_SOURCES:
        await db.execute(
            """INSERT INTO sources (name, domain, source_type, category, status, added_by, tenant_id)
               VALUES (?, ?, 'excluded', 'sonstige', 'active', 'system', NULL)""",
            (excl, excl),
        )

    await db.commit()
    await refresh_source_counts(db)

    # Count what was actually inserted instead of hard-coding the
    # RSS_FEEDS category keys — the old message under-reported feeds
    # living under any other key.
    logger.info(f"Sources-Tabelle geseeded: {feed_count} RSS-Feeds, {len(EXCLUDED_SOURCES)} ausgeschlossene Quellen")


async def refresh_source_counts(db: aiosqlite.Connection):
    """Recompute article_count and last_seen_at for all non-excluded sources.

    A source's articles are matched by exact source name and — when the
    source has a known domain — additionally by a LIKE match on the
    article URL.
    """
    cursor = await db.execute("SELECT id, name, domain FROM sources WHERE source_type != 'excluded'")
    sources = await cursor.fetchall()

    for source in sources:
        sid = source["id"]
        name = source["name"]
        domain = source["domain"] or ""

        if domain:
            cursor = await db.execute(
                """SELECT COUNT(*) as cnt, MAX(collected_at) as last_seen
                   FROM articles WHERE source = ? OR source_url LIKE ?""",
                (name, f"%{domain}%"),
            )
        else:
            cursor = await db.execute(
                "SELECT COUNT(*) as cnt, MAX(collected_at) as last_seen FROM articles WHERE source = ?",
                (name,),
            )
        row = await cursor.fetchone()
        await db.execute(
            "UPDATE sources SET article_count = ?, last_seen_at = ? WHERE id = ?",
            (row["cnt"], row["last_seen"], sid),
        )

    await db.commit()


async def db_dependency():
    """FastAPI dependency yielding a DB connection, closed after the request."""
    db = await get_db()
    try:
        yield db
    finally:
        await db.close()
class NetworkAnalysisCreate(BaseModel):
    """Request body for creating a network analysis."""
    name: str = Field(min_length=1, max_length=200)
    incident_ids: list[int] = Field(min_length=1)  # at least one incident required


class NetworkAnalysisUpdate(BaseModel):
    """Partial update; only the provided fields are changed."""
    name: Optional[str] = Field(default=None, max_length=200)
    incident_ids: Optional[list[int]] = None


class NetworkEntityResponse(BaseModel):
    """One extracted entity (node) of the network graph."""
    id: int
    name: str
    name_normalized: str
    entity_type: str
    description: str = ""
    aliases: list[str] = []
    mention_count: int = 0
    corrected_by_opus: bool = False  # True when the review pass corrected this entity
    metadata: dict = {}


class NetworkRelationResponse(BaseModel):
    """One relation (edge) between two entities of the graph."""
    id: int
    source_entity_id: int
    target_entity_id: int
    category: str
    label: str
    description: str = ""
    weight: int = 1
    status: str = ""
    evidence: list[str] = []  # supporting quotes/snippets from the sources


class NetworkAnalysisResponse(BaseModel):
    """Metadata of a stored network analysis."""
    id: int
    name: str
    status: str
    entity_count: int = 0
    relation_count: int = 0
    has_update: bool = False  # newer source data available than last generation
    incident_ids: list[int] = []
    incident_titles: list[str] = []
    data_hash: Optional[str] = None  # hash of the source data at generation time
    last_generated_at: Optional[str] = None
    created_by: int = 0
    created_by_username: str = ""
    created_at: str = ""


class NetworkGraphResponse(BaseModel):
    """Full graph payload: analysis metadata plus all nodes and edges."""
    analysis: NetworkAnalysisResponse
    entities: list[NetworkEntityResponse] = []
    relations: list[NetworkRelationResponse] = []
+ NetworkAnalysisUpdate, +) +from config import TIMEZONE + +router = APIRouter(prefix="/api/network-analyses", tags=["network-analyses"]) + + +async def _check_analysis_access( + db: aiosqlite.Connection, analysis_id: int, tenant_id: int +) -> dict: + """Analyse laden und Tenant-Zugriff prüfen.""" + cursor = await db.execute( + "SELECT * FROM network_analyses WHERE id = ? AND tenant_id = ?", + (analysis_id, tenant_id), + ) + row = await cursor.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Netzwerkanalyse nicht gefunden") + return dict(row) + + +async def _enrich_analysis(db: aiosqlite.Connection, analysis: dict) -> dict: + """Analyse mit Incident-IDs, Titeln und Ersteller-Name anreichern.""" + analysis_id = analysis["id"] + + cursor = await db.execute( + """SELECT nai.incident_id, i.title + FROM network_analysis_incidents nai + LEFT JOIN incidents i ON i.id = nai.incident_id + WHERE nai.network_analysis_id = ?""", + (analysis_id,), + ) + rows = await cursor.fetchall() + analysis["incident_ids"] = [r["incident_id"] for r in rows] + analysis["incident_titles"] = [r["title"] or "" for r in rows] + + cursor = await db.execute( + "SELECT email FROM users WHERE id = ?", (analysis.get("created_by"),) + ) + user_row = await cursor.fetchone() + analysis["created_by_username"] = user_row["email"] if user_row else "Unbekannt" + analysis["created_at"] = analysis.get("created_at", "") + analysis["has_update"] = False + return analysis + + +async def _calculate_data_hash(db: aiosqlite.Connection, analysis_id: int) -> str: + """SHA-256 über verknüpfte Artikel- und Factcheck-Daten.""" + cursor = await db.execute( + """SELECT DISTINCT a.id, a.collected_at + FROM network_analysis_incidents nai + JOIN articles a ON a.incident_id = nai.incident_id + WHERE nai.network_analysis_id = ? 
+ ORDER BY a.id""", + (analysis_id,), + ) + article_rows = await cursor.fetchall() + + cursor = await db.execute( + """SELECT DISTINCT fc.id, fc.checked_at + FROM network_analysis_incidents nai + JOIN fact_checks fc ON fc.incident_id = nai.incident_id + WHERE nai.network_analysis_id = ? + ORDER BY fc.id""", + (analysis_id,), + ) + fc_rows = await cursor.fetchall() + + parts = [] + for r in article_rows: + parts.append(f"a:{r['id']}:{r['collected_at']}") + for r in fc_rows: + parts.append(f"fc:{r['id']}:{r['checked_at']}") + + return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest() + + +# --- Endpoints --- + +@router.get("") +async def list_network_analyses( + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Alle Netzwerkanalysen des Tenants auflisten.""" + tenant_id = current_user["tenant_id"] + cursor = await db.execute( + "SELECT * FROM network_analyses WHERE tenant_id = ? ORDER BY created_at DESC", + (tenant_id,), + ) + rows = await cursor.fetchall() + results = [] + for row in rows: + results.append(await _enrich_analysis(db, dict(row))) + return results + + +@router.post("", status_code=201, dependencies=[Depends(require_writable_license)]) +async def create_network_analysis( + body: NetworkAnalysisCreate, + background_tasks: BackgroundTasks, + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Neue Netzwerkanalyse erstellen und Generierung starten.""" + from agents.entity_extractor import extract_and_relate_entities + from main import ws_manager + + tenant_id = current_user["tenant_id"] + user_id = current_user["id"] + + if not body.incident_ids: + raise HTTPException(status_code=400, detail="Mindestens eine Lage auswählen") + + # Prüfen ob alle Incidents dem Tenant gehören + placeholders = ",".join("?" 
for _ in body.incident_ids) + cursor = await db.execute( + f"SELECT id FROM incidents WHERE id IN ({placeholders}) AND tenant_id = ?", + (*body.incident_ids, tenant_id), + ) + found_ids = {r["id"] for r in await cursor.fetchall()} + missing = set(body.incident_ids) - found_ids + if missing: + raise HTTPException( + status_code=400, + detail=f"Lagen nicht gefunden: {', '.join(str(i) for i in missing)}" + ) + + # Analyse anlegen + cursor = await db.execute( + """INSERT INTO network_analyses (name, status, tenant_id, created_by) + VALUES (?, 'generating', ?, ?)""", + (body.name, tenant_id, user_id), + ) + analysis_id = cursor.lastrowid + + for incident_id in body.incident_ids: + await db.execute( + "INSERT INTO network_analysis_incidents (network_analysis_id, incident_id) VALUES (?, ?)", + (analysis_id, incident_id), + ) + await db.commit() + + # Hintergrund-Generierung starten + background_tasks.add_task(extract_and_relate_entities, analysis_id, tenant_id, ws_manager) + + cursor = await db.execute("SELECT * FROM network_analyses WHERE id = ?", (analysis_id,)) + row = await cursor.fetchone() + return await _enrich_analysis(db, dict(row)) + + +@router.get("/{analysis_id}") +async def get_network_analysis( + analysis_id: int, + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Einzelne Netzwerkanalyse abrufen.""" + tenant_id = current_user["tenant_id"] + analysis = await _check_analysis_access(db, analysis_id, tenant_id) + return await _enrich_analysis(db, analysis) + + +@router.get("/{analysis_id}/graph") +async def get_network_graph( + analysis_id: int, + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Volle Graphdaten (Entities + Relations).""" + tenant_id = current_user["tenant_id"] + analysis = await _check_analysis_access(db, analysis_id, tenant_id) + analysis = await _enrich_analysis(db, analysis) + + cursor = await db.execute( + "SELECT * 
FROM network_entities WHERE network_analysis_id = ?", (analysis_id,) + ) + entities_raw = await cursor.fetchall() + entities = [] + for e in entities_raw: + ed = dict(e) + # JSON-Felder parsen + try: + ed["aliases"] = json.loads(ed.get("aliases", "[]")) + except (json.JSONDecodeError, TypeError): + ed["aliases"] = [] + try: + ed["metadata"] = json.loads(ed.get("metadata", "{}")) + except (json.JSONDecodeError, TypeError): + ed["metadata"] = {} + ed["corrected_by_opus"] = bool(ed.get("corrected_by_opus", 0)) + entities.append(ed) + + cursor = await db.execute( + "SELECT * FROM network_relations WHERE network_analysis_id = ?", (analysis_id,) + ) + relations_raw = await cursor.fetchall() + relations = [] + for r in relations_raw: + rd = dict(r) + try: + rd["evidence"] = json.loads(rd.get("evidence", "[]")) + except (json.JSONDecodeError, TypeError): + rd["evidence"] = [] + relations.append(rd) + + return {"analysis": analysis, "entities": entities, "relations": relations} + + +@router.post("/{analysis_id}/regenerate", dependencies=[Depends(require_writable_license)]) +async def regenerate_network( + analysis_id: int, + background_tasks: BackgroundTasks, + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Analyse neu generieren.""" + from agents.entity_extractor import extract_and_relate_entities + from main import ws_manager + + tenant_id = current_user["tenant_id"] + await _check_analysis_access(db, analysis_id, tenant_id) + + # Bestehende Daten löschen + await db.execute( + "DELETE FROM network_relations WHERE network_analysis_id = ?", (analysis_id,) + ) + await db.execute( + """DELETE FROM network_entity_mentions WHERE entity_id IN + (SELECT id FROM network_entities WHERE network_analysis_id = ?)""", + (analysis_id,), + ) + await db.execute( + "DELETE FROM network_entities WHERE network_analysis_id = ?", (analysis_id,) + ) + await db.execute( + """UPDATE network_analyses + SET status = 'generating', 
entity_count = 0, relation_count = 0, data_hash = NULL + WHERE id = ?""", + (analysis_id,), + ) + await db.commit() + + background_tasks.add_task(extract_and_relate_entities, analysis_id, tenant_id, ws_manager) + return {"status": "generating", "message": "Neugenerierung gestartet"} + + +@router.get("/{analysis_id}/check-update") +async def check_network_update( + analysis_id: int, + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Prüft ob neue Daten verfügbar (Hash-Vergleich).""" + tenant_id = current_user["tenant_id"] + analysis = await _check_analysis_access(db, analysis_id, tenant_id) + + current_hash = await _calculate_data_hash(db, analysis_id) + stored_hash = analysis.get("data_hash") + has_update = stored_hash is not None and current_hash != stored_hash + + return {"has_update": has_update} + + +@router.put("/{analysis_id}", dependencies=[Depends(require_writable_license)]) +async def update_network_analysis( + analysis_id: int, + body: NetworkAnalysisUpdate, + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Name oder Lagen aktualisieren.""" + tenant_id = current_user["tenant_id"] + user_id = current_user["id"] + analysis = await _check_analysis_access(db, analysis_id, tenant_id) + + if analysis["created_by"] != user_id: + raise HTTPException(status_code=403, detail="Nur der Ersteller kann bearbeiten") + + if body.name is not None: + await db.execute( + "UPDATE network_analyses SET name = ? WHERE id = ?", + (body.name, analysis_id), + ) + + if body.incident_ids is not None: + if len(body.incident_ids) < 1: + raise HTTPException(status_code=400, detail="Mindestens eine Lage auswählen") + + placeholders = ",".join("?" 
for _ in body.incident_ids) + cursor = await db.execute( + f"SELECT id FROM incidents WHERE id IN ({placeholders}) AND tenant_id = ?", + (*body.incident_ids, tenant_id), + ) + found_ids = {r["id"] for r in await cursor.fetchall()} + missing = set(body.incident_ids) - found_ids + if missing: + raise HTTPException(status_code=400, detail=f"Lagen nicht gefunden: {', '.join(str(i) for i in missing)}") + + await db.execute( + "DELETE FROM network_analysis_incidents WHERE network_analysis_id = ?", + (analysis_id,), + ) + for iid in body.incident_ids: + await db.execute( + "INSERT INTO network_analysis_incidents (network_analysis_id, incident_id) VALUES (?, ?)", + (analysis_id, iid), + ) + + await db.commit() + + cursor = await db.execute("SELECT * FROM network_analyses WHERE id = ?", (analysis_id,)) + row = await cursor.fetchone() + return await _enrich_analysis(db, dict(row)) + + +@router.delete("/{analysis_id}", dependencies=[Depends(require_writable_license)]) +async def delete_network_analysis( + analysis_id: int, + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Analyse löschen (CASCADE räumt auf).""" + tenant_id = current_user["tenant_id"] + user_id = current_user["id"] + analysis = await _check_analysis_access(db, analysis_id, tenant_id) + + if analysis["created_by"] != user_id: + raise HTTPException(status_code=403, detail="Nur der Ersteller kann löschen") + + await db.execute("DELETE FROM network_analyses WHERE id = ?", (analysis_id,)) + await db.commit() + return {"message": "Netzwerkanalyse gelöscht"} + + +@router.get("/{analysis_id}/export") +async def export_network( + analysis_id: int, + format: str = Query("json", pattern="^(json|csv)$"), + db: aiosqlite.Connection = Depends(db_dependency), + current_user: dict = Depends(get_current_user), +): + """Export als JSON oder CSV.""" + tenant_id = current_user["tenant_id"] + analysis = await _check_analysis_access(db, analysis_id, tenant_id) + analysis 
= await _enrich_analysis(db, analysis) + + cursor = await db.execute( + "SELECT * FROM network_entities WHERE network_analysis_id = ?", (analysis_id,) + ) + entities = [dict(r) for r in await cursor.fetchall()] + + cursor = await db.execute( + "SELECT * FROM network_relations WHERE network_analysis_id = ?", (analysis_id,) + ) + relations = [dict(r) for r in await cursor.fetchall()] + + entity_map = {e["id"]: e for e in entities} + safe_name = (analysis.get("name", "export") or "export").replace(" ", "_") + + if format == "json": + content = json.dumps( + {"analysis": analysis, "entities": entities, "relations": relations}, + ensure_ascii=False, indent=2, default=str, + ) + return StreamingResponse( + io.BytesIO(content.encode("utf-8")), + media_type="application/json", + headers={"Content-Disposition": f'attachment; filename="netzwerk_{safe_name}.json"'}, + ) + + output = io.StringIO() + writer = csv.writer(output) + writer.writerow(["source", "target", "category", "label", "weight", "description"]) + for rel in relations: + src = entity_map.get(rel.get("source_entity_id"), {}) + tgt = entity_map.get(rel.get("target_entity_id"), {}) + writer.writerow([ + src.get("name", ""), tgt.get("name", ""), + rel.get("category", ""), rel.get("label", ""), + rel.get("weight", 1), rel.get("description", ""), + ]) + + return StreamingResponse( + io.BytesIO(output.getvalue().encode("utf-8")), + media_type="text/csv", + headers={"Content-Disposition": f'attachment; filename="netzwerk_{safe_name}.csv"'}, + ) diff --git a/src/static/css/network.css b/src/static/css/network.css new file mode 100644 index 0000000..f4e014e --- /dev/null +++ b/src/static/css/network.css @@ -0,0 +1,667 @@ +/* === Netzwerkanalyse Styles === */ + +/* --- Sidebar: Netzwerkanalysen-Sektion --- */ +.sidebar-network-item { + display: flex; + align-items: center; + gap: var(--sp-md); + padding: 6px 12px; + cursor: pointer; + border-radius: var(--radius); + transition: background 0.15s; + font-size: 13px; + 
color: var(--sidebar-text); +} + +.sidebar-network-item:hover { + background: var(--sidebar-hover-bg); +} + +.sidebar-network-item.active { + background: var(--tint-accent); + color: var(--sidebar-active); +} + +.sidebar-network-item .network-icon { + width: 16px; + height: 16px; + flex-shrink: 0; + opacity: 0.7; +} + +.sidebar-network-item .network-item-name { + flex: 1; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.sidebar-network-item .network-item-count { + font-size: 11px; + color: var(--text-tertiary); + flex-shrink: 0; +} + +.sidebar-network-item .network-status-dot { + width: 6px; + height: 6px; + border-radius: 50%; + flex-shrink: 0; +} + +.sidebar-network-item .network-status-dot.generating { + background: var(--warning); + animation: pulse-dot 1.5s infinite; +} + +.sidebar-network-item .network-status-dot.ready { + background: var(--success); +} + +.sidebar-network-item .network-status-dot.error { + background: var(--error); +} + +@keyframes pulse-dot { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.3; } +} + +/* --- Typ-Badge für Netzwerk --- */ +.incident-type-badge.type-network { + background: rgba(99, 102, 241, 0.15); + color: #818CF8; + border: 1px solid rgba(99, 102, 241, 0.3); +} + +/* --- Network View Layout --- */ +#network-view { + display: none; + flex-direction: column; + height: 100%; + overflow: hidden; +} + +/* --- Header Strip --- */ +.network-header-strip { + padding: var(--sp-xl) var(--sp-3xl); + border-bottom: 1px solid var(--border); + background: var(--bg-card); + flex-shrink: 0; +} + +.network-header-row1 { + display: flex; + align-items: center; + justify-content: space-between; + gap: var(--sp-xl); +} + +.network-header-left { + display: flex; + align-items: center; + gap: var(--sp-lg); + min-width: 0; +} + +.network-header-title { + font-family: var(--font-title); + font-size: 18px; + font-weight: 600; + color: var(--text-primary); + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; 
+} + +.network-header-actions { + display: flex; + align-items: center; + gap: var(--sp-md); + flex-shrink: 0; +} + +.network-header-meta { + display: flex; + align-items: center; + gap: var(--sp-xl); + margin-top: var(--sp-md); + font-size: 12px; + color: var(--text-secondary); +} + +.network-header-meta span { + display: flex; + align-items: center; + gap: var(--sp-xs); +} + +/* --- Update-Badge --- */ +.network-update-badge { + display: inline-flex; + align-items: center; + gap: var(--sp-xs); + padding: 2px 10px; + font-size: 11px; + font-weight: 600; + border-radius: 9999px; + background: rgba(245, 158, 11, 0.15); + color: #F59E0B; + border: 1px solid rgba(245, 158, 11, 0.3); + cursor: pointer; + transition: background 0.15s; +} + +.network-update-badge:hover { + background: rgba(245, 158, 11, 0.25); +} + +/* --- Progress Bar (3 Schritte) --- */ +.network-progress { + padding: var(--sp-lg) var(--sp-3xl); + background: var(--bg-secondary); + border-bottom: 1px solid var(--border); + flex-shrink: 0; +} + +.network-progress-steps { + display: flex; + align-items: center; + gap: var(--sp-md); + margin-bottom: var(--sp-md); +} + +.network-progress-step { + display: flex; + align-items: center; + gap: var(--sp-sm); + font-size: 12px; + color: var(--text-disabled); + transition: color 0.3s; +} + +.network-progress-step.active { + color: var(--accent); + font-weight: 600; +} + +.network-progress-step.done { + color: var(--success); +} + +.network-progress-step-dot { + width: 8px; + height: 8px; + border-radius: 50%; + background: var(--text-disabled); + transition: background 0.3s; +} + +.network-progress-step.active .network-progress-step-dot { + background: var(--accent); + box-shadow: 0 0 6px var(--accent); +} + +.network-progress-step.done .network-progress-step-dot { + background: var(--success); +} + +.network-progress-connector { + flex: 1; + height: 2px; + background: var(--border); + position: relative; +} + +.network-progress-connector.done { + background: 
var(--success); +} + +.network-progress-track { + height: 3px; + background: var(--border); + border-radius: 2px; + overflow: hidden; +} + +.network-progress-fill { + height: 100%; + background: var(--accent); + border-radius: 2px; + transition: width 0.5s ease; + width: 0%; +} + +.network-progress-label { + font-size: 12px; + color: var(--text-secondary); + margin-top: var(--sp-sm); +} + +/* --- Main Content Area --- */ +.network-content { + display: flex; + flex: 1; + overflow: hidden; +} + +/* --- Graph Area --- */ +.network-graph-area { + flex: 1; + position: relative; + overflow: hidden; + background: var(--bg-primary); +} + +.network-graph-area svg { + width: 100%; + height: 100%; + display: block; +} + +/* --- Rechte Sidebar --- */ +.network-sidebar { + width: 300px; + border-left: 1px solid var(--border); + background: var(--bg-card); + display: flex; + flex-direction: column; + overflow: hidden; + flex-shrink: 0; +} + +.network-sidebar-section { + padding: var(--sp-lg) var(--sp-xl); + border-bottom: 1px solid var(--border); +} + +.network-sidebar-section-title { + font-size: 11px; + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.5px; + color: var(--text-secondary); + margin-bottom: var(--sp-md); +} + +/* Suche */ +.network-search-input { + width: 100%; + background: var(--input-bg); + border: 1px solid var(--input-border); + border-radius: var(--radius); + padding: var(--sp-md) var(--sp-lg); + font-size: 13px; + color: var(--text-primary); + font-family: var(--font-body); +} + +.network-search-input:focus { + outline: 2px solid var(--accent); + outline-offset: -2px; + border-color: var(--accent); +} + +.network-search-input::placeholder { + color: var(--text-disabled); +} + +/* Typ-Filter */ +.network-type-filters { + display: flex; + flex-wrap: wrap; + gap: var(--sp-sm); +} + +.network-type-filter { + display: flex; + align-items: center; + gap: var(--sp-xs); + padding: 2px 8px; + font-size: 11px; + border-radius: var(--radius); + 
cursor: pointer; + border: 1px solid var(--border); + background: transparent; + color: var(--text-secondary); + transition: all 0.15s; +} + +.network-type-filter.active { + border-color: currentColor; + background: currentColor; +} + +.network-type-filter.active span { + color: #fff; +} + +.network-type-filter[data-type="person"] { color: #60A5FA; } +.network-type-filter[data-type="organisation"] { color: #C084FC; } +.network-type-filter[data-type="location"] { color: #34D399; } +.network-type-filter[data-type="event"] { color: #FBBF24; } +.network-type-filter[data-type="military"] { color: #F87171; } + +.network-type-filter .type-dot { + width: 8px; + height: 8px; + border-radius: 50%; + background: currentColor; +} + +/* Gewicht-Slider */ +.network-weight-slider { + width: 100%; + accent-color: var(--accent); +} + +.network-weight-labels { + display: flex; + justify-content: space-between; + font-size: 10px; + color: var(--text-disabled); + margin-top: 2px; +} + +/* Detail-Panel */ +.network-detail-panel { + flex: 1; + overflow-y: auto; + padding: var(--sp-xl); +} + +.network-detail-empty { + padding: var(--sp-3xl) var(--sp-xl); + text-align: center; + font-size: 13px; + color: var(--text-disabled); +} + +.network-detail-name { + font-size: 16px; + font-weight: 600; + color: var(--text-primary); + margin-bottom: var(--sp-sm); +} + +.network-detail-type { + display: inline-block; + font-size: 10px; + font-weight: 600; + padding: 1px 8px; + border-radius: 9999px; + text-transform: uppercase; + letter-spacing: 0.03em; + margin-bottom: var(--sp-lg); +} + +.network-detail-type.type-person { background: rgba(96, 165, 250, 0.15); color: #60A5FA; } +.network-detail-type.type-organisation { background: rgba(192, 132, 252, 0.15); color: #C084FC; } +.network-detail-type.type-location { background: rgba(52, 211, 153, 0.15); color: #34D399; } +.network-detail-type.type-event { background: rgba(251, 191, 36, 0.15); color: #FBBF24; } +.network-detail-type.type-military { 
background: rgba(248, 113, 113, 0.15); color: #F87171; } + +.network-detail-desc { + font-size: 13px; + color: var(--text-secondary); + line-height: 1.5; + margin-bottom: var(--sp-lg); +} + +.network-detail-section { + margin-top: var(--sp-xl); +} + +.network-detail-section-title { + font-size: 11px; + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.5px; + color: var(--text-secondary); + margin-bottom: var(--sp-md); +} + +.network-detail-aliases { + display: flex; + flex-wrap: wrap; + gap: var(--sp-xs); +} + +.network-detail-alias { + font-size: 11px; + padding: 1px 6px; + border-radius: var(--radius); + background: var(--bg-secondary); + color: var(--text-secondary); +} + +.network-detail-stat { + display: flex; + justify-content: space-between; + font-size: 12px; + padding: var(--sp-xs) 0; + color: var(--text-secondary); +} + +.network-detail-stat strong { + color: var(--text-primary); +} + +.network-opus-badge { + display: inline-flex; + align-items: center; + gap: 3px; + font-size: 10px; + padding: 1px 6px; + border-radius: var(--radius); + background: rgba(99, 102, 241, 0.15); + color: #818CF8; + margin-left: var(--sp-sm); +} + +/* Relation-Items im Detail-Panel */ +.network-relation-item { + display: flex; + flex-direction: column; + gap: 2px; + padding: var(--sp-md); + border-radius: var(--radius); + background: var(--bg-secondary); + margin-bottom: var(--sp-sm); + font-size: 12px; + cursor: pointer; + transition: background 0.15s; +} + +.network-relation-item:hover { + background: var(--bg-hover); +} + +.network-relation-header { + display: flex; + align-items: center; + gap: var(--sp-sm); +} + +.network-relation-category { + font-size: 10px; + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.03em; + padding: 0 5px; + border-radius: 3px; +} + +.network-relation-category.cat-alliance { background: rgba(52, 211, 153, 0.15); color: #34D399; } +.network-relation-category.cat-conflict { background: rgba(239, 68, 68, 0.15); 
color: #EF4444; } +.network-relation-category.cat-diplomacy { background: rgba(251, 191, 36, 0.15); color: #FBBF24; } +.network-relation-category.cat-economic { background: rgba(96, 165, 250, 0.15); color: #60A5FA; } +.network-relation-category.cat-legal { background: rgba(192, 132, 252, 0.15); color: #C084FC; } +.network-relation-category.cat-neutral { background: rgba(107, 114, 128, 0.15); color: #6B7280; } + +.network-relation-target { + color: var(--text-primary); + font-weight: 500; +} + +.network-relation-label { + color: var(--text-secondary); + font-size: 11px; +} + +.network-relation-weight { + font-size: 10px; + color: var(--text-disabled); +} + +/* --- Graph Tooltip --- */ +.network-tooltip { + position: absolute; + pointer-events: none; + background: var(--bg-elevated); + border: 1px solid var(--border); + border-radius: var(--radius); + padding: var(--sp-md) var(--sp-lg); + font-size: 12px; + color: var(--text-primary); + box-shadow: var(--shadow-md); + z-index: 100; + max-width: 300px; + display: none; +} + +.network-tooltip-title { + font-weight: 600; + margin-bottom: 2px; +} + +.network-tooltip-desc { + color: var(--text-secondary); + font-size: 11px; +} + +/* --- Graph SVG Styles --- */ +.network-graph-area .node-label { + font-size: 10px; + fill: var(--text-secondary); + text-anchor: middle; + pointer-events: none; + user-select: none; +} + +.network-graph-area .node-circle { + cursor: pointer; + stroke: var(--bg-primary); + stroke-width: 2; + transition: stroke-width 0.15s; +} + +.network-graph-area .node-circle:hover { + stroke-width: 3; + stroke: var(--text-primary); +} + +.network-graph-area .node-circle.selected { + stroke-width: 3; + stroke: var(--accent); +} + +.network-graph-area .node-circle.dimmed { + opacity: 0.15; +} + +.network-graph-area .node-circle.highlighted { + filter: drop-shadow(0 0 8px currentColor); +} + +.network-graph-area .node-label.dimmed { + opacity: 0.1; +} + +.network-graph-area .edge-line { + fill: none; + 
pointer-events: stroke; + cursor: pointer; +} + +.network-graph-area .edge-line.dimmed { + opacity: 0.05 !important; +} + +.network-graph-area .edge-line:hover { + stroke-width: 3 !important; +} + +/* --- Modal: Neue Netzwerkanalyse --- */ +.network-incident-list { + max-height: 300px; + overflow-y: auto; + border: 1px solid var(--input-border); + border-radius: var(--radius); + background: var(--input-bg); +} + +.network-incident-search { + width: 100%; + padding: var(--sp-md) var(--sp-lg); + border: none; + border-bottom: 1px solid var(--input-border); + background: var(--input-bg); + color: var(--text-primary); + font-size: 13px; + font-family: var(--font-body); +} + +.network-incident-search:focus { + outline: none; + border-bottom-color: var(--accent); +} + +.network-incident-search::placeholder { + color: var(--text-disabled); +} + +.network-incident-option { + display: flex; + align-items: center; + gap: var(--sp-md); + padding: var(--sp-md) var(--sp-lg); + cursor: pointer; + transition: background 0.1s; + font-size: 13px; + color: var(--text-primary); +} + +.network-incident-option:hover { + background: var(--bg-hover); +} + +.network-incident-option input[type="checkbox"] { + accent-color: var(--accent); +} + +.network-incident-option .incident-option-type { + font-size: 10px; + color: var(--text-disabled); + margin-left: auto; +} + +/* --- Leerer Graph-Zustand --- */ +.network-empty-state { + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + height: 100%; + gap: var(--sp-lg); + color: var(--text-disabled); +} + +.network-empty-state-icon { + font-size: 48px; + opacity: 0.3; +} + +.network-empty-state-text { + font-size: 14px; +} diff --git a/src/static/dashboard.html b/src/static/dashboard.html index 03cc80e..fe76eff 100644 --- a/src/static/dashboard.html +++ b/src/static/dashboard.html @@ -16,6 +16,7 @@ +
@@ -93,6 +94,16 @@ + + + + + + + + + + + @@ -618,6 +753,9 @@ + + + diff --git a/src/static/js/api_network.js b/src/static/js/api_network.js new file mode 100644 index 0000000..0ddddc6 --- /dev/null +++ b/src/static/js/api_network.js @@ -0,0 +1,43 @@ +/** + * Netzwerkanalyse API-Methoden — werden zum API-Objekt hinzugefügt. + */ + +// Netzwerkanalysen +API.listNetworkAnalyses = function() { + return this._request('GET', '/network-analyses'); +}; + +API.createNetworkAnalysis = function(data) { + return this._request('POST', '/network-analyses', data); +}; + +API.getNetworkAnalysis = function(id) { + return this._request('GET', '/network-analyses/' + id); +}; + +API.getNetworkGraph = function(id) { + return this._request('GET', '/network-analyses/' + id + '/graph'); +}; + +API.regenerateNetwork = function(id) { + return this._request('POST', '/network-analyses/' + id + '/regenerate'); +}; + +API.checkNetworkUpdate = function(id) { + return this._request('GET', '/network-analyses/' + id + '/check-update'); +}; + +API.updateNetworkAnalysis = function(id, data) { + return this._request('PUT', '/network-analyses/' + id, data); +}; + +API.deleteNetworkAnalysis = function(id) { + return this._request('DELETE', '/network-analyses/' + id); +}; + +API.exportNetworkAnalysis = function(id, format) { + var token = localStorage.getItem('osint_token'); + return fetch(this.baseUrl + '/network-analyses/' + id + '/export?format=' + format, { + headers: { 'Authorization': 'Bearer ' + token }, + }); +}; diff --git a/src/static/js/app.js b/src/static/js/app.js index 7cee6a9..24a9090 100644 --- a/src/static/js/app.js +++ b/src/static/js/app.js @@ -518,10 +518,15 @@ const App = { // Sidebar-Chevrons initial auf offen setzen (Archiv geschlossen) document.querySelectorAll('.sidebar-chevron').forEach(c => c.classList.add('open')); document.getElementById('chevron-archived-incidents').classList.remove('open'); + var chevronNetwork = document.getElementById('chevron-network-analyses-list'); + 
if (chevronNetwork) chevronNetwork.classList.add('open'); // Lagen laden (frueh, damit Sidebar sofort sichtbar) await this.loadIncidents(); + // Netzwerkanalysen laden + await this.loadNetworkAnalyses(); + // Notification-Center initialisieren try { await NotificationCenter.init(); } catch (e) { console.warn('NotificationCenter:', e); } @@ -532,6 +537,9 @@ const App = { WS.on('refresh_summary', (msg) => this.handleRefreshSummary(msg)); WS.on('refresh_error', (msg) => this.handleRefreshError(msg)); WS.on('refresh_cancelled', (msg) => this.handleRefreshCancelled(msg)); + WS.on('network_status', (msg) => this._handleNetworkStatus(msg)); + WS.on('network_complete', (msg) => this._handleNetworkComplete(msg)); + WS.on('network_error', (msg) => this._handleNetworkError(msg)); // Laufende Refreshes wiederherstellen try { @@ -552,6 +560,17 @@ const App = { } } + // Zuletzt ausgewählte Netzwerkanalyse wiederherstellen + if (!savedId || !this.incidents.some(inc => inc.id === parseInt(savedId, 10))) { + const savedNetworkId = localStorage.getItem('selectedNetworkId'); + if (savedNetworkId) { + const nid = parseInt(savedNetworkId, 10); + if (this.networkAnalyses.some(na => na.id === nid)) { + await this.selectNetworkAnalysis(nid); + } + } + } + // Leaflet-Karte nachladen falls CDN langsam war setTimeout(() => UI.retryPendingMap(), 2000); }, @@ -647,6 +666,10 @@ const App = { document.getElementById('empty-state').style.display = 'none'; document.getElementById('incident-view').style.display = 'flex'; + document.getElementById('network-view').style.display = 'none'; + this.currentNetworkId = null; + localStorage.removeItem('selectedNetworkId'); + this.renderNetworkSidebar(); // GridStack-Animation deaktivieren und Scroll komplett sperren // bis alle Tile-Resize-Operationen (doppeltes rAF) abgeschlossen sind diff --git a/src/static/js/app_network.js b/src/static/js/app_network.js new file mode 100644 index 0000000..f97c3a0 --- /dev/null +++ b/src/static/js/app_network.js @@ -0,0 
+1,447 @@ +/** + * Netzwerkanalyse-Erweiterungen für App-Objekt. + * Wird nach app.js geladen und erweitert App um Netzwerk-Funktionalität. + */ + +// State-Erweiterung +App.networkAnalyses = []; +App.currentNetworkId = null; +App._networkGenerating = new Set(); + +/** + * Netzwerkanalysen laden und Sidebar rendern. + */ +App.loadNetworkAnalyses = async function() { + try { + this.networkAnalyses = await API.listNetworkAnalyses(); + } catch (e) { + console.warn('Netzwerkanalysen laden fehlgeschlagen:', e); + this.networkAnalyses = []; + } + this.renderNetworkSidebar(); +}; + +/** + * Netzwerkanalysen-Sektion in der Sidebar rendern. + */ +App.renderNetworkSidebar = function() { + var container = document.getElementById('network-analyses-list'); + if (!container) return; + + var countEl = document.getElementById('count-network-analyses'); + if (countEl) countEl.textContent = '(' + this.networkAnalyses.length + ')'; + + if (this.networkAnalyses.length === 0) { + container.innerHTML = '' + + this._esc(entity.description) + '
'; + } + + // Aliases + if (entity.aliases && entity.aliases.length > 0) { + html += 'Klicke auf einen Knoten, um Details anzuzeigen.
'; + } + }, + + // ---- tooltip helpers ------------------------------------------------------ + + _showTooltip(event, html) { + if (!this._tooltip) return; + this._tooltip + .style('display', 'block') + .html(html); + this._moveTooltip(event); + }, + + _moveTooltip(event) { + if (!this._tooltip) return; + this._tooltip + .style('left', (event.offsetX + 14) + 'px') + .style('top', (event.offsetY - 10) + 'px'); + }, + + _hideTooltip() { + if (!this._tooltip) return; + this._tooltip.style('display', 'none'); + }, + + // ---- string helpers ------------------------------------------------------- + + _esc(str) { + if (!str) return ''; + const div = document.createElement('div'); + div.appendChild(document.createTextNode(str)); + return div.innerHTML; + }, + + _csvField(val) { + const s = String(val == null ? '' : val); + if (s.includes(',') || s.includes('"') || s.includes('\n')) { + return '"' + s.replace(/"/g, '""') + '"'; + } + return s; + }, +};