Em-dashes und Umlaut-Umschreibungen aus den Pipeline-Aenderungen entfernt: Tooltip-Texte, HTML-Empty-State, JS-Kommentare, Count-Status-Platzhalter, Orchestrator-Kommentare und CSS-Kommentare. Anstelle von typografischen Gedankenstrichen werden jetzt Kommas oder Punkte gesetzt, "uebersprungen" -> "übersprungen" mit echtem Umlaut, "laeuft" usw. analog. UI-Text "— Refresh starten" wird zu zwei Saetzen. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1222 Zeilen
47 KiB
Python
1222 Zeilen
47 KiB
Python
"""Incidents-Router: Lagen verwalten (Multi-Tenant)."""
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
|
|
from fastapi.responses import StreamingResponse
|
|
from models import IncidentCreate, IncidentUpdate, IncidentResponse, IncidentListItem, SubscriptionUpdate, SubscriptionResponse, DescriptionEnhanceRequest
|
|
from auth import get_current_user
|
|
from middleware.license_check import require_writable_license
|
|
from database import db_dependency, get_db
|
|
from datetime import datetime
|
|
from config import TIMEZONE
|
|
import asyncio
|
|
import aiosqlite
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import unicodedata
|
|
|
|
# Logger used by the geoparsing background task of this module.
_geoparse_logger = logging.getLogger(name="osint.geoparse_bg")

# Every endpoint in this module is mounted below /api/incidents and grouped
# under the "incidents" tag.
router = APIRouter(tags=["incidents"], prefix="/api/incidents")
|
|
|
|
# Allow-list of incident columns that update_incident may write. The names are
# interpolated into the UPDATE ... SET clause there, so only trusted column
# names may ever appear in this set (kept alphabetical).
INCIDENT_UPDATE_COLUMNS = {
    "description",
    "include_telegram",
    "international_sources",
    "refresh_interval",
    "refresh_mode",
    "refresh_start_time",
    "retention_days",
    "status",
    "title",
    "type",
    "visibility",
}
|
|
|
|
|
|
async def _check_incident_access(
    db: aiosqlite.Connection, incident_id: int, user_id: int, tenant_id: int
) -> aiosqlite.Row:
    """Load an incident and enforce tenant and visibility access rules.

    Args:
        db: Open database connection.
        incident_id: Primary key of the incident to load.
        user_id: ID of the requesting user.
        tenant_id: Tenant of the requesting user; incidents belonging to other
            tenants are reported as "not found".

    Returns:
        The complete ``incidents`` row (``SELECT *``).

    Raises:
        HTTPException: 404 if no incident with this ID exists in the tenant,
            403 if the incident is private and ``user_id`` did not create it.
    """
    lookup = await db.execute(
        "SELECT * FROM incidents WHERE id = ? AND tenant_id = ?",
        (incident_id, tenant_id),
    )
    incident_row = await lookup.fetchone()
    if not incident_row:
        raise HTTPException(status_code=404, detail="Lage nicht gefunden")

    # Private incidents are visible to their creator only.
    is_private = incident_row["visibility"] == "private"
    is_owner = incident_row["created_by"] == user_id
    if is_private and not is_owner:
        raise HTTPException(status_code=403, detail="Kein Zugriff auf private Lage")

    return incident_row
|
|
|
|
|
|
async def _enrich_incident(db: aiosqlite.Connection, row: aiosqlite.Row) -> dict:
    """Return an incident row as a dict, enriched with statistics and creator name.

    Added keys:
        article_count: number of articles of the incident.
        source_count: number of distinct non-NULL ``source`` values among them.
        created_by_username: the creator's e-mail, or "Unbekannt" when no
            matching ``users`` row exists.

    Both counts come from one aggregate query. This helper runs once per
    incident in list endpoints, so saving a round trip per call adds up there.
    """
    incident = dict(row)

    cursor = await db.execute(
        "SELECT COUNT(*) AS article_count, COUNT(DISTINCT source) AS source_count "
        "FROM articles WHERE incident_id = ?",
        (incident["id"],),
    )
    stats = await cursor.fetchone()

    cursor = await db.execute(
        "SELECT email FROM users WHERE id = ?",
        (incident["created_by"],),
    )
    user_row = await cursor.fetchone()

    incident["article_count"] = stats["article_count"]
    incident["source_count"] = stats["source_count"]
    incident["created_by_username"] = user_row["email"] if user_row else "Unbekannt"

    return incident
|
|
|
|
|
|
@router.get("", response_model=list[IncidentListItem])
async def list_incidents(
    status_filter: str | None = None,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List the tenant's incidents: all public ones plus the caller's own ones.

    Returns lean sidebar items. ``summary`` and ``sources_json`` are NOT
    selected (``description`` is, because the edit dialog needs it). Full texts
    are loaded when an incident is opened via GET /incidents/{id}.

    Args:
        status_filter: Optional exact match on the ``status`` column.
    """
    tenant_id = current_user.get("tenant_id")
    user_id = current_user["id"]

    # Select only the columns needed by the sidebar and the edit dialog
    # (for the large "Iran" incident this saved 324 KB sources_json + 32 KB summary).
    # has_summary is a 0/1 flag; the frontend uses it to detect the "first refresh".
    query = (
        "SELECT id, title, description, type, status, refresh_mode, refresh_interval, "
        "refresh_start_time, retention_days, visibility, "
        "international_sources, include_telegram, created_by, created_at, updated_at, "
        "CASE WHEN summary IS NOT NULL AND summary != '' THEN 1 ELSE 0 END AS has_summary "
        "FROM incidents WHERE tenant_id = ? AND (visibility = 'public' OR created_by = ?)"
    )
    params = [tenant_id, user_id]

    if status_filter:
        query += " AND status = ?"
        params.append(status_filter)

    query += " ORDER BY updated_at DESC"
    cursor = await db.execute(query, params)
    rows = await cursor.fetchall()

    # Enrich sequentially on the shared connection (one helper call per incident).
    return [await _enrich_incident(db, row) for row in rows]
|
|
|
|
|
|
@router.post("", response_model=IncidentResponse, status_code=status.HTTP_201_CREATED)
async def create_incident(
    data: IncidentCreate,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create a new incident in the caller's tenant, owned by the caller.

    Guarded by the ``require_writable_license`` dependency. ``created_at`` and
    ``updated_at`` are both set to the current time in ``TIMEZONE``. Returns the
    freshly stored row, enriched like every other incident response.
    """
    timestamp = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
    values = (
        data.title,
        data.description,
        data.type,
        data.refresh_mode,
        data.refresh_interval,
        data.refresh_start_time,
        data.retention_days,
        # Boolean flags are persisted as SQLite integers 0/1.
        int(bool(data.international_sources)),
        int(bool(data.include_telegram)),
        data.visibility,
        current_user.get("tenant_id"),
        current_user["id"],
        timestamp,
        timestamp,
    )
    insert_cursor = await db.execute(
        """INSERT INTO incidents (title, description, type, refresh_mode, refresh_interval,
           refresh_start_time, retention_days, international_sources, include_telegram, visibility,
           tenant_id, created_by, created_at, updated_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        values,
    )
    await db.commit()

    new_id = insert_cursor.lastrowid
    reload_cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (new_id,))
    created_row = await reload_cursor.fetchone()
    return await _enrich_incident(db, created_row)
|
|
|
|
|
|
@router.get("/refreshing")
async def get_refreshing_incidents(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the incidents with a running or queued refresh that the caller may see.

    Visibility rule (same as everywhere in this router): incidents of the
    caller's tenant that are public or were created by the caller. It is
    applied to the running refreshes from ``refresh_log`` AND to the
    orchestrator's in-memory queue/current task. The orchestrator state is
    process-wide and not tenant-scoped, so it must be filtered here.

    Response:
        refreshing: incident IDs with a ``refresh_log`` row in status 'running'.
        queued: visible incident IDs waiting in the orchestrator queue.
        current: the orchestrator's current incident ID if visible, else None.
        details: ``{str(incident_id): {"started_at": ...}}`` per running refresh.
    """
    tenant_id = current_user.get("tenant_id")
    user_id = current_user["id"]
    cursor = await db.execute(
        """SELECT rl.incident_id, rl.started_at FROM refresh_log rl
           JOIN incidents i ON i.id = rl.incident_id
           WHERE rl.status = 'running'
             AND i.tenant_id = ?
             AND (i.visibility = 'public' OR i.created_by = ?)""",
        (tenant_id, user_id),
    )
    rows = await cursor.fetchall()

    # Also include queued incidents from the orchestrator.
    from agents.orchestrator import orchestrator

    queued_ids = list(orchestrator._queued_ids) if hasattr(orchestrator, '_queued_ids') else []
    current_task = orchestrator._current_task if hasattr(orchestrator, '_current_task') else None
    # Session start of the currently running task, stable across multi-pass/retry.
    # Keeps the frontend timer from jumping back to the latest log entry
    # (pass 2/3 or retry n) after a reload.
    current_started_at = (
        orchestrator._current_task_started_at
        if hasattr(orchestrator, '_current_task_started_at') else None
    )

    # The orchestrator queue holds incidents of ALL tenants: keep only the IDs
    # the caller is allowed to see so foreign/private incident IDs do not leak.
    candidate_ids = set(queued_ids)
    if current_task is not None:
        candidate_ids.add(current_task)
    visible_ids = set()
    if candidate_ids:
        placeholders = ", ".join("?" for _ in candidate_ids)
        visibility_cursor = await db.execute(
            f"SELECT id FROM incidents WHERE id IN ({placeholders}) "
            "AND tenant_id = ? AND (visibility = 'public' OR created_by = ?)",
            (*candidate_ids, tenant_id, user_id),
        )
        visible_ids = {r["id"] for r in await visibility_cursor.fetchall()}
    queued_ids = [iid for iid in queued_ids if iid in visible_ids]
    if current_task not in visible_ids:
        current_task = None

    details = {}
    for row in rows:
        iid = row["incident_id"]
        # Prefer the orchestrator's session start for the task it is running now.
        started_at = (
            current_started_at
            if (iid == current_task and current_started_at)
            else row["started_at"]
        )
        details[str(iid)] = {"started_at": started_at}

    return {
        "refreshing": [row["incident_id"] for row in rows],
        "queued": queued_ids,
        "current": current_task,
        "details": details,
    }
|
|
|
|
|
|
# --- Description generation (prompt enhancement) ---
# Prompt templates for POST /enhance-description. Both are filled via
# str.format() with the ``title`` and ``context`` placeholders, so any other
# literal brace added to the template text would have to be doubled.
# The template text is runtime data sent to the model (German); do not translate it.

# Template used when the requested incident type is "research".
ENHANCE_PROMPT_RESEARCH = """Du bist ein Recherche-Planer in einem OSINT-Lagemonitoring-System.
Deine Aufgabe: Strukturiere ein Recherche-Briefing, das Analysten als Leitfaden für ihre Suche verwenden.
Du behauptest KEINE Fakten und musst das Thema NICHT kennen oder verifizieren.
Der Nutzer gibt das Thema vor -- du definierst Suchrichtungen, Schwerpunkte und Stichworte.
Erstelle das Briefing IMMER, auch wenn dir das Thema unbekannt ist.

WICHTIG: Verwende IMMER echte Umlaute (ä, ö, ü, ß) und KEINE Umschreibungen.

Titel: {title}
Vorhandener Kontext: {context}
Typ: Hintergrundrecherche

Erstelle ein präzises Recherche-Briefing mit:
1. Fallbezeichnung (vollständige Benennung des Themas basierend auf Titel und Kontext)
2. Recherche-Schwerpunkte (5-8 thematische Punkte, z.B. Sachverhalt, beteiligte Parteien, rechtliche Aspekte, mediale Rezeption, Hintergründe, Chronologie)
3. Relevante Suchbegriffe (deutsch + englisch, inkl. Abkürzungen und alternative Schreibweisen)

Schreibe NUR das Briefing als Fließtext mit Aufzählungen. Keine Erklärungen, Rückfragen oder Disclaimer."""

# Template used for every other incident type (live monitoring / ad-hoc).
ENHANCE_PROMPT_ADHOC = """Du bist ein Recherche-Planer in einem OSINT-Lagemonitoring-System.
Deine Aufgabe: Erstelle eine knappe Vorfallsbeschreibung, die als Suchauftrag für Live-Monitoring dient.
Du behauptest KEINE Fakten und musst den Vorfall NICHT kennen oder verifizieren.
Der Nutzer gibt das Thema vor -- du strukturierst, wonach gesucht werden soll.
Erstelle die Beschreibung IMMER, auch wenn dir der Vorfall unbekannt ist.

WICHTIG: Verwende IMMER echte Umlaute (ä, ö, ü, ß) und KEINE Umschreibungen.

Titel: {title}
Vorhandener Kontext: {context}
Typ: Live-Monitoring (aktuelle Ereignisse)

Erstelle eine knappe, informative Beschreibung mit:
1. Was ist passiert / worum geht es (basierend auf Titel und Kontext)
2. Wo (geographischer Kontext, falls ableitbar)
3. Wer ist beteiligt (Akteure, Organisationen, Länder)
4. Wonach soll gesucht werden (aktuelle Entwicklungen, Reaktionen, Hintergründe)

Schreibe NUR die Beschreibung als Fließtext (3-5 Zeilen). Keine Erklärungen, Rückfragen oder Disclaimer."""

# Logger for the enhance-description endpoint.
_enhance_logger = logging.getLogger("osint.enhance")
|
|
|
|
|
|
@router.post("/enhance-description")
async def enhance_description(
    data: DescriptionEnhanceRequest,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Generate a structured incident description from the title via the LLM.

    Uses ENHANCE_PROMPT_RESEARCH when ``data.type == "research"`` and
    ENHANCE_PROMPT_ADHOC otherwise. The title and the (optional) existing
    description are filled in as context, the fast Claude model is called, and
    the token usage is charged to the caller's tenant.

    Returns:
        ``{"description": <generated text, stripped>}``.

    Raises:
        HTTPException: 503 on an authentication error of the AI backend,
            429 when it is rate limited, 504 on timeout, 500 on any other failure.
    """
    from agents.claude_client import call_claude, ClaudeCliError
    from config import CLAUDE_MODEL_FAST
    from services.license_service import charge_usage_to_tenant

    template = ENHANCE_PROMPT_RESEARCH if data.type == "research" else ENHANCE_PROMPT_ADHOC
    context = data.description.strip() if data.description and data.description.strip() else "Kein Kontext angegeben"
    # Only the template's own braces are interpreted by format(); user text is inserted verbatim.
    prompt = template.format(title=data.title.strip(), context=context)

    try:
        result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST, raw_text=True, timeout=60)
    except ClaudeCliError as e:
        _enhance_logger.error(f"Beschreibung generieren: ClaudeCliError [{e.error_type}]: {e.message}")
        if e.error_type == "auth_error":
            raise HTTPException(status_code=503, detail="KI-Zugang aktuell nicht verfuegbar. Bitte Administrator kontaktieren.")
        if e.error_type == "rate_limit":
            raise HTTPException(status_code=429, detail="KI ist gerade ausgelastet. Bitte in einer Minute erneut versuchen.")
        raise HTTPException(status_code=500, detail="Beschreibung konnte nicht generiert werden")
    except (TimeoutError, asyncio.TimeoutError):
        # Before Python 3.11, asyncio.TimeoutError is NOT the builtin TimeoutError;
        # catching both keeps asyncio timeouts on 504 instead of the generic 500.
        _enhance_logger.error("Beschreibung generieren: Timeout")
        raise HTTPException(status_code=504, detail="Die KI antwortet gerade nicht. Bitte erneut versuchen.")
    except HTTPException:
        raise
    except Exception as e:
        # Unexpected failure: log with traceback so it can be diagnosed.
        _enhance_logger.exception(f"Beschreibung generieren fehlgeschlagen: {e}")
        raise HTTPException(status_code=500, detail="Beschreibung konnte nicht generiert werden")

    _enhance_logger.info(
        f"Beschreibung generiert fuer \"{data.title[:50]}\": "
        f"{usage.input_tokens}in/{usage.output_tokens}out"
    )
    # Usage is charged only after a successful generation.
    await charge_usage_to_tenant(db, current_user.get("tenant_id"), usage, source="enhance")
    await db.commit()
    return {"description": result.strip()}
|
|
|
|
|
|
@router.get("/{incident_id}", response_model=IncidentResponse)
async def get_incident(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return one incident the caller may access, enriched with statistics.

    ``sources_json`` is not part of this response. For citation lookups use
    the lazy ``GET /incidents/{id}/sources`` endpoint instead.
    """
    incident_row = await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    return await _enrich_incident(db, incident_row)
|
|
|
|
|
|
@router.get("/{incident_id}/sources")
async def get_incident_sources(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the parsed ``sources_json`` array of an incident for citation lookups.

    Returns ``{"incident_id": ..., "sources": [...]}``. ``sources`` is an empty
    list when the column is empty, not valid JSON, or not a JSON array.
    """
    tenant_id = current_user.get("tenant_id")
    # The access check already loads the full incidents row (SELECT *), so
    # sources_json is taken from it instead of querying the table a second time.
    row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    sources: list = []
    raw_sources = row["sources_json"]
    if raw_sources:
        try:
            parsed = json.loads(raw_sources)
        except (json.JSONDecodeError, TypeError):
            parsed = None
        if isinstance(parsed, list):
            sources = parsed
    return {"incident_id": incident_id, "sources": sources}
|
|
|
|
|
|
@router.put("/{incident_id}", response_model=IncidentResponse)
async def update_incident(
    incident_id: int,
    data: IncidentUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Partially update an incident the caller may access.

    Only non-None fields of ``data`` whose names are in INCIDENT_UPDATE_COLUMNS
    are written; the two boolean flags are stored as 0/1. ``updated_at`` is
    bumped only when at least one field is written. Returns the re-read,
    enriched incident (or the unchanged one if nothing was written).
    """
    current_row = await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )

    flag_columns = ("international_sources", "include_telegram")
    changes = {
        column: (1 if value else 0) if column in flag_columns else value
        for column, value in data.model_dump(exclude_none=True).items()
        if column in INCIDENT_UPDATE_COLUMNS
    }
    if not changes:
        # Nothing writable was sent: answer with the incident as it is.
        return await _enrich_incident(db, current_row)

    changes["updated_at"] = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
    # Column names come only from the allow-list (plus updated_at), never from
    # arbitrary request keys, so interpolating them into the SQL is safe.
    assignments = ", ".join(f"{column} = ?" for column in changes)
    await db.execute(
        f"UPDATE incidents SET {assignments} WHERE id = ?",
        [*changes.values(), incident_id],
    )
    await db.commit()

    reload_cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
    updated_row = await reload_cursor.fetchone()
    return await _enrich_incident(db, updated_row)
|
|
|
|
|
|
@router.delete("/{incident_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_incident(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete an incident; only its creator is allowed to do so.

    Raises:
        HTTPException: 404 if the incident does not exist in the caller's
            tenant, 403 if the caller is not its creator, 409 if SQLite reports
            "database is locked" while deleting.
    """
    lookup = await db.execute(
        "SELECT id, created_by FROM incidents WHERE id = ? AND tenant_id = ?",
        (incident_id, current_user.get("tenant_id")),
    )
    target = await lookup.fetchone()
    if not target:
        raise HTTPException(status_code=404, detail="Lage nicht gefunden")
    if target["created_by"] != current_user["id"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Nur der Ersteller kann diese Lage loeschen",
        )

    try:
        await db.execute("DELETE FROM incidents WHERE id = ?", (incident_id,))
        await db.commit()
    except Exception as exc:
        # Another writer may hold the SQLite lock: report a retryable 409
        # instead of a 500. Every other error propagates unchanged.
        if "database is locked" not in str(exc):
            raise
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Datenbank ist momentan beschaeftigt. Bitte in wenigen Sekunden erneut versuchen.",
        )
|
|
|
|
|
|
@router.get("/{incident_id}/articles")
async def get_articles(
    incident_id: int,
    limit: int = Query(500, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    search: str | None = Query(None, min_length=0, max_length=200),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return one page of an incident's articles, newest ``collected_at`` first.

    Response: ``{"total": int, "articles": [...]}``. ``total`` counts all
    matching articles, not only the returned page.

    The optional ``search`` parameter is matched as a literal substring (SQL
    LIKE with its wildcard characters escaped) against headline, headline_de,
    source, content_de and content_original. A blank search means no filter.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    search_clean = (search or "").strip()
    if search_clean:
        # Escape LIKE wildcards so input such as "50%" or "a_b" matches literally.
        # Backslash is escaped first and declared as ESCAPE character below.
        escaped = (
            search_clean.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        like = f"%{escaped}%"
        params = (incident_id, like, like, like, like, like)
        where = (
            "WHERE incident_id = ? AND ("
            "COALESCE(headline,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(headline_de,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(source,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(content_de,'') LIKE ? ESCAPE '\\' OR "
            "COALESCE(content_original,'') LIKE ? ESCAPE '\\')"
        )
    else:
        params = (incident_id,)
        where = "WHERE incident_id = ?"

    cursor = await db.execute(f"SELECT COUNT(*) AS cnt FROM articles {where}", params)
    total = (await cursor.fetchone())["cnt"]

    cursor = await db.execute(
        f"SELECT * FROM articles {where} ORDER BY collected_at DESC LIMIT ? OFFSET ?",
        (*params, limit, offset),
    )
    rows = await cursor.fetchall()
    return {"total": total, "articles": [dict(row) for row in rows]}
|
|
|
|
|
|
@router.get("/{incident_id}/articles/sources-summary")
async def get_articles_sources_summary(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Aggregate per-source article statistics of an incident (source overview).

    Returns ``{"total": int, "sources": [...], "language_counts": [...]}``:
        sources: one entry per ``source`` value with ``article_count`` and a
            sorted ``languages`` list (a NULL language counts as "de"),
            busiest source first.
        language_counts: ``{"language", "cnt"}`` per language (NULL as "de"),
            busiest first.
        total: number of articles of the incident.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    cursor = await db.execute(
        """SELECT source,
                  COUNT(*) AS article_count,
                  GROUP_CONCAT(DISTINCT COALESCE(language,'de')) AS languages
           FROM articles WHERE incident_id = ?
           GROUP BY source ORDER BY article_count DESC""",
        (incident_id,),
    )
    sources = []
    for r in await cursor.fetchall():
        entry = dict(r)
        langs = (entry.pop("languages") or "de").split(",")
        entry["languages"] = sorted({(lang or "de").strip() for lang in langs})
        sources.append(entry)

    # Overall language distribution. GROUP BY must repeat the COALESCE
    # expression: in SQLite a bare "language" in GROUP BY resolves to the table
    # column before the alias, which split NULL and 'de' into two "de" rows.
    cursor = await db.execute(
        """SELECT COALESCE(language,'de') AS language, COUNT(*) AS cnt
           FROM articles WHERE incident_id = ?
           GROUP BY COALESCE(language,'de') ORDER BY cnt DESC""",
        (incident_id,),
    )
    lang_counts = [dict(r) for r in await cursor.fetchall()]

    # Every article falls into exactly one language group, so their sum is the
    # article total; no separate COUNT(*) query is needed.
    total = sum(r["cnt"] for r in lang_counts)
    return {"total": total, "sources": sources, "language_counts": lang_counts}
|
|
|
|
|
|
@router.get("/{incident_id}/articles/timeline-buckets")
async def get_articles_timeline_buckets(
    incident_id: int,
    granularity: str = Query("day", pattern="^(hour|day|week|month)$"),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return per-time-bucket article and snapshot counts for the timeline axis.

    Buckets are SQLite ``strftime`` keys of ``articles.collected_at`` and
    ``incident_snapshots.created_at`` at the requested granularity. Only counts
    are returned, no content; buckets are sorted ascending and a missing side
    counts as 0.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    bucket_format = {
        "hour": "%Y-%m-%d %H:00",
        "day": "%Y-%m-%d",
        "week": "%Y-%W",
        "month": "%Y-%m",
    }[granularity]

    async def count_per_bucket(sql: str, count_column: str) -> dict:
        # Run one grouped count query and map bucket -> count.
        bucket_cursor = await db.execute(sql, (bucket_format, incident_id))
        return {r["bucket"]: r[count_column] for r in await bucket_cursor.fetchall()}

    article_counts = await count_per_bucket(
        """SELECT strftime(?, collected_at) AS bucket, COUNT(*) AS article_count
           FROM articles WHERE incident_id = ?
           GROUP BY bucket ORDER BY bucket""",
        "article_count",
    )
    snapshot_counts = await count_per_bucket(
        """SELECT strftime(?, created_at) AS bucket, COUNT(*) AS snapshot_count
           FROM incident_snapshots WHERE incident_id = ?
           GROUP BY bucket ORDER BY bucket""",
        "snapshot_count",
    )

    return {
        "granularity": granularity,
        "buckets": [
            {
                "bucket": bucket,
                "article_count": article_counts.get(bucket, 0),
                "snapshot_count": snapshot_counts.get(bucket, 0),
            }
            for bucket in sorted(article_counts.keys() | snapshot_counts.keys())
        ],
    }
|
|
|
|
|
|
@router.get("/{incident_id}/snapshots")
async def get_snapshots(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the incident's situation reports (snapshots) as a lean list, newest first.

    Each entry carries metadata plus ``summary_preview`` (the first 300
    characters of the summary). The full summary and sources_json are loaded
    on demand via ``GET /{incident_id}/snapshots/{snapshot_id}``.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    snapshot_cursor = await db.execute(
        """SELECT id, incident_id, article_count, fact_check_count, created_at,
                  SUBSTR(summary, 1, 300) AS summary_preview
           FROM incident_snapshots WHERE incident_id = ?
           ORDER BY created_at DESC""",
        (incident_id,),
    )
    return [dict(snapshot) for snapshot in await snapshot_cursor.fetchall()]
|
|
|
|
|
|
@router.get("/{incident_id}/snapshots/search")
async def search_snapshots(
    incident_id: int,
    q: str = Query(..., min_length=2, max_length=200),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Substring search over all snapshots of an incident.

    Returns the same lean shape as the list endpoint (metadata plus a
    300-character ``summary_preview``), newest first, restricted to snapshots
    whose summary contains ``q`` literally.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # Escape LIKE wildcards so "%" and "_" in the query are matched literally.
    # Backslash is escaped first and declared as ESCAPE character in the SQL.
    escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    like = f"%{escaped}%"
    cursor = await db.execute(
        """SELECT id, incident_id, article_count, fact_check_count, created_at,
                  SUBSTR(summary, 1, 300) AS summary_preview
           FROM incident_snapshots
           WHERE incident_id = ? AND summary LIKE ? ESCAPE '\\'
           ORDER BY created_at DESC""",
        (incident_id, like),
    )
    rows = await cursor.fetchall()
    return [dict(row) for row in rows]
|
|
|
|
|
|
@router.get("/{incident_id}/snapshots/{snapshot_id}")
async def get_snapshot(
    incident_id: int,
    snapshot_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return one snapshot with its full summary and sources_json (lazy load).

    Raises:
        HTTPException: 404 if the snapshot does not exist or belongs to a
            different incident.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    snapshot_cursor = await db.execute(
        """SELECT id, incident_id, summary, sources_json,
                  article_count, fact_check_count, created_at
           FROM incident_snapshots WHERE id = ? AND incident_id = ?""",
        (snapshot_id, incident_id),
    )
    snapshot = await snapshot_cursor.fetchone()
    if not snapshot:
        raise HTTPException(status_code=404, detail="Snapshot nicht gefunden")
    return dict(snapshot)
|
|
|
|
|
|
@router.get("/{incident_id}/factchecks")
async def get_factchecks(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return all fact checks of an incident, most recently checked first."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    factcheck_cursor = await db.execute(
        "SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
        (incident_id,),
    )
    return [dict(check) for check in await factcheck_cursor.fetchall()]
|
|
|
|
|
|
@router.get("/{incident_id}/pipeline")
async def get_pipeline(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the analysis pipeline status of an incident.

    The response holds the definition of all pipeline steps plus the state of
    the reported refresh: the newest running refresh if one exists, otherwise
    the newest refresh of any status.

    Response:
        {
          "is_research": bool,
          "is_running": bool,
          "last_refresh": {started_at, completed_at, status, duration_sec, pass_total} | null,
          "steps_definition": PIPELINE_STEPS,
          "steps": [{step_key, pass_number, status, count_value, count_secondary,
                     started_at, completed_at}, ...]
        }
    """
    from services.pipeline_tracker import PIPELINE_STEPS

    tenant_id = current_user.get("tenant_id")
    incident_row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
    is_research = (incident_row["type"] or "adhoc") == "research"

    # One query instead of two: running refreshes sort first, then by start
    # time. The CASE keeps NULL statuses with the non-running rows.
    cursor = await db.execute(
        """SELECT id, started_at, completed_at, status
           FROM refresh_log
           WHERE incident_id = ?
           ORDER BY CASE WHEN status = 'running' THEN 1 ELSE 0 END DESC,
                    started_at DESC
           LIMIT 1""",
        (incident_id,),
    )
    row = await cursor.fetchone()

    last_refresh = None
    steps = []
    is_running = False
    if row:
        is_running = row["status"] == "running"
        # Pipeline steps recorded for this refresh, in pass and insertion order.
        step_cursor = await db.execute(
            """SELECT step_key, pass_number, status, count_value, count_secondary,
                      started_at, completed_at
               FROM refresh_pipeline_steps
               WHERE refresh_log_id = ?
               ORDER BY pass_number ASC, id ASC""",
            (row["id"],),
        )
        steps = [dict(r) for r in await step_cursor.fetchall()]

        # Number of passes (multi-pass research refreshes); at least 1.
        pass_total = max([1, *(s["pass_number"] for s in steps if s["pass_number"])])

        # Duration only when both timestamps are present.
        duration_sec = None
        if row["started_at"] and row["completed_at"]:
            try:
                t0 = datetime.strptime(row["started_at"], "%Y-%m-%d %H:%M:%S")
                t1 = datetime.strptime(row["completed_at"], "%Y-%m-%d %H:%M:%S")
                duration_sec = max(0, int((t1 - t0).total_seconds()))
            except (TypeError, ValueError):
                # Timestamp in an unexpected format: report no duration.
                duration_sec = None

        last_refresh = {
            "started_at": row["started_at"],
            "completed_at": row["completed_at"],
            "status": row["status"],
            "duration_sec": duration_sec,
            "pass_total": pass_total,
        }

    return {
        "is_research": is_research,
        "is_running": is_running,
        "last_refresh": last_refresh,
        "steps_definition": PIPELINE_STEPS,
        "steps": steps,
    }
|
|
|
|
|
|
@router.get("/{incident_id}/locations")
async def get_locations(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the incident's geographic locations, aggregated server-side per place.

    A place is keyed by (normalized name, latitude and longitude rounded to 2
    decimals). Three small queries replace one large (~21k-row) join:
      1. per-place aggregates (country code, max confidence, article count);
      2. per-place category counts, used to pick the dominant category;
      3. up to 10 sample articles per place (newest ``collected_at`` first).

    The dominant category is the one with the highest priority
    (primary > secondary > tertiary > mentioned > unknown), ties broken by count.

    Returns ``{"category_labels": <parsed incidents.category_labels or None>,
    "locations": [...]}`` with locations ordered by article count, descending.
    """
    tenant_id = current_user.get("tenant_id")
    # The access check returns the full incidents row (SELECT *), which already
    # contains category_labels, so no extra query is needed for it below.
    incident_row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # 1. Per-place aggregates. GROUP BY repeats the expressions rather than the
    #    aliases, so it cannot silently bind to a same-named table column.
    cursor = await db.execute(
        """SELECT
               COALESCE(location_name_normalized, location_name) AS name,
               ROUND(latitude, 2) AS lat,
               ROUND(longitude, 2) AS lon,
               MIN(country_code) AS country_code,
               MAX(confidence) AS confidence,
               COUNT(*) AS article_count
           FROM article_locations
           WHERE incident_id = ?
           GROUP BY COALESCE(location_name_normalized, location_name),
                    ROUND(latitude, 2), ROUND(longitude, 2)
           ORDER BY article_count DESC""",
        (incident_id,),
    )
    loc_rows = [dict(r) for r in await cursor.fetchall()]

    # 2. Category counts per place. A bare "category" in GROUP BY resolves to
    #    the table column in SQLite, which put NULL and 'mentioned' into two
    #    groups that overwrote each other in cat_map; group by the expression.
    cursor = await db.execute(
        """SELECT
               COALESCE(location_name_normalized, location_name) AS name,
               ROUND(latitude, 2) AS lat,
               ROUND(longitude, 2) AS lon,
               COALESCE(category, 'mentioned') AS category,
               COUNT(*) AS cnt
           FROM article_locations
           WHERE incident_id = ?
           GROUP BY COALESCE(location_name_normalized, location_name),
                    ROUND(latitude, 2), ROUND(longitude, 2),
                    COALESCE(category, 'mentioned')""",
        (incident_id,),
    )
    cat_map: dict[tuple, dict[str, int]] = {}
    for r in await cursor.fetchall():
        key = (r["name"], r["lat"], r["lon"])
        cat_map.setdefault(key, {})[r["category"]] = r["cnt"]

    # 3. Sample articles per place (max. 10, newest first).
    cursor = await db.execute(
        """SELECT name, lat, lon, article_id, headline, headline_de, source, source_url
           FROM (
               SELECT
                   COALESCE(al.location_name_normalized, al.location_name) AS name,
                   ROUND(al.latitude, 2) AS lat,
                   ROUND(al.longitude, 2) AS lon,
                   a.id AS article_id,
                   a.headline, a.headline_de, a.source, a.source_url,
                   ROW_NUMBER() OVER (
                       PARTITION BY COALESCE(al.location_name_normalized, al.location_name),
                                    ROUND(al.latitude, 2), ROUND(al.longitude, 2)
                       ORDER BY a.collected_at DESC
                   ) AS rn
               FROM article_locations al
               JOIN articles a ON a.id = al.article_id
               WHERE al.incident_id = ?
           )
           WHERE rn <= 10""",
        (incident_id,),
    )
    sample_map: dict[tuple, list[dict]] = {}
    for r in await cursor.fetchall():
        key = (r["name"], r["lat"], r["lon"])
        sample_map.setdefault(key, []).append({
            "id": r["article_id"],
            # Prefer the German headline when one exists.
            "headline": r["headline_de"] or r["headline"],
            "source": r["source"],
            "source_url": r["source_url"],
        })

    # Assemble the response, one entry per place.
    priority = {"primary": 4, "secondary": 3, "tertiary": 2, "mentioned": 1}
    result = []
    for loc in loc_rows:
        key = (loc["name"], loc["lat"], loc["lon"])
        cats = cat_map.get(key, {})
        best_cat = max(cats, key=lambda c: (priority.get(c, 0), cats[c])) if cats else "mentioned"
        result.append({
            "location_name": loc["name"],
            "lat": loc["lat"],
            "lon": loc["lon"],
            "country_code": loc["country_code"],
            "confidence": loc["confidence"],
            "article_count": loc["article_count"],
            "articles": sample_map.get(key, []),
            "category": best_cat,
        })

    # Category labels stored on the incident (JSON text), if present and valid.
    category_labels = None
    raw_labels = incident_row["category_labels"]
    if raw_labels:
        try:
            category_labels = json.loads(raw_labels)
        except (json.JSONDecodeError, TypeError):
            # Malformed labels are ignored; the response then carries None.
            category_labels = None

    return {"category_labels": category_labels, "locations": result}
|
|
|
|
|
|
# Geoparse-Status pro Incident (in-memory)
|
|
_geoparse_status: dict[int, dict] = {}
|
|
|
|
|
|
async def _run_geoparse_background(incident_id: int, tenant_id: int | None):
    """Background task: geoparse all not-yet-geoparsed articles of an incident.

    Progress is published in ``_geoparse_status[incident_id]`` with the keys
    ``status``, ``processed``, ``total`` and ``locations`` (plus ``error`` on
    failure). Articles are passed to ``geoparse_articles`` in batches of 50 and
    every returned location is inserted into ``article_locations``. Each batch
    is committed separately, so a failure keeps the rows of earlier batches.

    Args:
        incident_id: Incident whose articles are processed.
        tenant_id: Tenant id stored on every inserted ``article_locations`` row.
    """
    batch_size = 50
    total = 0
    processed = 0
    geo_count = 0
    _geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": 0, "locations": 0}
    db = None
    try:
        from agents.geoparsing import geoparse_articles

        db = await get_db()

        # Incident title/description are handed to the geoparser as context.
        cursor = await db.execute(
            "SELECT title, description FROM incidents WHERE id = ?", (incident_id,)
        )
        inc_row = await cursor.fetchone()
        incident_context = ""
        if inc_row:
            incident_context = f"{inc_row['title']} - {inc_row['description'] or ''}"

        # Only articles that have no location row yet.
        cursor = await db.execute(
            """SELECT a.* FROM articles a
               WHERE a.incident_id = ?
               AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
            (incident_id, incident_id),
        )
        articles = [dict(row) for row in await cursor.fetchall()]

        if not articles:
            _geoparse_status[incident_id] = {"status": "done", "processed": 0, "total": 0, "locations": 0}
            return

        total = len(articles)
        _geoparse_status[incident_id]["total"] = total
        _geoparse_logger.info("Geoparsing Hintergrund: %d Artikel fuer Lage %s", total, incident_id)

        # Category labels are written once, from the first batch that returns
        # any. The UPDATE itself never overwrites already stored labels.
        labels_handled = False
        for i in range(0, total, batch_size):
            batch = articles[i:i + batch_size]
            geo_result = await geoparse_articles(batch, incident_context)
            # geoparse_articles returns either (locations_by_article, category_labels)
            # or just locations_by_article.
            if isinstance(geo_result, tuple):
                batch_geo_results, batch_labels = geo_result
                if batch_labels and not labels_handled:
                    labels_handled = True
                    try:
                        await db.execute(
                            "UPDATE incidents SET category_labels = ? WHERE id = ? AND category_labels IS NULL",
                            (json.dumps(batch_labels, ensure_ascii=False), incident_id),
                        )
                        await db.commit()
                    except Exception:
                        # Labels are optional: keep geoparsing, but leave a trace
                        # instead of silently dropping the error.
                        _geoparse_logger.warning(
                            "Kategorie-Labels konnten nicht gespeichert werden (Lage %s)",
                            incident_id,
                            exc_info=True,
                        )
            else:
                batch_geo_results = geo_result

            for art_id, locations in batch_geo_results.items():
                for loc in locations:
                    await db.execute(
                        """INSERT INTO article_locations
                           (article_id, incident_id, location_name, location_name_normalized,
                            country_code, latitude, longitude, confidence, source_text, tenant_id, category)
                           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                        (art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
                         loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
                         loc.get("source_text", ""), tenant_id, loc.get("category", "mentioned")),
                    )
                    geo_count += 1
            await db.commit()

            processed += len(batch)
            _geoparse_status[incident_id] = {"status": "running", "processed": processed, "total": total, "locations": geo_count}

        _geoparse_status[incident_id] = {"status": "done", "processed": processed, "total": total, "locations": geo_count}
        _geoparse_logger.info(
            "Geoparsing fertig: %d Orte aus %d Artikeln (Lage %s)", geo_count, processed, incident_id
        )
    except Exception as e:
        # Keep the progress counters so the status endpoint shows how far the
        # task got before failing.
        _geoparse_status[incident_id] = {
            "status": "error",
            "error": str(e),
            "processed": processed,
            "total": total,
            "locations": geo_count,
        }
        _geoparse_logger.exception("Geoparsing Fehler (Lage %s): %s", incident_id, e)
    finally:
        if db:
            await db.close()
|
|
|
|
|
|
# Strong references to scheduled geoparse tasks. The asyncio event loop keeps
# only weak references to tasks, so a fire-and-forget task without another
# reference can be garbage-collected before it finishes.
_geoparse_tasks: set[asyncio.Task] = set()


@router.post("/{incident_id}/geoparse")
async def trigger_geoparse(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Start geoparsing of all not-yet-geoparsed articles of an incident in the background.

    Returns:
        ``{"status": "running", ...}`` if a task for this incident is already
        active, ``{"status": "done", ...}`` if no article is left to process,
        otherwise ``{"status": "started", ...}``.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    def running_reply() -> dict | None:
        """Return the "already running" response if a task is active, else None."""
        existing = _geoparse_status.get(incident_id, {})
        if existing.get("status") != "running":
            return None
        return {"status": "running", "message": f"Läuft bereits ({existing.get('processed', 0)}/{existing.get('total', 0)} Artikel)"}

    # Already running?
    if (reply := running_reply()) is not None:
        return reply

    # Are there any articles without locations at all?
    cursor = await db.execute(
        """SELECT COUNT(*) as cnt FROM articles a
           WHERE a.incident_id = ?
           AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
        (incident_id, incident_id),
    )
    count = (await cursor.fetchone())["cnt"]
    if count == 0:
        return {"status": "done", "message": "Alle Artikel wurden bereits verarbeitet", "locations": 0}

    # Re-check after the await above: a concurrent request may have started a
    # task meanwhile. No await lies between this check and marking the
    # incident as running, so check-and-set cannot interleave on the event
    # loop and two tasks cannot insert duplicate location rows.
    if (reply := running_reply()) is not None:
        return reply
    _geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": count, "locations": 0}

    # Start the background task and keep a reference until it completes.
    task = asyncio.create_task(_run_geoparse_background(incident_id, tenant_id))
    _geoparse_tasks.add(task)
    task.add_done_callback(_geoparse_tasks.discard)
    return {"status": "started", "message": f"Geoparsing gestartet für {count} Artikel"}
|
|
|
|
|
|
@router.get("/{incident_id}/geoparse-status")
async def get_geoparse_status(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Report the progress of the geoparsing task of an incident.

    Returns the in-memory status entry for the incident, or
    ``{"status": "idle"}`` when this process holds no entry for it.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    if incident_id in _geoparse_status:
        return _geoparse_status[incident_id]
    return {"status": "idle"}
|
|
|
|
|
|
def _refresh_duration_seconds(started_at, completed_at) -> float | None:
    """Return the refresh duration in seconds (1 decimal), or None.

    None is returned when either timestamp is missing, is not an ISO-8601
    string, or when the two cannot be subtracted (naive vs. aware datetime).
    """
    if not (started_at and completed_at):
        return None
    try:
        start = datetime.fromisoformat(started_at)
        end = datetime.fromisoformat(completed_at)
        return round((end - start).total_seconds(), 1)
    except (TypeError, ValueError):
        return None


@router.get("/{incident_id}/refresh-log")
async def get_refresh_log(
    incident_id: int,
    limit: int = 20,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the refresh history of an incident, newest first.

    Args:
        incident_id: Incident whose ``refresh_log`` rows are returned.
        limit: Maximum number of entries; clamped to the range 0..100.

    Returns:
        A list of dicts with the selected ``refresh_log`` columns plus
        ``duration_seconds`` (None if it cannot be computed).
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # SQLite treats a negative LIMIT as "no limit", so the lower bound must be
    # clamped as well, not only the upper one.
    effective_limit = max(0, min(limit, 100))
    cursor = await db.execute(
        """SELECT id, started_at, completed_at, articles_found, status,
                  trigger_type, retry_count, error_message
           FROM refresh_log WHERE incident_id = ?
           ORDER BY started_at DESC LIMIT ?""",
        (incident_id, effective_limit),
    )
    results = []
    for row in await cursor.fetchall():
        entry = dict(row)
        entry["duration_seconds"] = _refresh_duration_seconds(
            entry["started_at"], entry["completed_at"]
        )
        results.append(entry)
    return results
|
|
|
|
|
|
@router.get("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def get_subscription(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the calling user's e-mail notification settings for an incident.

    When the user has no ``incident_subscriptions`` row for the incident yet,
    all three flags are reported as False.
    """
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))

    cursor = await db.execute(
        """SELECT notify_email_summary, notify_email_new_articles, notify_email_status_change
           FROM incident_subscriptions WHERE user_id = ? AND incident_id = ?""",
        (user_id, incident_id),
    )
    subscription = await cursor.fetchone()
    if not subscription:
        return {"notify_email_summary": False, "notify_email_new_articles": False, "notify_email_status_change": False}
    return dict(subscription)
|
|
|
|
|
|
@router.put("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def update_subscription(
    incident_id: int,
    data: SubscriptionUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create or overwrite the calling user's e-mail settings for an incident.

    The three flags from ``data`` are upserted into ``incident_subscriptions``
    (stored as 0/1) and returned as received.
    """
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))

    flag_names = ("notify_email_summary", "notify_email_new_articles", "notify_email_status_change")
    flags = {name: getattr(data, name) for name in flag_names}
    stored_flags = tuple(1 if flags[name] else 0 for name in flag_names)

    await db.execute(
        """INSERT INTO incident_subscriptions (user_id, incident_id, notify_email_summary, notify_email_new_articles, notify_email_status_change)
           VALUES (?, ?, ?, ?, ?)
           ON CONFLICT(user_id, incident_id) DO UPDATE SET
               notify_email_summary = excluded.notify_email_summary,
               notify_email_new_articles = excluded.notify_email_new_articles,
               notify_email_status_change = excluded.notify_email_status_change""",
        (user_id, incident_id, *stored_flags),
    )
    await db.commit()
    return flags
|
|
|
|
|
|
@router.post("/{incident_id}/refresh")
async def trigger_refresh(
    incident_id: int,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Queue a manual refresh of an incident with the orchestrator.

    Returns ``{"status": "queued", ...}`` when ``enqueue_refresh`` reports
    success and ``{"status": "skipped", ...}`` when it returns a falsy value.
    """
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))

    from agents.orchestrator import orchestrator

    accepted = await orchestrator.enqueue_refresh(incident_id, user_id=user_id)
    outcome = "queued" if accepted else "skipped"
    return {"status": outcome, "incident_id": incident_id}
|
|
|
|
|
|
@router.post("/{incident_id}/cancel-refresh")
async def cancel_refresh(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Ask the orchestrator to cancel a running refresh of an incident.

    Returns ``{"status": "cancelling"}`` when the orchestrator accepted the
    cancellation, otherwise ``{"status": "not_running"}``.
    """
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )

    from agents.orchestrator import orchestrator

    if await orchestrator.cancel_refresh(incident_id):
        return {"status": "cancelling"}
    return {"status": "not_running"}
|
|
|
|
|
|
|
|
def _slugify(text: str) -> str:
    """Build a filename-safe, ASCII-only, lower-case slug (max. 80 chars) from a title.

    German umlauts and sharp s are transliterated (ä -> ae, ß -> ss, ...),
    other accented Latin letters lose their diacritics, and characters
    without an ASCII equivalent (e.g. Cyrillic) are dropped. The result is
    used in a Content-Disposition header, which cannot carry arbitrary
    Unicode, hence the ASCII restriction. May return an empty string.
    """
    replacements = {
        "\u00e4": "ae", "\u00f6": "oe", "\u00fc": "ue", "\u00df": "ss",
        "\u00c4": "Ae", "\u00d6": "Oe", "\u00dc": "Ue", "\u1e9e": "SS",
    }
    for src, dst in replacements.items():
        text = text.replace(src, dst)
    # NFKD splits accented letters into base letter + combining mark; the
    # ASCII round-trip then drops the marks and any non-Latin characters.
    text = unicodedata.normalize("NFKD", text)
    text = text.encode("ascii", "ignore").decode("ascii")
    text = re.sub(r"[^\w\s-]", "", text)
    # Collapse whitespace/underscore/hyphen runs so "A - B" becomes "a-b".
    text = re.sub(r"[-\s_]+", "-", text)
    # Strip again after truncating so the cut cannot leave a trailing hyphen.
    return text.strip("-")[:80].strip("-").lower()
|
|
|
|
|
|
@router.get("/{incident_id}/export")
async def export_incident(
    incident_id: int,
    format: str = Query("pdf", pattern="^(pdf|docx)$"),
    scope: str = Query("report", pattern="^(summary|report|full)$"),
    sections: str = Query(None),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Export an incident as a PDF or Word (DOCX) document.

    Args:
        incident_id: Incident to export (access checked via ``_check_incident_access``).
        format: ``"pdf"`` or ``"docx"``.
        scope: ``"summary"``, ``"report"`` or ``"full"``; only ``"full"``
            loads the incident snapshots.
        sections: Optional comma-separated subset of ``VALID_SECTIONS``.
            Unknown names are dropped; if none remain, ``sections`` is ignored.

    Returns:
        A ``StreamingResponse`` delivering the document as an attachment.

    Side effects:
        If the incident has no cached ``executive_summary`` yet, one is
        generated and written to ``incidents.executive_summary``.
    """
    from urllib.parse import quote

    from report_generator import generate_pdf, generate_docx, generate_executive_summary

    # Parse the comma-separated section list
    VALID_SECTIONS = {"zusammenfassung", "bericht", "faktencheck", "quellen", "timeline", "karte"}
    sections_set = None
    if sections:
        sections_set = {s.strip() for s in sections.split(",") if s.strip() in VALID_SECTIONS}
        if not sections_set:
            sections_set = None

    tenant_id = current_user.get("tenant_id")
    row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
    incident = dict(row)

    # Creator (e-mail of the creating user)
    cursor = await db.execute("SELECT email FROM users WHERE id = ?", (incident["created_by"],))
    user_row = await cursor.fetchone()
    creator = user_row["email"] if user_row else "Unbekannt"

    # Organization (for document metadata)
    organization_name = None
    if incident.get("tenant_id"):
        cursor = await db.execute(
            "SELECT name FROM organizations WHERE id = ?", (incident["tenant_id"],)
        )
        org_row = await cursor.fetchone()
        organization_name = org_row["name"] if org_row else None

    # Top 5 locations (for keyword metadata)
    cursor = await db.execute(
        """SELECT location_name, COUNT(*) AS cnt
           FROM article_locations
           WHERE incident_id = ?
           GROUP BY COALESCE(location_name_normalized, location_name)
           ORDER BY cnt DESC
           LIMIT 5""",
        (incident_id,),
    )
    top_locations = [r["location_name"] for r in await cursor.fetchall() if r["location_name"]]

    # Snapshot count (used as xmpMM:VersionID in the PDF)
    cursor = await db.execute(
        "SELECT COUNT(*) AS cnt FROM incident_snapshots WHERE incident_id = ?",
        (incident_id,),
    )
    snapshot_count = (await cursor.fetchone())["cnt"] or 0

    # Articles, newest first
    cursor = await db.execute(
        "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
        (incident_id,),
    )
    articles = [dict(r) for r in await cursor.fetchall()]

    # Fact checks, newest first
    cursor = await db.execute(
        "SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
        (incident_id,),
    )
    fact_checks = [dict(r) for r in await cursor.fetchall()]

    # Snapshots (only for scope "full")
    snapshots = []
    if scope == "full":
        cursor = await db.execute(
            "SELECT * FROM incident_snapshots WHERE incident_id = ? ORDER BY created_at DESC",
            (incident_id,),
        )
        snapshots = [dict(r) for r in await cursor.fetchall()]

    # Executive summary (AI-generated, cached on the incident row)
    exec_summary = incident.get("executive_summary")
    if not exec_summary:
        summary_text = incident.get("summary") or ""
        exec_summary = await generate_executive_summary(summary_text)
        await db.execute(
            "UPDATE incidents SET executive_summary = ? WHERE id = ?",
            (exec_summary, incident_id),
        )
        await db.commit()

    date_str = datetime.now(TIMEZONE).strftime("%Y%m%d")
    # Fall back to a generic name when the title yields an empty slug.
    slug = _slugify(incident["title"]) or "lage"
    scope_labels = {"summary": "zusammenfassung", "report": "lagebericht", "full": "vollstaendig"}
    # With explicit sections, derive the filename label from them
    if sections_set:
        if sections_set == {"zusammenfassung"}:
            scope_labels_key = "zusammenfassung"
        elif "timeline" in sections_set:
            scope_labels_key = "vollstaendig"
        else:
            scope_labels_key = "lagebericht"
    else:
        scope_labels_key = scope_labels.get(scope, "lagebericht")

    if format == "pdf":
        generator, extension, media_type = generate_pdf, "pdf", "application/pdf"
    else:
        generator, extension, media_type = (
            generate_docx,
            "docx",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        )

    document_bytes = await generator(
        incident, articles, fact_checks, snapshots, scope, creator, exec_summary,
        sections=sections_set,
        organization_name=organization_name,
        top_locations=top_locations,
        snapshot_count=snapshot_count,
    )
    filename = f"{slug}_{scope_labels_key}_{date_str}.{extension}"

    # HTTP header values must be latin-1 encodable (Starlette encodes them as
    # latin-1), so a non-ASCII filename would make the response fail. ASCII
    # names keep the plain header; other names get an ASCII fallback plus an
    # RFC 6266 / RFC 5987 ``filename*`` parameter.
    if filename.isascii():
        disposition = f'attachment; filename="{filename}"'
    else:
        ascii_filename = filename.encode("ascii", "ignore").decode("ascii")
        encoded = quote(filename, safe="")
        disposition = f"attachment; filename=\"{ascii_filename}\"; filename*=UTF-8''{encoded}"

    return StreamingResponse(
        io.BytesIO(document_bytes),
        media_type=media_type,
        headers={"Content-Disposition": disposition},
    )
|
|
|