Dateien
AegisSight-Monitor/src/routers/incidents.py
UserIsMH b1f8113207 Bericht-Export: drei Verbesserungen
1. Faktencheck immer vollständig
   PDF-Export hatte im scope=report einen [:20]-Cap, der vollständige
   Faktencheck wurde nur bei scope=full gerendert. Jetzt ungekürzt
   überall, sortiert chronologisch absteigend (DB-Sortierung).

2. Status-Labels aus Frontend übernommen
   FC_STATUS_LABELS hatte nur 4 Werte; in der DB existieren aber 7+
   (confirmed/unconfirmed/contradicted/developing/established/
   unverified/disputed). Folge: "contradicted" und drei weitere
   wurden auf englisch ausgegeben. Jetzt 1:1 vom Monitor-UI:
     contradicted → "Widerlegt"
     developing   → "Unklar"
     established  → "Gesichert"
     unverified   → "Ungeprüft"

3. Adhoc-Export: Neueste Entwicklungen statt Executive Summary
   Bei Live-Monitoring-Lagen ist die generische Executive Summary
   weniger aussagekräftig als die kompakten "Neueste Entwicklungen"-
   Bullets. Endpoint nutzt jetzt:
     - adhoc + latest_developments vorhanden → latest_developments
       (Markdown -> HTML konvertiert)
     - adhoc + leer → cached/generierte Executive Summary (Fallback)
     - research → unverändert Executive Summary

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 23:32:36 +02:00

1232 Zeilen
48 KiB
Python

"""Incidents-Router: Lagen verwalten (Multi-Tenant)."""
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
from fastapi.responses import StreamingResponse
from models import IncidentCreate, IncidentUpdate, IncidentResponse, IncidentListItem, SubscriptionUpdate, SubscriptionResponse, DescriptionEnhanceRequest
from auth import get_current_user
from middleware.license_check import require_writable_license
from database import db_dependency, get_db
from datetime import datetime
from config import TIMEZONE
import asyncio
import aiosqlite
import io
import json
import logging
import re
import unicodedata
# Logger used by the background geoparsing task defined further down in this module.
_geoparse_logger = logging.getLogger("osint.geoparse_bg")
# Every route declared on this router is served under the /api/incidents prefix.
router = APIRouter(prefix="/api/incidents", tags=["incidents"])
# Whitelist of incident columns that the update endpoint is allowed to write.
# Fields outside this set are skipped there, which also keeps the dynamically
# built SET clause limited to known column names.
INCIDENT_UPDATE_COLUMNS = {
    "title",
    "description",
    "type",
    "status",
    "refresh_mode",
    "refresh_interval",
    "refresh_start_time",
    "retention_days",
    "international_sources",
    "include_telegram",
    "visibility",
}
async def _check_incident_access(
    db: aiosqlite.Connection, incident_id: int, user_id: int, tenant_id: int
) -> aiosqlite.Row:
    """Load an incident and verify the caller may access it (tenant + visibility).

    Returns:
        The full ``incidents`` row.

    Raises:
        HTTPException: 404 when no incident with this id exists for the tenant,
            403 when the incident is private and was created by someone else.
    """
    lookup = await db.execute(
        "SELECT * FROM incidents WHERE id = ? AND tenant_id = ?",
        (incident_id, tenant_id),
    )
    incident = await lookup.fetchone()
    if not incident:
        raise HTTPException(status_code=404, detail="Lage nicht gefunden")

    # Private incidents are only visible to the user who created them.
    is_private = incident["visibility"] == "private"
    if is_private and incident["created_by"] != user_id:
        raise HTTPException(status_code=403, detail="Kein Zugriff auf private Lage")
    return incident
async def _enrich_incident(db: aiosqlite.Connection, row: aiosqlite.Row) -> dict:
    """Return the incident row as a dict enriched with statistics and creator name.

    Adds three keys to the row data:
        article_count: number of articles belonging to the incident.
        source_count: number of distinct ``source`` values among those articles.
        created_by_username: e-mail of the creating user, or "Unbekannt" when
            no matching user row exists.
    """
    incident = dict(row)

    # Both aggregates read the same rows, so fetch them in a single round trip.
    # This helper runs once per incident in the list endpoint.
    cursor = await db.execute(
        "SELECT COUNT(*) AS article_count, COUNT(DISTINCT source) AS source_count "
        "FROM articles WHERE incident_id = ?",
        (incident["id"],),
    )
    stats = await cursor.fetchone()

    cursor = await db.execute(
        "SELECT email FROM users WHERE id = ?",
        (incident["created_by"],),
    )
    user_row = await cursor.fetchone()

    incident["article_count"] = stats["article_count"]
    incident["source_count"] = stats["source_count"]
    incident["created_by_username"] = user_row["email"] if user_row else "Unbekannt"
    return incident
@router.get("", response_model=list[IncidentListItem])
async def list_incidents(
    status_filter: str | None = None,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all incidents of the tenant (public ones plus the caller's private ones).

    Returns lean sidebar items without summary text or sources_json; the full
    texts are only delivered when an incident is opened via GET /incidents/{id}.

    Args:
        status_filter: Optional exact match on the ``status`` column.
    """
    tenant_id = current_user.get("tenant_id")
    user_id = current_user["id"]

    # Select only the columns needed by the sidebar and the edit dialog, which
    # avoids shipping the large summary/sources_json payloads.
    # has_summary is a flag the frontend uses to detect the "first refresh" case.
    query = (
        "SELECT id, title, description, type, status, refresh_mode, refresh_interval, "
        "refresh_start_time, retention_days, visibility, "
        "international_sources, include_telegram, created_by, created_at, updated_at, "
        "CASE WHEN summary IS NOT NULL AND summary != '' THEN 1 ELSE 0 END AS has_summary "
        "FROM incidents WHERE tenant_id = ? AND (visibility = 'public' OR created_by = ?)"
    )
    params: list = [tenant_id, user_id]
    if status_filter:
        query += " AND status = ?"
        params.append(status_filter)
    query += " ORDER BY updated_at DESC"

    cursor = await db.execute(query, params)
    rows = await cursor.fetchall()
    # Enrichment is awaited sequentially because all calls share one connection.
    return [await _enrich_incident(db, row) for row in rows]
@router.post("", response_model=IncidentResponse, status_code=status.HTTP_201_CREATED)
async def create_incident(
    data: IncidentCreate,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create a new incident for the caller's tenant and return the enriched row."""
    # created_at and updated_at share one timestamp in the configured timezone.
    timestamp = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
    values = (
        data.title,
        data.description,
        data.type,
        data.refresh_mode,
        data.refresh_interval,
        data.refresh_start_time,
        data.retention_days,
        # Boolean flags are stored as 0/1 integers.
        1 if data.international_sources else 0,
        1 if data.include_telegram else 0,
        data.visibility,
        current_user.get("tenant_id"),
        current_user["id"],
        timestamp,
        timestamp,
    )
    insert_cursor = await db.execute(
        """INSERT INTO incidents (title, description, type, refresh_mode, refresh_interval,
        refresh_start_time, retention_days, international_sources, include_telegram, visibility,
        tenant_id, created_by, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        values,
    )
    await db.commit()

    # Re-read the stored row so column defaults set by the database are included.
    select_cursor = await db.execute(
        "SELECT * FROM incidents WHERE id = ?", (insert_cursor.lastrowid,)
    )
    created = await select_cursor.fetchone()
    return await _enrich_incident(db, created)
@router.get("/refreshing")
async def get_refreshing_incidents(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the ids of incidents with a running refresh (caller's tenant only).

    The response also carries the orchestrator's queued ids, its current task
    and a per-incident ``started_at`` timestamp.
    """
    cursor = await db.execute(
        """SELECT rl.incident_id, rl.started_at FROM refresh_log rl
        JOIN incidents i ON i.id = rl.incident_id
        WHERE rl.status = 'running'
        AND i.tenant_id = ?
        AND (i.visibility = 'public' OR i.created_by = ?)""",
        (current_user.get("tenant_id"), current_user["id"]),
    )
    running_rows = await cursor.fetchall()

    # Queue state lives in the orchestrator; each attribute is optional.
    # NOTE(review): queued/current are taken from the orchestrator as-is and are
    # not filtered by tenant in this function — confirm that this is intended.
    from agents.orchestrator import orchestrator
    queued_ids = list(getattr(orchestrator, "_queued_ids", []))
    current_task = getattr(orchestrator, "_current_task", None)
    # Session start of the currently running task. It stays stable across
    # multi-pass runs and retries, so the frontend timer does not jump back to
    # the latest log entry (pass 2/3 or retry n) after a reload.
    session_start = getattr(orchestrator, "_current_task_started_at", None)

    details = {}
    refreshing = []
    for entry in running_rows:
        incident_id = entry["incident_id"]
        refreshing.append(incident_id)
        if incident_id == current_task and session_start:
            started_at = session_start
        else:
            started_at = entry["started_at"]
        details[str(incident_id)] = {"started_at": started_at}

    return {
        "refreshing": refreshing,
        "queued": queued_ids,
        "current": current_task,
        "details": details,
    }
# --- Description generation (prompt enhancement) ---
# Prompt template for incidents of type "research". The placeholders {title}
# and {context} are filled via str.format() in enhance_description().
ENHANCE_PROMPT_RESEARCH = """Du bist ein Recherche-Planer in einem OSINT-Lagemonitoring-System.
Deine Aufgabe: Strukturiere ein Recherche-Briefing, das Analysten als Leitfaden für ihre Suche verwenden.
Du behauptest KEINE Fakten und musst das Thema NICHT kennen oder verifizieren.
Der Nutzer gibt das Thema vor -- du definierst Suchrichtungen, Schwerpunkte und Stichworte.
Erstelle das Briefing IMMER, auch wenn dir das Thema unbekannt ist.
WICHTIG: Verwende IMMER echte Umlaute (ä, ö, ü, ß) und KEINE Umschreibungen.
Titel: {title}
Vorhandener Kontext: {context}
Typ: Hintergrundrecherche
Erstelle ein präzises Recherche-Briefing mit:
1. Fallbezeichnung (vollständige Benennung des Themas basierend auf Titel und Kontext)
2. Recherche-Schwerpunkte (5-8 thematische Punkte, z.B. Sachverhalt, beteiligte Parteien, rechtliche Aspekte, mediale Rezeption, Hintergründe, Chronologie)
3. Relevante Suchbegriffe (deutsch + englisch, inkl. Abkürzungen und alternative Schreibweisen)
Schreibe NUR das Briefing als Fließtext mit Aufzählungen. Keine Erklärungen, Rückfragen oder Disclaimer."""
# Prompt template used for every other incident type (live monitoring).
# Same placeholders as above: {title} and {context}.
ENHANCE_PROMPT_ADHOC = """Du bist ein Recherche-Planer in einem OSINT-Lagemonitoring-System.
Deine Aufgabe: Erstelle eine knappe Vorfallsbeschreibung, die als Suchauftrag für Live-Monitoring dient.
Du behauptest KEINE Fakten und musst den Vorfall NICHT kennen oder verifizieren.
Der Nutzer gibt das Thema vor -- du strukturierst, wonach gesucht werden soll.
Erstelle die Beschreibung IMMER, auch wenn dir der Vorfall unbekannt ist.
WICHTIG: Verwende IMMER echte Umlaute (ä, ö, ü, ß) und KEINE Umschreibungen.
Titel: {title}
Vorhandener Kontext: {context}
Typ: Live-Monitoring (aktuelle Ereignisse)
Erstelle eine knappe, informative Beschreibung mit:
1. Was ist passiert / worum geht es (basierend auf Titel und Kontext)
2. Wo (geographischer Kontext, falls ableitbar)
3. Wer ist beteiligt (Akteure, Organisationen, Länder)
4. Wonach soll gesucht werden (aktuelle Entwicklungen, Reaktionen, Hintergründe)
Schreibe NUR die Beschreibung als Fließtext (3-5 Zeilen). Keine Erklärungen, Rückfragen oder Disclaimer."""
# Logger for the description-enhancement endpoint.
_enhance_logger = logging.getLogger("osint.enhance")
@router.post("/enhance-description")
async def enhance_description(
    data: DescriptionEnhanceRequest,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Generate a structured incident description from the title via the LLM.

    Returns:
        ``{"description": <generated text>}``.

    Raises:
        HTTPException: 503 on an LLM auth error, 429 on a rate limit, 504 on a
            timeout and 500 for every other failure.
    """
    from agents.claude_client import call_claude, ClaudeCliError
    from config import CLAUDE_MODEL_FAST
    from services.license_service import charge_usage_to_tenant

    template = ENHANCE_PROMPT_RESEARCH if data.type == "research" else ENHANCE_PROMPT_ADHOC
    context = (data.description or "").strip() or "Kein Kontext angegeben"
    prompt = template.format(title=data.title.strip(), context=context)

    try:
        result, usage = await call_claude(
            prompt, tools=None, model=CLAUDE_MODEL_FAST, raw_text=True, timeout=60
        )
    except ClaudeCliError as e:
        _enhance_logger.error(f"Beschreibung generieren: ClaudeCliError [{e.error_type}]: {e.message}")
        if e.error_type == "auth_error":
            raise HTTPException(
                status_code=503,
                detail="KI-Zugang aktuell nicht verfuegbar. Bitte Administrator kontaktieren.",
            ) from e
        if e.error_type == "rate_limit":
            raise HTTPException(
                status_code=429,
                detail="KI ist gerade ausgelastet. Bitte in einer Minute erneut versuchen.",
            ) from e
        raise HTTPException(status_code=500, detail="Beschreibung konnte nicht generiert werden") from e
    except (TimeoutError, asyncio.TimeoutError) as e:
        # asyncio.TimeoutError only became an alias of the builtin TimeoutError
        # in Python 3.11; catching both keeps the 504 mapping on older versions.
        _enhance_logger.error("Beschreibung generieren: Timeout")
        raise HTTPException(
            status_code=504,
            detail="Die KI antwortet gerade nicht. Bitte erneut versuchen.",
        ) from e
    except HTTPException:
        # Let HTTP errors pass through instead of turning them into a 500 below.
        raise
    except Exception as e:
        # .exception() records the traceback for unexpected failures.
        _enhance_logger.exception(f"Beschreibung generieren fehlgeschlagen: {e}")
        raise HTTPException(status_code=500, detail="Beschreibung konnte nicht generiert werden") from e

    _enhance_logger.info(
        f"Beschreibung generiert fuer \"{data.title[:50]}\": "
        f"{usage.input_tokens}in/{usage.output_tokens}out"
    )
    # Book the token usage on the tenant before returning the result.
    await charge_usage_to_tenant(db, current_user.get("tenant_id"), usage, source="enhance")
    await db.commit()
    return {"description": result.strip()}
@router.get("/{incident_id}", response_model=IncidentResponse)
async def get_incident(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Fetch a single incident.

    sources_json is not meant to be delivered here; citation lookups should use
    GET /incidents/{id}/sources instead (lazy loading).
    """
    incident_row = await _check_incident_access(
        db,
        incident_id,
        current_user["id"],
        current_user.get("tenant_id"),
    )
    enriched = await _enrich_incident(db, incident_row)
    return enriched
@router.get("/{incident_id}/sources")
async def get_incident_sources(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the sources array of an incident (parsed from sources_json).

    Missing, malformed or non-list JSON yields an empty ``sources`` list.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        "SELECT sources_json FROM incidents WHERE id = ?",
        (incident_id,),
    )
    record = await cursor.fetchone()

    raw_sources = record["sources_json"] if record else None
    parsed = None
    if raw_sources:
        try:
            parsed = json.loads(raw_sources)
        except (json.JSONDecodeError, TypeError):
            parsed = None
    # Only a JSON array is accepted as the sources payload.
    sources: list = parsed if isinstance(parsed, list) else []
    return {"incident_id": incident_id, "sources": sources}
@router.put("/{incident_id}", response_model=IncidentResponse)
async def update_incident(
    incident_id: int,
    data: IncidentUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Update an incident.

    Only fields that are set (not None) and listed in INCIDENT_UPDATE_COLUMNS
    are written. When nothing remains, the current row is returned unchanged.
    """
    current_row = await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )

    flag_columns = ("international_sources", "include_telegram")
    updates = {}
    for column, new_value in data.model_dump(exclude_none=True).items():
        if column in INCIDENT_UPDATE_COLUMNS:
            # Boolean flags are stored as 0/1 integers.
            updates[column] = (1 if new_value else 0) if column in flag_columns else new_value

    if not updates:
        return await _enrich_incident(db, current_row)

    updates["updated_at"] = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
    # Column names come from the whitelist above (plus updated_at), values are bound.
    assignments = [f"{column} = ?" for column in updates]
    set_clause = ", ".join(assignments)
    bound_values = [*updates.values(), incident_id]
    await db.execute(f"UPDATE incidents SET {set_clause} WHERE id = ?", bound_values)
    await db.commit()

    cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
    refreshed = await cursor.fetchone()
    return await _enrich_incident(db, refreshed)
@router.delete("/{incident_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_incident(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete an incident (creator only).

    Raises:
        HTTPException: 404 if the incident does not exist in the tenant, 403 if
            the caller is not the creator, 409 if the database is locked.
    """
    lookup = await db.execute(
        "SELECT id, created_by FROM incidents WHERE id = ? AND tenant_id = ?",
        (incident_id, current_user.get("tenant_id")),
    )
    target = await lookup.fetchone()
    if not target:
        raise HTTPException(status_code=404, detail="Lage nicht gefunden")

    caller_is_creator = target["created_by"] == current_user["id"]
    if not caller_is_creator:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Nur der Ersteller kann diese Lage loeschen",
        )

    try:
        await db.execute("DELETE FROM incidents WHERE id = ?", (incident_id,))
        await db.commit()
    except Exception as e:
        # Only a locked database is translated into a retryable 409.
        if "database is locked" not in str(e):
            raise
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Datenbank ist momentan beschaeftigt. Bitte in wenigen Sekunden erneut versuchen.",
        )
@router.get("/{incident_id}/articles")
async def get_articles(
    incident_id: int,
    limit: int = Query(500, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    search: str | None = Query(None, min_length=0, max_length=200),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Fetch the articles of an incident, paginated.

    Response: ``{"total": int, "articles": [...]}``.

    The optional ``search`` parameter filters by substring (LIKE) over
    headline, headline_de, source, content_de and content_original. The term is
    matched literally: ``%`` and ``_`` in the input are not wildcards.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    search_clean = (search or "").strip()
    if search_clean:
        # Escape the LIKE metacharacters (and the escape character itself) so
        # that user input such as "50%" or "a_b" is searched for verbatim.
        escaped = (
            search_clean.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        like = f"%{escaped}%"
        params = (incident_id, like, like, like, like, like)
        where = (
            "WHERE incident_id = ? AND ("
            "COALESCE(headline,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(headline_de,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(source,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(content_de,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(content_original,'') LIKE ? ESCAPE '\\')"
        )
    else:
        params = (incident_id,)
        where = "WHERE incident_id = ?"

    # The same WHERE fragment drives the total count and the page query.
    cursor = await db.execute(f"SELECT COUNT(*) AS cnt FROM articles {where}", params)
    total = (await cursor.fetchone())["cnt"]

    cursor = await db.execute(
        f"SELECT * FROM articles {where} ORDER BY collected_at DESC LIMIT ? OFFSET ?",
        (*params, limit, offset),
    )
    rows = await cursor.fetchall()
    return {"total": total, "articles": [dict(row) for row in rows]}
@router.get("/{incident_id}/articles/sources-summary")
async def get_articles_sources_summary(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Aggregated source statistics of an incident (for the source overview).

    Returns:
        ``{"total": int, "sources": [...], "language_counts": [...]}`` where
        each source entry has ``source``, ``article_count`` and a sorted
        ``languages`` list. A missing language is counted as "de".
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    cursor = await db.execute(
        """SELECT source,
        COUNT(*) AS article_count,
        GROUP_CONCAT(DISTINCT COALESCE(language,'de')) AS languages
        FROM articles WHERE incident_id = ?
        GROUP BY source ORDER BY article_count DESC""",
        (incident_id,),
    )
    sources = []
    for record in await cursor.fetchall():
        entry = dict(record)
        codes = (entry.pop("languages") or "de").split(",")
        entry["languages"] = sorted({(code or "de").strip() for code in codes})
        sources.append(entry)

    # Overall language distribution. Group on the COALESCE expression itself:
    # a bare "GROUP BY language" binds to the table column rather than the
    # result alias, so NULL and 'de' would come back as two separate 'de' rows.
    cursor = await db.execute(
        """SELECT COALESCE(language,'de') AS language, COUNT(*) AS cnt
        FROM articles WHERE incident_id = ?
        GROUP BY COALESCE(language,'de') ORDER BY cnt DESC""",
        (incident_id,),
    )
    lang_counts = [dict(record) for record in await cursor.fetchall()]

    # Every article falls into exactly one source group, so the per-source
    # counts add up to the total and no further COUNT(*) query is needed.
    total = sum(entry["article_count"] for entry in sources)
    return {"total": total, "sources": sources, "language_counts": lang_counts}
@router.get("/{incident_id}/articles/timeline-buckets")
async def get_articles_timeline_buckets(
    incident_id: int,
    granularity: str = Query("day", pattern="^(hour|day|week|month)$"),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Aggregated time buckets for the timeline axis.

    Counts articles and snapshots per bucket. No content, only counts.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )

    # strftime() pattern per granularity; the query pattern above restricts
    # granularity to exactly these keys.
    bucket_formats = {
        "hour": "%Y-%m-%d %H:00",
        "day": "%Y-%m-%d",
        "week": "%Y-%W",
        "month": "%Y-%m",
    }
    fmt = bucket_formats[granularity]

    cursor = await db.execute(
        """SELECT strftime(?, collected_at) AS bucket, COUNT(*) AS article_count
        FROM articles WHERE incident_id = ?
        GROUP BY bucket ORDER BY bucket""",
        (fmt, incident_id),
    )
    articles_per_bucket = {}
    for record in await cursor.fetchall():
        articles_per_bucket[record["bucket"]] = record["article_count"]

    cursor = await db.execute(
        """SELECT strftime(?, created_at) AS bucket, COUNT(*) AS snapshot_count
        FROM incident_snapshots WHERE incident_id = ?
        GROUP BY bucket ORDER BY bucket""",
        (fmt, incident_id),
    )
    snapshots_per_bucket = {}
    for record in await cursor.fetchall():
        snapshots_per_bucket[record["bucket"]] = record["snapshot_count"]

    # Union of both key sets so a bucket appears even if only one side has data.
    buckets = []
    for label in sorted(set(articles_per_bucket) | set(snapshots_per_bucket)):
        buckets.append(
            {
                "bucket": label,
                "article_count": articles_per_bucket.get(label, 0),
                "snapshot_count": snapshots_per_bucket.get(label, 0),
            }
        )
    return {"granularity": granularity, "buckets": buckets}
@router.get("/{incident_id}/snapshots")
async def get_snapshots(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List the situation reports (snapshots) of an incident — lean list.

    Delivers only metadata and a 300-character preview of the summary. The full
    text (summary + sources_json) is loaded on demand via the single-item
    endpoint ``GET /{incident_id}/snapshots/{snapshot_id}``.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        """SELECT id, incident_id, article_count, fact_check_count, created_at,
        SUBSTR(summary, 1, 300) AS summary_preview
        FROM incident_snapshots WHERE incident_id = ?
        ORDER BY created_at DESC""",
        (incident_id,),
    )
    snapshots = []
    for record in await cursor.fetchall():
        snapshots.append(dict(record))
    return snapshots
@router.get("/{incident_id}/snapshots/search")
async def search_snapshots(
    incident_id: int,
    q: str = Query(..., min_length=2, max_length=200),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Substring search across the summaries of all snapshots of an incident.

    Returns the same lean shape as the list endpoint. The query is matched
    literally: ``%`` and ``_`` in ``q`` are not treated as wildcards.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # Escape LIKE metacharacters (and the escape character itself) so that the
    # user's text cannot widen the match.
    escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    like = f"%{escaped}%"
    cursor = await db.execute(
        """SELECT id, incident_id, article_count, fact_check_count, created_at,
        SUBSTR(summary, 1, 300) AS summary_preview
        FROM incident_snapshots
        WHERE incident_id = ? AND summary LIKE ? ESCAPE '\\'
        ORDER BY created_at DESC""",
        (incident_id, like),
    )
    rows = await cursor.fetchall()
    return [dict(row) for row in rows]
@router.get("/{incident_id}/snapshots/{snapshot_id}")
async def get_snapshot(
    incident_id: int,
    snapshot_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Fetch one snapshot with its full summary and sources_json (lazy load).

    Raises:
        HTTPException: 404 if the snapshot does not belong to this incident.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        """SELECT id, incident_id, summary, sources_json,
        article_count, fact_check_count, created_at
        FROM incident_snapshots WHERE id = ? AND incident_id = ?""",
        (snapshot_id, incident_id),
    )
    snapshot = await cursor.fetchone()
    if snapshot:
        return dict(snapshot)
    raise HTTPException(status_code=404, detail="Snapshot nicht gefunden")
@router.get("/{incident_id}/factchecks")
async def get_factchecks(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return all fact checks of an incident, most recently checked first."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        "SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
        (incident_id,),
    )
    fact_checks = []
    for record in await cursor.fetchall():
        fact_checks.append(dict(record))
    return fact_checks
@router.get("/{incident_id}/pipeline")
async def get_pipeline(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Analysis pipeline status of an incident: the definition of all steps plus
    the state of the most recent (or currently running) refresh.

    Response:
    {
    "is_research": bool,
    "is_running": bool,
    "last_refresh": {started_at, completed_at, duration_sec, status, pass_total} | null,
    "steps_definition": [{key, label, icon, tooltip}, ...],
    "steps": [{step_key, status, count_value, count_secondary, pass_number}, ...]
    }
    """
    from services.pipeline_tracker import PIPELINE_STEPS

    incident_row = await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    # A missing type is treated as "adhoc".
    is_research = (incident_row["type"] or "adhoc") == "research"

    # Pick the refresh log entry: a running one is preferred, otherwise the
    # most recently started entry of any status.
    cursor = await db.execute(
        """SELECT id, started_at, completed_at, status, retry_count
        FROM refresh_log
        WHERE incident_id = ? AND status = 'running'
        ORDER BY started_at DESC LIMIT 1""",
        (incident_id,),
    )
    log_entry = await cursor.fetchone()
    if not log_entry:
        cursor = await db.execute(
            """SELECT id, started_at, completed_at, status, retry_count
            FROM refresh_log
            WHERE incident_id = ?
            ORDER BY started_at DESC LIMIT 1""",
            (incident_id,),
        )
        log_entry = await cursor.fetchone()

    response = {
        "is_research": is_research,
        "is_running": False,
        "last_refresh": None,
        "steps_definition": PIPELINE_STEPS,
        "steps": [],
    }
    if not log_entry:
        return response

    # Load the pipeline steps recorded for this refresh.
    step_cursor = await db.execute(
        """SELECT step_key, pass_number, status, count_value, count_secondary,
        started_at, completed_at
        FROM refresh_pipeline_steps
        WHERE refresh_log_id = ?
        ORDER BY pass_number ASC, id ASC""",
        (log_entry["id"],),
    )
    steps = [dict(record) for record in await step_cursor.fetchall()]

    # Highest pass number seen, never below 1.
    pass_total = max([1] + [step["pass_number"] for step in steps if step["pass_number"]])

    # Duration is only available once both timestamps exist and parse cleanly.
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    duration_sec = None
    try:
        if log_entry["started_at"] and log_entry["completed_at"]:
            began = datetime.strptime(log_entry["started_at"], timestamp_format)
            ended = datetime.strptime(log_entry["completed_at"], timestamp_format)
            duration_sec = max(0, int((ended - began).total_seconds()))
    except Exception:
        duration_sec = None

    response["is_running"] = log_entry["status"] == "running"
    response["steps"] = steps
    response["last_refresh"] = {
        "started_at": log_entry["started_at"],
        "completed_at": log_entry["completed_at"],
        "status": log_entry["status"],
        "duration_sec": duration_sec,
        "pass_total": pass_total,
    }
    return response
@router.get("/{incident_id}/locations")
async def get_locations(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the geographic locations of an incident, aggregated per place.

    Three separate small queries are used instead of one large JOIN:
    1. Location aggregates grouped by (name, lat, lon).
    2. Category counts per location, used to pick the dominant category.
    3. Sample articles per location via ROW_NUMBER(), at most 10 each.

    Returns:
        ``{"category_labels": dict | None, "locations": [...]}``.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # 1. Location aggregates. Coordinates are rounded to two decimals so that
    # nearby points of the same place collapse into one entry.
    cursor = await db.execute(
        """SELECT
        COALESCE(location_name_normalized, location_name) AS name,
        ROUND(latitude, 2) AS lat,
        ROUND(longitude, 2) AS lon,
        MIN(country_code) AS country_code,
        MAX(confidence) AS confidence,
        COUNT(*) AS article_count
        FROM article_locations
        WHERE incident_id = ?
        GROUP BY name, lat, lon
        ORDER BY article_count DESC""",
        (incident_id,),
    )
    loc_rows = [dict(r) for r in await cursor.fetchall()]

    # 2. Categories per location. The grouping uses the COALESCE expression
    # explicitly: a bare "category" in GROUP BY binds to the table column, so
    # NULL and 'mentioned' would form two groups that both report 'mentioned'.
    cursor = await db.execute(
        """SELECT
        COALESCE(location_name_normalized, location_name) AS name,
        ROUND(latitude, 2) AS lat,
        ROUND(longitude, 2) AS lon,
        COALESCE(category, 'mentioned') AS category,
        COUNT(*) AS cnt
        FROM article_locations
        WHERE incident_id = ?
        GROUP BY name, lat, lon, COALESCE(category, 'mentioned')""",
        (incident_id,),
    )
    cat_map: dict[tuple, dict[str, int]] = {}
    for r in await cursor.fetchall():
        key = (r["name"], r["lat"], r["lon"])
        per_location = cat_map.setdefault(key, {})
        # Accumulate rather than overwrite, so duplicate labels cannot lose counts.
        per_location[r["category"]] = per_location.get(r["category"], 0) + r["cnt"]

    # 3. Sample articles per location (max. 10, newest first).
    cursor = await db.execute(
        """SELECT name, lat, lon, article_id, headline, headline_de, source, source_url
        FROM (
        SELECT
        COALESCE(al.location_name_normalized, al.location_name) AS name,
        ROUND(al.latitude, 2) AS lat,
        ROUND(al.longitude, 2) AS lon,
        a.id AS article_id,
        a.headline, a.headline_de, a.source, a.source_url,
        ROW_NUMBER() OVER (
        PARTITION BY COALESCE(al.location_name_normalized, al.location_name),
        ROUND(al.latitude, 2), ROUND(al.longitude, 2)
        ORDER BY a.collected_at DESC
        ) AS rn
        FROM article_locations al
        JOIN articles a ON a.id = al.article_id
        WHERE al.incident_id = ?
        )
        WHERE rn <= 10""",
        (incident_id,),
    )
    sample_map: dict[tuple, list[dict]] = {}
    for r in await cursor.fetchall():
        key = (r["name"], r["lat"], r["lon"])
        sample_map.setdefault(key, []).append({
            "id": r["article_id"],
            # Prefer the German headline when one exists.
            "headline": r["headline_de"] or r["headline"],
            "source": r["source"],
            "source_url": r["source_url"],
        })

    # Assemble the result. The dominant category is chosen by priority first
    # and by count second; unknown category names rank lowest.
    priority = {"primary": 4, "secondary": 3, "tertiary": 2, "mentioned": 1}
    result = []
    for loc in loc_rows:
        key = (loc["name"], loc["lat"], loc["lon"])
        cats = cat_map.get(key, {})
        best_cat = max(cats, key=lambda c: (priority.get(c, 0), cats[c])) if cats else "mentioned"
        result.append({
            "location_name": loc["name"],
            "lat": loc["lat"],
            "lon": loc["lon"],
            "country_code": loc["country_code"],
            "confidence": loc["confidence"],
            "article_count": loc["article_count"],
            "articles": sample_map.get(key, []),
            "category": best_cat,
        })

    # Load the category labels stored on the incident (JSON, optional).
    cursor = await db.execute(
        "SELECT category_labels FROM incidents WHERE id = ?", (incident_id,)
    )
    inc_row = await cursor.fetchone()
    category_labels = None
    if inc_row and inc_row["category_labels"]:
        try:
            category_labels = json.loads(inc_row["category_labels"])
        except (json.JSONDecodeError, TypeError):
            # Unparseable labels are treated as "no labels".
            category_labels = None
    return {"category_labels": category_labels, "locations": result}
# Geoparse status per incident id (in-memory, process-local).
_geoparse_status: dict[int, dict] = {}


async def _run_geoparse_background(incident_id: int, tenant_id: int | None):
    """Background task: geoparse all not-yet-processed articles of an incident.

    Progress is published through ``_geoparse_status[incident_id]`` with the
    states "running", "done" and "error". The task opens its own database
    connection and closes it when finished. Exceptions are logged and recorded
    in the status entry instead of being raised.
    """
    _geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": 0, "locations": 0}
    db = None
    try:
        from agents.geoparsing import geoparse_articles
        db = await get_db()

        # Load title/description as context for the geoparser.
        cursor = await db.execute(
            "SELECT title, description FROM incidents WHERE id = ?", (incident_id,)
        )
        inc_row = await cursor.fetchone()
        incident_context = ""
        if inc_row:
            incident_context = f"{inc_row['title']} - {inc_row['description'] or ''}"

        # Only articles that have no location rows yet.
        cursor = await db.execute(
            """SELECT a.* FROM articles a
            WHERE a.incident_id = ?
            AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
            (incident_id, incident_id),
        )
        articles = [dict(row) for row in await cursor.fetchall()]
        if not articles:
            _geoparse_status[incident_id] = {"status": "done", "processed": 0, "total": 0, "locations": 0}
            return

        total = len(articles)
        _geoparse_status[incident_id]["total"] = total
        _geoparse_logger.info(f"Geoparsing Hintergrund: {total} Artikel fuer Lage {incident_id}")

        # Process in batches of 50 articles and commit after each batch.
        batch_size = 50
        geo_count = 0
        processed = 0
        for i in range(0, total, batch_size):
            batch = articles[i:i + batch_size]
            geo_result = await geoparse_articles(batch, incident_context)

            # The geoparser may return either (locations_dict, category_labels)
            # or just the locations dict.
            if isinstance(geo_result, tuple):
                batch_geo_results, batch_labels = geo_result
                # Store the labels from the first batch, unless labels already exist.
                if batch_labels and i == 0:
                    try:
                        await db.execute(
                            "UPDATE incidents SET category_labels = ? WHERE id = ? AND category_labels IS NULL",
                            (json.dumps(batch_labels, ensure_ascii=False), incident_id),
                        )
                        await db.commit()
                    except Exception:
                        # Saving labels is best effort and must not abort the
                        # run, but the failure should be visible in the logs.
                        _geoparse_logger.warning(
                            "Could not store category labels for incident %s",
                            incident_id,
                            exc_info=True,
                        )
            else:
                batch_geo_results = geo_result

            for art_id, locations in batch_geo_results.items():
                for loc in locations:
                    await db.execute(
                        """INSERT INTO article_locations
                        (article_id, incident_id, location_name, location_name_normalized,
                        country_code, latitude, longitude, confidence, source_text, tenant_id, category)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                        (art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
                         loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
                         loc.get("source_text", ""), tenant_id, loc.get("category", "mentioned")),
                    )
                    geo_count += 1
            await db.commit()
            processed += len(batch)
            _geoparse_status[incident_id] = {"status": "running", "processed": processed, "total": total, "locations": geo_count}

        _geoparse_status[incident_id] = {"status": "done", "processed": processed, "total": total, "locations": geo_count}
        _geoparse_logger.info(f"Geoparsing fertig: {geo_count} Orte aus {processed} Artikeln (Lage {incident_id})")
    except Exception as e:
        _geoparse_status[incident_id] = {"status": "error", "error": str(e)}
        # .exception() keeps the traceback; the message text is unchanged.
        _geoparse_logger.exception(f"Geoparsing Fehler (Lage {incident_id}): {e}")
    finally:
        if db:
            await db.close()
# Strong references to running geoparse tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_geoparse_tasks: set = set()


def _geoparse_running_response(existing: dict) -> dict:
    """Build the "already running" response from a status entry."""
    return {"status": "running", "message": f"Läuft bereits ({existing.get('processed', 0)}/{existing.get('total', 0)} Artikel)"}


@router.post("/{incident_id}/geoparse")
async def trigger_geoparse(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Start geoparsing of all unprocessed articles of an incident in the background.

    Returns a status dict: "running" if a task is already active, "done" if no
    article is left to process, otherwise "started".
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # Already running?
    existing = _geoparse_status.get(incident_id, {})
    if existing.get("status") == "running":
        return _geoparse_running_response(existing)

    # Check whether there are any articles without location rows at all.
    cursor = await db.execute(
        """SELECT COUNT(*) as cnt FROM articles a
        WHERE a.incident_id = ?
        AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
        (incident_id, incident_id),
    )
    count = (await cursor.fetchone())["cnt"]
    if count == 0:
        return {"status": "done", "message": "Alle Artikel wurden bereits verarbeitet", "locations": 0}

    # Another request may have started a task while the query above was
    # awaited. Re-check and mark the incident as running without any further
    # await in between, so two tasks can never run for the same incident.
    existing = _geoparse_status.get(incident_id, {})
    if existing.get("status") == "running":
        return _geoparse_running_response(existing)
    _geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": 0, "locations": 0}

    # Start the background task and keep it referenced until it completes.
    task = asyncio.create_task(_run_geoparse_background(incident_id, tenant_id))
    _geoparse_tasks.add(task)
    task.add_done_callback(_geoparse_tasks.discard)
    return {"status": "started", "message": f"Geoparsing gestartet für {count} Artikel"}
@router.get("/{incident_id}/geoparse-status")
async def get_geoparse_status(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the recorded geoparsing status for an incident.

    Falls back to ``{"status": "idle"}`` when no status has been recorded
    for the incident. Access is checked via ``_check_incident_access`` first.
    """
    tenant_id = current_user.get("tenant_id")
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, tenant_id)

    idle_status = {"status": "idle"}
    return _geoparse_status.get(incident_id, idle_status)
@router.get("/{incident_id}/refresh-log")
async def get_refresh_log(
    incident_id: int,
    limit: int = 20,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the refresh history of an incident, newest first.

    ``limit`` is clamped to the range 0..100. Each returned entry is the
    ``refresh_log`` row as a dict plus a computed ``duration_seconds`` field
    (``None`` when either timestamp is missing or cannot be parsed).
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # SQLite interprets a negative LIMIT as "no upper bound", so a negative
    # query parameter would bypass the 100-row cap without the lower clamp.
    row_limit = max(0, min(limit, 100))
    cursor = await db.execute(
        """SELECT id, started_at, completed_at, articles_found, status,
                  trigger_type, retry_count, error_message
           FROM refresh_log WHERE incident_id = ?
           ORDER BY started_at DESC LIMIT ?""",
        (incident_id, row_limit),
    )

    results = []
    for row in await cursor.fetchall():
        entry = dict(row)
        duration = None
        if entry["started_at"] and entry["completed_at"]:
            try:
                start = datetime.fromisoformat(entry["started_at"])
                end = datetime.fromisoformat(entry["completed_at"])
                duration = round((end - start).total_seconds(), 1)
            except (TypeError, ValueError):
                # Malformed timestamp, or naive/aware mismatch between the two values.
                duration = None
        entry["duration_seconds"] = duration
        results.append(entry)
    return results
@router.get("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def get_subscription(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the current user's e-mail subscription settings for an incident.

    When no subscription row exists for this user and incident, all three
    notification flags are reported as ``False``.
    """
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))

    query = (
        "SELECT notify_email_summary, notify_email_new_articles, notify_email_status_change\n"
        "FROM incident_subscriptions WHERE user_id = ? AND incident_id = ?"
    )
    cursor = await db.execute(query, (user_id, incident_id))
    stored = await cursor.fetchone()
    if not stored:
        return {
            "notify_email_summary": False,
            "notify_email_new_articles": False,
            "notify_email_status_change": False,
        }
    return dict(stored)
@router.put("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def update_subscription(
    incident_id: int,
    data: SubscriptionUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create or overwrite the current user's e-mail subscription settings for an incident.

    The three flags are stored as 0/1 integers via an upsert keyed on
    (user_id, incident_id); the submitted values are echoed back.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    upsert_sql = (
        "INSERT INTO incident_subscriptions (user_id, incident_id, notify_email_summary, notify_email_new_articles, notify_email_status_change)\n"
        "VALUES (?, ?, ?, ?, ?)\n"
        "ON CONFLICT(user_id, incident_id) DO UPDATE SET\n"
        "notify_email_summary = excluded.notify_email_summary,\n"
        "notify_email_new_articles = excluded.notify_email_new_articles,\n"
        "notify_email_status_change = excluded.notify_email_status_change"
    )
    # Insertion order of this dict matches the column order of the statement.
    settings = {
        "notify_email_summary": data.notify_email_summary,
        "notify_email_new_articles": data.notify_email_new_articles,
        "notify_email_status_change": data.notify_email_status_change,
    }
    flags = [int(bool(value)) for value in settings.values()]
    await db.execute(upsert_sql, (current_user["id"], incident_id, *flags))
    await db.commit()
    return settings
@router.post("/{incident_id}/refresh")
async def trigger_refresh(
    incident_id: int,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Queue a manual refresh for an incident.

    Responds with status ``queued`` when the orchestrator accepted the
    request and ``skipped`` when it did not.
    """
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))

    from agents.orchestrator import orchestrator

    accepted = await orchestrator.enqueue_refresh(incident_id, user_id=user_id)
    outcome = "queued" if accepted else "skipped"
    return {"status": outcome, "incident_id": incident_id}
@router.post("/{incident_id}/cancel-refresh")
async def cancel_refresh(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Request cancellation of an incident's refresh.

    Responds with status ``cancelling`` when the orchestrator reports a
    cancellation, otherwise ``not_running``.
    """
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))

    from agents.orchestrator import orchestrator

    if await orchestrator.cancel_refresh(incident_id):
        return {"status": "cancelling"}
    return {"status": "not_running"}
def _slugify(text: str) -> str:
"""Dateinamen-sicherer Slug aus Titel."""
replacements = {
"\u00e4": "ae", "\u00f6": "oe", "\u00fc": "ue", "\u00df": "ss",
"\u00c4": "Ae", "\u00d6": "Oe", "\u00dc": "Ue",
}
for src, dst in replacements.items():
text = text.replace(src, dst)
text = unicodedata.normalize("NFKD", text)
text = re.sub(r"[^\w\s-]", "", text)
text = re.sub(r"[\s_]+", "-", text).strip("-")
return text[:80].lower()
@router.get("/{incident_id}/export")
async def export_incident(
    incident_id: int,
    format: str = Query("pdf", pattern="^(pdf|docx)$"),
    scope: str = Query("report", pattern="^(summary|report|full)$"),
    sections: str = Query(None),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Export an incident as a PDF or Word document.

    Parameters:
        incident_id: ID of the incident to export.
        format: ``pdf`` or ``docx``.
        scope: ``summary``, ``report`` or ``full``; snapshots are only
            loaded for ``full``.
        sections: Optional comma-separated list of section names. Unknown
            names are ignored; if no valid name remains, the value is
            treated as not given.

    Returns:
        A ``StreamingResponse`` carrying the rendered document as an
        attachment named ``<slug>_<label>_<YYYYMMDD>.<ext>``.
    """
    from urllib.parse import quote

    from report_generator import generate_pdf, generate_docx, generate_executive_summary

    # Parse sections from the comma-separated string.
    valid_sections = {"zusammenfassung", "bericht", "faktencheck", "quellen", "timeline", "karte"}
    sections_set = None
    if sections:
        sections_set = {s.strip() for s in sections.split(",") if s.strip() in valid_sections} or None

    tenant_id = current_user.get("tenant_id")
    row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
    incident = dict(row)

    # Creator name
    cursor = await db.execute("SELECT email FROM users WHERE id = ?", (incident["created_by"],))
    user_row = await cursor.fetchone()
    creator = user_row["email"] if user_row else "Unbekannt"

    # Organization (for file metadata)
    organization_name = None
    if incident.get("tenant_id"):
        cursor = await db.execute(
            "SELECT name FROM organizations WHERE id = ?", (incident["tenant_id"],)
        )
        org_row = await cursor.fetchone()
        organization_name = org_row["name"] if org_row else None

    # Top locations (for keyword metadata)
    cursor = await db.execute(
        """SELECT location_name, COUNT(*) AS cnt
           FROM article_locations
           WHERE incident_id = ?
           GROUP BY COALESCE(location_name_normalized, location_name)
           ORDER BY cnt DESC
           LIMIT 5""",
        (incident_id,),
    )
    top_locations = [r["location_name"] for r in await cursor.fetchall() if r["location_name"]]

    # Snapshot count (passed to the generators as snapshot_count)
    cursor = await db.execute(
        "SELECT COUNT(*) AS cnt FROM incident_snapshots WHERE incident_id = ?",
        (incident_id,),
    )
    snapshot_count = (await cursor.fetchone())["cnt"] or 0

    # Articles
    cursor = await db.execute(
        "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
        (incident_id,),
    )
    articles = [dict(r) for r in await cursor.fetchall()]

    # Fact checks
    cursor = await db.execute(
        "SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
        (incident_id,),
    )
    fact_checks = [dict(r) for r in await cursor.fetchall()]

    # Snapshots (only for scope=full)
    snapshots = []
    if scope == "full":
        cursor = await db.execute(
            "SELECT * FROM incident_snapshots WHERE incident_id = ? ORDER BY created_at DESC",
            (incident_id,),
        )
        snapshots = [dict(r) for r in await cursor.fetchall()]

    # Summary used in the export:
    # - Ad-hoc incidents primarily use "latest developments" (Markdown
    #   bullets converted to HTML), because live monitoring depends on
    #   up-to-date content.
    # - Fallback (and always for research incidents): the executive summary,
    #   generated on demand and cached in incidents.executive_summary.
    is_adhoc = (incident.get("type") or "adhoc") != "research"
    latest_dev = (incident.get("latest_developments") or "").strip()
    exec_summary = None
    if is_adhoc and latest_dev:
        from report_generator import _markdown_to_html as _md_to_html
        exec_summary = _md_to_html(latest_dev)
    if not exec_summary:
        exec_summary = incident.get("executive_summary")
    if not exec_summary:
        summary_text = incident.get("summary") or ""
        exec_summary = await generate_executive_summary(summary_text)
        await db.execute(
            "UPDATE incidents SET executive_summary = ? WHERE id = ?",
            (exec_summary, incident_id),
        )
        await db.commit()

    date_str = datetime.now(TIMEZONE).strftime("%Y%m%d")
    # A title without usable characters yields an empty slug; fall back to
    # an ID-based name so the filename does not start with an underscore.
    slug = _slugify(incident["title"] or "") or f"lage-{incident_id}"

    scope_labels = {"summary": "zusammenfassung", "report": "lagebericht", "full": "vollstaendig"}
    # If sections were given explicitly, pick the label that matches them.
    if sections_set:
        if sections_set == {"zusammenfassung"}:
            scope_labels_key = "zusammenfassung"
        elif "timeline" in sections_set:
            scope_labels_key = "vollstaendig"
        else:
            scope_labels_key = "lagebericht"
    else:
        scope_labels_key = scope_labels.get(scope, "lagebericht")

    # Both generators take the same arguments; only the renderer, file
    # extension and media type differ per format.
    if format == "pdf":
        render = generate_pdf
        extension = "pdf"
        media_type = "application/pdf"
    else:
        render = generate_docx
        extension = "docx"
        media_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    document_bytes = await render(
        incident, articles, fact_checks, snapshots, scope, creator, exec_summary,
        sections=sections_set,
        organization_name=organization_name,
        top_locations=top_locations,
        snapshot_count=snapshot_count,
    )

    filename = f"{slug}_{scope_labels_key}_{date_str}.{extension}"
    # Header values must be latin-1 encodable. If the filename contains
    # anything that needs escaping, send it percent-encoded via the
    # RFC 5987 "filename*" parameter instead of the plain quoted form.
    quoted_filename = quote(filename)
    if quoted_filename != filename:
        disposition = f"attachment; filename*=UTF-8''{quoted_filename}"
    else:
        disposition = f'attachment; filename="{filename}"'

    return StreamingResponse(
        io.BytesIO(document_bytes),
        media_type=media_type,
        headers={"Content-Disposition": disposition},
    )