AegisSight-Monitor/src/routers/incidents.py
Commit a302790777 (UserIsMH) - Locations: Aggregation in SQL (GROUP BY + Window)
Replaces the raw JOIN over article_locations x articles (for Iran:
21,814 rows, 11 MB payload) with three small aggregated queries:
  1. Locations via GROUP BY (name, lat, lon), which directly yields the result set.
  2. Categories per location via GROUP BY, used to pick the dominant category.
  3. Sample articles (at most 10 per location) via ROW_NUMBER() OVER PARTITION BY.

The response shape is unchanged ({category_labels, locations: [...]}, see the
sketch below), so no frontend change is needed. The prioritization
primary > secondary > tertiary > mentioned is preserved.
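
A sketch of that response shape with invented example values; the field names are
taken from the get_locations handler in this file, while the structure of
category_labels depends on what geoparsing stored:

  {
    "category_labels": null,
    "locations": [
      {
        "location_name": "Teheran",
        "lat": 35.69,
        "lon": 51.39,
        "country_code": "IR",
        "confidence": 0.9,
        "article_count": 1234,
        "articles": [
          {"id": 17, "headline": "...", "source": "...", "source_url": "https://example.org/..."}
        ],
        "category": "primary"
      }
    ]
  }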

Expected effect: Iran locations drop from 11 MB to <500 KB; query time also
decreases because the 21k-row JOIN no longer has to be materialized.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 23:47:50 +02:00

1014 lines, 39 KiB, Python

"""Incidents-Router: Lagen verwalten (Multi-Tenant)."""
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
from fastapi.responses import StreamingResponse
from models import IncidentCreate, IncidentUpdate, IncidentResponse, SubscriptionUpdate, SubscriptionResponse, DescriptionEnhanceRequest
from auth import get_current_user
from middleware.license_check import require_writable_license
from database import db_dependency, get_db
from datetime import datetime
from config import TIMEZONE
import asyncio
import aiosqlite
import io
import json
import logging
import re
import unicodedata
_geoparse_logger = logging.getLogger("osint.geoparse_bg")
router = APIRouter(prefix="/api/incidents", tags=["incidents"])
INCIDENT_UPDATE_COLUMNS = {
"title", "description", "type", "status", "refresh_mode",
"refresh_interval", "refresh_start_time", "retention_days", "international_sources", "include_telegram", "visibility",
}
async def _check_incident_access(
db: aiosqlite.Connection, incident_id: int, user_id: int, tenant_id: int
) -> aiosqlite.Row:
"""Lage laden und Zugriff pruefen (Tenant + Sichtbarkeit)."""
cursor = await db.execute(
"SELECT * FROM incidents WHERE id = ? AND tenant_id = ?",
(incident_id, tenant_id),
)
row = await cursor.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Lage nicht gefunden")
if row["visibility"] == "private" and row["created_by"] != user_id:
raise HTTPException(status_code=403, detail="Kein Zugriff auf private Lage")
return row
async def _enrich_incident(db: aiosqlite.Connection, row: aiosqlite.Row) -> dict:
"""Incident-Row mit Statistiken und Ersteller-Name anreichern."""
incident = dict(row)
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
(incident["id"],),
)
article_count = (await cursor.fetchone())["cnt"]
cursor = await db.execute(
"SELECT COUNT(DISTINCT source) as cnt FROM articles WHERE incident_id = ?",
(incident["id"],),
)
source_count = (await cursor.fetchone())["cnt"]
cursor = await db.execute(
"SELECT email FROM users WHERE id = ?",
(incident["created_by"],),
)
user_row = await cursor.fetchone()
incident["article_count"] = article_count
incident["source_count"] = source_count
incident["created_by_username"] = user_row["email"] if user_row else "Unbekannt"
return incident
@router.get("", response_model=list[IncidentResponse])
async def list_incidents(
status_filter: str | None = None,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Alle Lagen des Tenants auflisten (oeffentliche + eigene private)."""
tenant_id = current_user.get("tenant_id")
user_id = current_user["id"]
query = "SELECT * FROM incidents WHERE tenant_id = ? AND (visibility = 'public' OR created_by = ?)"
params = [tenant_id, user_id]
if status_filter:
query += " AND status = ?"
params.append(status_filter)
query += " ORDER BY updated_at DESC"
cursor = await db.execute(query, params)
rows = await cursor.fetchall()
results = []
for row in rows:
results.append(await _enrich_incident(db, row))
return results
@router.post("", response_model=IncidentResponse, status_code=status.HTTP_201_CREATED)
async def create_incident(
data: IncidentCreate,
current_user: dict = Depends(require_writable_license),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Neue Lage anlegen."""
tenant_id = current_user.get("tenant_id")
now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
cursor = await db.execute(
"""INSERT INTO incidents (title, description, type, refresh_mode, refresh_interval,
refresh_start_time, retention_days, international_sources, include_telegram, visibility,
tenant_id, created_by, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
data.title,
data.description,
data.type,
data.refresh_mode,
data.refresh_interval,
data.refresh_start_time,
data.retention_days,
1 if data.international_sources else 0,
1 if data.include_telegram else 0,
data.visibility,
tenant_id,
current_user["id"],
now,
now,
),
)
await db.commit()
cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (cursor.lastrowid,))
row = await cursor.fetchone()
return await _enrich_incident(db, row)
@router.get("/refreshing")
async def get_refreshing_incidents(
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Gibt IDs aller Lagen mit laufendem Refresh zurueck (nur eigener Tenant)."""
tenant_id = current_user.get("tenant_id")
cursor = await db.execute(
"""SELECT rl.incident_id, rl.started_at FROM refresh_log rl
JOIN incidents i ON i.id = rl.incident_id
WHERE rl.status = 'running'
AND i.tenant_id = ?
AND (i.visibility = 'public' OR i.created_by = ?)""",
(tenant_id, current_user["id"]),
)
rows = await cursor.fetchall()
# Also include queued incidents from orchestrator
from agents.orchestrator import orchestrator
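# _queued_ids and _current_task are internal orchestrator attributes; the hasattr guards keep this endpoint working if they are absent.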
queued_ids = list(orchestrator._queued_ids) if hasattr(orchestrator, '_queued_ids') else []
current_task = orchestrator._current_task if hasattr(orchestrator, '_current_task') else None
return {
"refreshing": [row["incident_id"] for row in rows],
"queued": queued_ids,
"current": current_task,
"details": {str(row["incident_id"]): {"started_at": row["started_at"]} for row in rows},
}
# --- Generate description (prompt enhancement) ---
ENHANCE_PROMPT_RESEARCH = """Du bist ein Recherche-Planer in einem OSINT-Lagemonitoring-System.
Deine Aufgabe: Strukturiere ein Recherche-Briefing, das Analysten als Leitfaden fuer ihre Suche verwenden.
Du behauptest KEINE Fakten und musst das Thema NICHT kennen oder verifizieren.
Der Nutzer gibt das Thema vor -- du definierst Suchrichtungen, Schwerpunkte und Stichworte.
Erstelle das Briefing IMMER, auch wenn dir das Thema unbekannt ist.
WICHTIG: Verwende IMMER echte UTF-8-Umlaute und KEINE Umschreibungen (ae, oe, ue, ss).
Titel: {title}
Vorhandener Kontext: {context}
Typ: Hintergrundrecherche
Erstelle ein praezises Recherche-Briefing mit:
1. Fallbezeichnung (vollstaendige Benennung des Themas basierend auf Titel und Kontext)
2. Recherche-Schwerpunkte (5-8 thematische Punkte, z.B. Sachverhalt, beteiligte Parteien, rechtliche Aspekte, mediale Rezeption, Hintergruende, Chronologie)
3. Relevante Suchbegriffe (deutsch + englisch, inkl. Abkuerzungen und alternative Schreibweisen)
Schreibe NUR das Briefing als Fliesstext mit Aufzaehlungen. Keine Erklaerungen, Rueckfragen oder Disclaimer."""
ENHANCE_PROMPT_ADHOC = """Du bist ein Recherche-Planer in einem OSINT-Lagemonitoring-System.
Deine Aufgabe: Erstelle eine knappe Vorfallsbeschreibung, die als Suchauftrag fuer Live-Monitoring dient.
Du behauptest KEINE Fakten und musst den Vorfall NICHT kennen oder verifizieren.
Der Nutzer gibt das Thema vor -- du strukturierst, wonach gesucht werden soll.
Erstelle die Beschreibung IMMER, auch wenn dir der Vorfall unbekannt ist.
WICHTIG: Verwende IMMER echte UTF-8-Umlaute und KEINE Umschreibungen (ae, oe, ue, ss).
Titel: {title}
Vorhandener Kontext: {context}
Typ: Live-Monitoring (aktuelle Ereignisse)
Erstelle eine knappe, informative Beschreibung mit:
1. Was ist passiert / worum geht es (basierend auf Titel und Kontext)
2. Wo (geographischer Kontext, falls ableitbar)
3. Wer ist beteiligt (Akteure, Organisationen, Laender)
4. Wonach soll gesucht werden (aktuelle Entwicklungen, Reaktionen, Hintergruende)
Schreibe NUR die Beschreibung als Fliesstext (3-5 Zeilen). Keine Erklaerungen, Rueckfragen oder Disclaimer."""
_enhance_logger = logging.getLogger("osint.enhance")
@router.post("/enhance-description")
async def enhance_description(
data: DescriptionEnhanceRequest,
current_user: dict = Depends(get_current_user),
):
"""Generiert eine strukturierte Beschreibung per KI aus dem Titel."""
from agents.claude_client import call_claude
from config import CLAUDE_MODEL_FAST
template = ENHANCE_PROMPT_RESEARCH if data.type == "research" else ENHANCE_PROMPT_ADHOC
context = data.description.strip() if data.description and data.description.strip() else "Kein Kontext angegeben"
prompt = template.format(title=data.title.strip(), context=context)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST, raw_text=True)
_enhance_logger.info(
f"Beschreibung generiert fuer \"{data.title[:50]}\": "
f"{usage.input_tokens}in/{usage.output_tokens}out"
)
return {"description": result.strip()}
except Exception as e:
_enhance_logger.error(f"Beschreibung generieren fehlgeschlagen: {e}")
raise HTTPException(status_code=500, detail="Beschreibung konnte nicht generiert werden")
@router.get("/{incident_id}", response_model=IncidentResponse)
async def get_incident(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Einzelne Lage abrufen."""
tenant_id = current_user.get("tenant_id")
row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
return await _enrich_incident(db, row)
@router.put("/{incident_id}", response_model=IncidentResponse)
async def update_incident(
incident_id: int,
data: IncidentUpdate,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Lage aktualisieren."""
tenant_id = current_user.get("tenant_id")
row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
updates = {}
for field, value in data.model_dump(exclude_none=True).items():
if field not in INCIDENT_UPDATE_COLUMNS:
continue
if field in ("international_sources", "include_telegram"):
updates[field] = 1 if value else 0
else:
updates[field] = value
if not updates:
return await _enrich_incident(db, row)
updates["updated_at"] = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [incident_id]
await db.execute(f"UPDATE incidents SET {set_clause} WHERE id = ?", values)
await db.commit()
cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
row = await cursor.fetchone()
return await _enrich_incident(db, row)
@router.delete("/{incident_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_incident(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Lage loeschen (nur Ersteller)."""
tenant_id = current_user.get("tenant_id")
cursor = await db.execute(
"SELECT id, created_by FROM incidents WHERE id = ? AND tenant_id = ?",
(incident_id, tenant_id),
)
incident = await cursor.fetchone()
if not incident:
raise HTTPException(status_code=404, detail="Lage nicht gefunden")
if incident["created_by"] != current_user["id"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Nur der Ersteller kann diese Lage loeschen",
)
try:
await db.execute("DELETE FROM incidents WHERE id = ?", (incident_id,))
await db.commit()
except Exception as e:
if "database is locked" in str(e):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Datenbank ist momentan beschaeftigt. Bitte in wenigen Sekunden erneut versuchen.",
)
raise
@router.get("/{incident_id}/articles")
async def get_articles(
incident_id: int,
limit: int = Query(500, ge=1, le=1000),
offset: int = Query(0, ge=0),
search: str | None = Query(None, min_length=0, max_length=200),
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Artikel einer Lage paginiert abrufen.
Response: ``{"total": int, "articles": [...]}``.
Optionaler ``search``-Param filtert per LIKE ueber
headline, headline_de, source, content_de, content_original.
"""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
search_clean = (search or "").strip()
if search_clean:
like = f"%{search_clean}%"
params = (incident_id, like, like, like, like, like)
where = (
"WHERE incident_id = ? AND ("
"COALESCE(headline,'') LIKE ? OR "
"COALESCE(headline_de,'') LIKE ? OR "
"COALESCE(source,'') LIKE ? OR "
"COALESCE(content_de,'') LIKE ? OR "
"COALESCE(content_original,'') LIKE ?)"
)
else:
params = (incident_id,)
where = "WHERE incident_id = ?"
cursor = await db.execute(f"SELECT COUNT(*) AS cnt FROM articles {where}", params)
total = (await cursor.fetchone())["cnt"]
cursor = await db.execute(
f"SELECT * FROM articles {where} ORDER BY collected_at DESC LIMIT ? OFFSET ?",
(*params, limit, offset),
)
rows = await cursor.fetchall()
return {"total": total, "articles": [dict(row) for row in rows]}
@router.get("/{incident_id}/articles/sources-summary")
async def get_articles_sources_summary(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Aggregierte Quellen-Statistik fuer eine Lage (fuer Quellenuebersicht)."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
cursor = await db.execute(
"""SELECT source,
COUNT(*) AS article_count,
GROUP_CONCAT(DISTINCT COALESCE(language,'de')) AS languages
FROM articles WHERE incident_id = ?
GROUP BY source ORDER BY article_count DESC""",
(incident_id,),
)
sources = []
for r in await cursor.fetchall():
d = dict(r)
langs = (d.pop("languages") or "de").split(",")
d["languages"] = sorted({(l or "de").strip() for l in langs if l is not None})
sources.append(d)
# Overall language distribution
cursor = await db.execute(
"""SELECT COALESCE(language,'de') AS language, COUNT(*) AS cnt
FROM articles WHERE incident_id = ?
GROUP BY language ORDER BY cnt DESC""",
(incident_id,),
)
lang_counts = [dict(r) for r in await cursor.fetchall()]
total_cursor = await db.execute(
"SELECT COUNT(*) AS cnt FROM articles WHERE incident_id = ?",
(incident_id,),
)
total = (await total_cursor.fetchone())["cnt"]
return {"total": total, "sources": sources, "language_counts": lang_counts}
@router.get("/{incident_id}/articles/timeline-buckets")
async def get_articles_timeline_buckets(
incident_id: int,
granularity: str = Query("day", pattern="^(hour|day|week|month)$"),
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Aggregierte Zeit-Buckets fuer die Timeline-Achse.
Zaehlt Artikel und Snapshots pro Bucket. Kein Inhalt, nur Counts.
"""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
fmt_map = {
"hour": "%Y-%m-%d %H:00",
"day": "%Y-%m-%d",
"week": "%Y-%W",
"month": "%Y-%m",
}
fmt = fmt_map[granularity]
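# The strftime format string is passed as a bound parameter; SQLite accepts strftime(?, column) and applies it per row.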
cursor = await db.execute(
f"""SELECT strftime(?, collected_at) AS bucket, COUNT(*) AS article_count
FROM articles WHERE incident_id = ?
GROUP BY bucket ORDER BY bucket""",
(fmt, incident_id),
)
article_rows = {r["bucket"]: r["article_count"] for r in await cursor.fetchall()}
cursor = await db.execute(
f"""SELECT strftime(?, created_at) AS bucket, COUNT(*) AS snapshot_count
FROM incident_snapshots WHERE incident_id = ?
GROUP BY bucket ORDER BY bucket""",
(fmt, incident_id),
)
snapshot_rows = {r["bucket"]: r["snapshot_count"] for r in await cursor.fetchall()}
all_buckets = sorted(set(article_rows.keys()) | set(snapshot_rows.keys()))
return {
"granularity": granularity,
"buckets": [
{
"bucket": b,
"article_count": article_rows.get(b, 0),
"snapshot_count": snapshot_rows.get(b, 0),
}
for b in all_buckets
],
}
@router.get("/{incident_id}/snapshots")
async def get_snapshots(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Lageberichte (Snapshots) einer Lage abrufen — schlanke Liste.
Liefert nur Metadaten und einen 300-Zeichen-Preview des Summary.
Der Volltext (summary + sources_json) wird per Einzel-Endpunkt
``GET /{incident_id}/snapshots/{snapshot_id}`` bei Bedarf geladen.
"""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
cursor = await db.execute(
"""SELECT id, incident_id, article_count, fact_check_count, created_at,
SUBSTR(summary, 1, 300) AS summary_preview
FROM incident_snapshots WHERE incident_id = ?
ORDER BY created_at DESC""",
(incident_id,),
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
@router.get("/{incident_id}/snapshots/search")
async def search_snapshots(
incident_id: int,
q: str = Query(..., min_length=2, max_length=200),
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Volltextsuche über alle Snapshots einer Lage.
Liefert dieselbe schlanke Shape wie der Listen-Endpunkt,
gefiltert per ``summary LIKE '%q%'``.
"""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
like = f"%{q}%"
cursor = await db.execute(
"""SELECT id, incident_id, article_count, fact_check_count, created_at,
SUBSTR(summary, 1, 300) AS summary_preview
FROM incident_snapshots
WHERE incident_id = ? AND summary LIKE ?
ORDER BY created_at DESC""",
(incident_id, like),
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
@router.get("/{incident_id}/snapshots/{snapshot_id}")
async def get_snapshot(
incident_id: int,
snapshot_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Einzelnen Snapshot mit vollem Summary + sources_json abrufen (Lazy-Load)."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
cursor = await db.execute(
"""SELECT id, incident_id, summary, sources_json,
article_count, fact_check_count, created_at
FROM incident_snapshots WHERE id = ? AND incident_id = ?""",
(snapshot_id, incident_id),
)
row = await cursor.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Snapshot nicht gefunden")
return dict(row)
@router.get("/{incident_id}/factchecks")
async def get_factchecks(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Faktenchecks einer Lage abrufen."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
cursor = await db.execute(
"SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
(incident_id,),
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
@router.get("/{incident_id}/locations")
async def get_locations(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Geografische Orte einer Lage abrufen (serverseitig aggregiert nach Ort).
Drei getrennte Queries (alle klein) statt eines 21k-Zeilen-JOINs:
1. Orte-Aggregate per GROUP BY (name, lat, lon) — liefert direkt ~Ergebnismenge.
2. Kategorien pro Ort per GROUP BY (name, lat, lon, category) — fuer dominante Kategorie.
3. Sample-Artikel pro Ort via ROW_NUMBER() — max. 10 pro Ort.
"""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
# 1. Location aggregates
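# Grouping on coordinates rounded to 2 decimal places (roughly 1 km in latitude) merges near-identical points of the same name into a single marker.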
cursor = await db.execute(
"""SELECT
COALESCE(location_name_normalized, location_name) AS name,
ROUND(latitude, 2) AS lat,
ROUND(longitude, 2) AS lon,
MIN(country_code) AS country_code,
MAX(confidence) AS confidence,
COUNT(*) AS article_count
FROM article_locations
WHERE incident_id = ?
GROUP BY name, lat, lon
ORDER BY article_count DESC""",
(incident_id,),
)
loc_rows = [dict(r) for r in await cursor.fetchall()]
# 2. Categories per location
cursor = await db.execute(
"""SELECT
COALESCE(location_name_normalized, location_name) AS name,
ROUND(latitude, 2) AS lat,
ROUND(longitude, 2) AS lon,
COALESCE(category, 'mentioned') AS category,
COUNT(*) AS cnt
FROM article_locations
WHERE incident_id = ?
GROUP BY name, lat, lon, category""",
(incident_id,),
)
cat_map: dict[tuple, dict[str, int]] = {}
for r in await cursor.fetchall():
key = (r["name"], r["lat"], r["lon"])
cat_map.setdefault(key, {})[r["category"]] = r["cnt"]
# 3. Sample articles per location (at most 10, newest first)
cursor = await db.execute(
"""SELECT name, lat, lon, article_id, headline, headline_de, source, source_url
FROM (
SELECT
COALESCE(al.location_name_normalized, al.location_name) AS name,
ROUND(al.latitude, 2) AS lat,
ROUND(al.longitude, 2) AS lon,
a.id AS article_id,
a.headline, a.headline_de, a.source, a.source_url,
ROW_NUMBER() OVER (
PARTITION BY COALESCE(al.location_name_normalized, al.location_name),
ROUND(al.latitude, 2), ROUND(al.longitude, 2)
ORDER BY a.collected_at DESC
) AS rn
FROM article_locations al
JOIN articles a ON a.id = al.article_id
WHERE al.incident_id = ?
)
WHERE rn <= 10""",
(incident_id,),
)
sample_map: dict[tuple, list[dict]] = {}
for r in await cursor.fetchall():
key = (r["name"], r["lat"], r["lon"])
sample_map.setdefault(key, []).append({
"id": r["article_id"],
"headline": r["headline_de"] or r["headline"],
"source": r["source"],
"source_url": r["source_url"],
})
# Assemble the result
priority = {"primary": 4, "secondary": 3, "tertiary": 2, "mentioned": 1}
result = []
for loc in loc_rows:
key = (loc["name"], loc["lat"], loc["lon"])
cats = cat_map.get(key, {})
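# Dominant category: the highest priority wins; ties are broken by the higher article count.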
best_cat = max(cats, key=lambda c: (priority.get(c, 0), cats[c])) if cats else "mentioned"
result.append({
"location_name": loc["name"],
"lat": loc["lat"],
"lon": loc["lon"],
"country_code": loc["country_code"],
"confidence": loc["confidence"],
"article_count": loc["article_count"],
"articles": sample_map.get(key, []),
"category": best_cat,
})
# Load category labels from the incident
cursor = await db.execute(
"SELECT category_labels FROM incidents WHERE id = ?", (incident_id,)
)
inc_row = await cursor.fetchone()
category_labels = None
if inc_row and inc_row["category_labels"]:
try:
category_labels = json.loads(inc_row["category_labels"])
except (json.JSONDecodeError, TypeError):
pass
return {"category_labels": category_labels, "locations": result}
# Geoparse status per incident (in-memory)
_geoparse_status: dict[int, dict] = {}
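# Per process only and not persisted: a restart clears the progress info, and the status endpoint then reports "idle" again.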
async def _run_geoparse_background(incident_id: int, tenant_id: int | None):
"""Hintergrund-Task: Geoparsing fuer alle Artikel einer Lage."""
_geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": 0, "locations": 0}
db = None
try:
from agents.geoparsing import geoparse_articles
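# The background task runs outside the request scope, so it opens its own DB connection and closes it in the finally block below.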
db = await get_db()
# Load incident context for Haiku
cursor = await db.execute(
"SELECT title, description FROM incidents WHERE id = ?", (incident_id,)
)
inc_row = await cursor.fetchone()
incident_context = ""
if inc_row:
incident_context = f"{inc_row['title']} - {inc_row['description'] or ''}"
cursor = await db.execute(
"""SELECT a.* FROM articles a
WHERE a.incident_id = ?
AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
(incident_id, incident_id),
)
articles = [dict(row) for row in await cursor.fetchall()]
if not articles:
_geoparse_status[incident_id] = {"status": "done", "processed": 0, "total": 0, "locations": 0}
return
total = len(articles)
_geoparse_status[incident_id]["total"] = total
_geoparse_logger.info(f"Geoparsing Hintergrund: {total} Artikel fuer Lage {incident_id}")
# Process in batches (50 articles per batch)
batch_size = 50
geo_count = 0
processed = 0
for i in range(0, total, batch_size):
batch = articles[i:i + batch_size]
geo_result = await geoparse_articles(batch, incident_context)
# Tuple return value: (locations_dict, category_labels)
if isinstance(geo_result, tuple):
batch_geo_results, batch_labels = geo_result
# Store the labels with the first batch
if batch_labels and i == 0:
try:
await db.execute(
"UPDATE incidents SET category_labels = ? WHERE id = ? AND category_labels IS NULL",
(json.dumps(batch_labels, ensure_ascii=False), incident_id),
)
await db.commit()
except Exception:
pass
else:
batch_geo_results = geo_result
for art_id, locations in batch_geo_results.items():
for loc in locations:
await db.execute(
"""INSERT INTO article_locations
(article_id, incident_id, location_name, location_name_normalized,
country_code, latitude, longitude, confidence, source_text, tenant_id, category)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
loc.get("source_text", ""), tenant_id, loc.get("category", "mentioned")),
)
geo_count += 1
await db.commit()
processed += len(batch)
_geoparse_status[incident_id] = {"status": "running", "processed": processed, "total": total, "locations": geo_count}
_geoparse_status[incident_id] = {"status": "done", "processed": processed, "total": total, "locations": geo_count}
_geoparse_logger.info(f"Geoparsing fertig: {geo_count} Orte aus {processed} Artikeln (Lage {incident_id})")
except Exception as e:
_geoparse_status[incident_id] = {"status": "error", "error": str(e)}
_geoparse_logger.error(f"Geoparsing Fehler (Lage {incident_id}): {e}")
finally:
if db:
await db.close()
@router.post("/{incident_id}/geoparse")
async def trigger_geoparse(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Geoparsing fuer alle Artikel einer Lage als Hintergrund-Task starten."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
# Already running?
existing = _geoparse_status.get(incident_id, {})
if existing.get("status") == "running":
return {"status": "running", "message": f"Läuft bereits ({existing.get('processed', 0)}/{existing.get('total', 0)} Artikel)"}
# Check whether there are any unparsed articles at all
cursor = await db.execute(
"""SELECT COUNT(*) as cnt FROM articles a
WHERE a.incident_id = ?
AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
(incident_id, incident_id),
)
count = (await cursor.fetchone())["cnt"]
if count == 0:
return {"status": "done", "message": "Alle Artikel wurden bereits verarbeitet", "locations": 0}
# Start the background task
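# Fire-and-forget task: progress is tracked in _geoparse_status and polled via GET /{incident_id}/geoparse-status.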
asyncio.create_task(_run_geoparse_background(incident_id, tenant_id))
return {"status": "started", "message": f"Geoparsing gestartet für {count} Artikel"}
@router.get("/{incident_id}/geoparse-status")
async def get_geoparse_status(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Status des laufenden Geoparsing-Tasks abfragen."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
return _geoparse_status.get(incident_id, {"status": "idle"})
@router.get("/{incident_id}/refresh-log")
async def get_refresh_log(
incident_id: int,
limit: int = 20,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Refresh-Verlauf einer Lage abrufen."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
cursor = await db.execute(
"""SELECT id, started_at, completed_at, articles_found, status,
trigger_type, retry_count, error_message
FROM refresh_log WHERE incident_id = ?
ORDER BY started_at DESC LIMIT ?""",
(incident_id, min(limit, 100)),
)
rows = await cursor.fetchall()
results = []
for row in rows:
entry = dict(row)
if entry["started_at"] and entry["completed_at"]:
try:
start = datetime.fromisoformat(entry["started_at"])
end = datetime.fromisoformat(entry["completed_at"])
entry["duration_seconds"] = round((end - start).total_seconds(), 1)
except Exception:
entry["duration_seconds"] = None
else:
entry["duration_seconds"] = None
results.append(entry)
return results
@router.get("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def get_subscription(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""E-Mail-Abo-Einstellungen des aktuellen Nutzers fuer eine Lage abrufen."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
cursor = await db.execute(
"""SELECT notify_email_summary, notify_email_new_articles, notify_email_status_change
FROM incident_subscriptions WHERE user_id = ? AND incident_id = ?""",
(current_user["id"], incident_id),
)
row = await cursor.fetchone()
if row:
return dict(row)
return {"notify_email_summary": False, "notify_email_new_articles": False, "notify_email_status_change": False}
@router.put("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def update_subscription(
incident_id: int,
data: SubscriptionUpdate,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""E-Mail-Abo-Einstellungen des aktuellen Nutzers fuer eine Lage setzen."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
await db.execute(
"""INSERT INTO incident_subscriptions (user_id, incident_id, notify_email_summary, notify_email_new_articles, notify_email_status_change)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(user_id, incident_id) DO UPDATE SET
notify_email_summary = excluded.notify_email_summary,
notify_email_new_articles = excluded.notify_email_new_articles,
notify_email_status_change = excluded.notify_email_status_change""",
(
current_user["id"],
incident_id,
1 if data.notify_email_summary else 0,
1 if data.notify_email_new_articles else 0,
1 if data.notify_email_status_change else 0,
),
)
await db.commit()
return {
"notify_email_summary": data.notify_email_summary,
"notify_email_new_articles": data.notify_email_new_articles,
"notify_email_status_change": data.notify_email_status_change,
}
@router.post("/{incident_id}/refresh")
async def trigger_refresh(
incident_id: int,
current_user: dict = Depends(require_writable_license),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Manuellen Refresh fuer eine Lage ausloesen."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
from agents.orchestrator import orchestrator
enqueued = await orchestrator.enqueue_refresh(incident_id, user_id=current_user["id"])
if not enqueued:
return {"status": "skipped", "incident_id": incident_id}
return {"status": "queued", "incident_id": incident_id}
@router.post("/{incident_id}/cancel-refresh")
async def cancel_refresh(
incident_id: int,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Laufenden Refresh fuer eine Lage abbrechen."""
tenant_id = current_user.get("tenant_id")
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
from agents.orchestrator import orchestrator
cancelled = await orchestrator.cancel_refresh(incident_id)
return {"status": "cancelling" if cancelled else "not_running"}
def _slugify(text: str) -> str:
"""Dateinamen-sicherer Slug aus Titel."""
replacements = {
"\u00e4": "ae", "\u00f6": "oe", "\u00fc": "ue", "\u00df": "ss",
"\u00c4": "Ae", "\u00d6": "Oe", "\u00dc": "Ue",
}
for src, dst in replacements.items():
text = text.replace(src, dst)
text = unicodedata.normalize("NFKD", text)
text = re.sub(r"[^\w\s-]", "", text)
text = re.sub(r"[\s_]+", "-", text).strip("-")
return text[:80].lower()
@router.get("/{incident_id}/export")
async def export_incident(
incident_id: int,
format: str = Query("pdf", pattern="^(pdf|docx)$"),
scope: str = Query("report", pattern="^(summary|report|full)$"),
sections: str = Query(None),
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Lage als PDF oder Word exportieren."""
from report_generator import generate_pdf, generate_docx, generate_executive_summary
# Parse sections from the comma-separated string
VALID_SECTIONS = {"zusammenfassung", "bericht", "faktencheck", "quellen", "timeline", "karte"}
sections_set = None
if sections:
sections_set = {s.strip() for s in sections.split(",") if s.strip() in VALID_SECTIONS}
if not sections_set:
sections_set = None
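# Unknown section names are dropped silently; if nothing valid remains, the scope-based default applies.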
tenant_id = current_user.get("tenant_id")
row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
incident = dict(row)
# Creator name
cursor = await db.execute("SELECT email FROM users WHERE id = ?", (incident["created_by"],))
user_row = await cursor.fetchone()
creator = user_row["email"] if user_row else "Unbekannt"
# Articles
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
(incident_id,),
)
articles = [dict(r) for r in await cursor.fetchall()]
# Fact checks
cursor = await db.execute(
"SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
(incident_id,),
)
fact_checks = [dict(r) for r in await cursor.fetchall()]
# Snapshots (only for scope "full")
snapshots = []
if scope == "full":
cursor = await db.execute(
"SELECT * FROM incident_snapshots WHERE incident_id = ? ORDER BY created_at DESC",
(incident_id,),
)
snapshots = [dict(r) for r in await cursor.fetchall()]
# Executive summary (AI-generated, cached)
exec_summary = incident.get("executive_summary")
if not exec_summary:
summary_text = incident.get("summary") or ""
exec_summary = await generate_executive_summary(summary_text)
await db.execute(
"UPDATE incidents SET executive_summary = ? WHERE id = ?",
(exec_summary, incident_id),
)
await db.commit()
date_str = datetime.now(TIMEZONE).strftime("%Y%m%d")
slug = _slugify(incident["title"])
scope_labels = {"summary": "zusammenfassung", "report": "lagebericht", "full": "vollstaendig"}
# If sections were given explicitly, pick the matching label
if sections_set:
if sections_set == {"zusammenfassung"}:
scope_labels_key = "zusammenfassung"
elif "timeline" in sections_set:
scope_labels_key = "vollstaendig"
else:
scope_labels_key = "lagebericht"
else:
scope_labels_key = scope_labels.get(scope, "lagebericht")
if format == "pdf":
pdf_bytes = await generate_pdf(incident, articles, fact_checks, snapshots, scope, creator, exec_summary, sections=sections_set)
filename = f"{slug}_{scope_labels_key}_{date_str}.pdf"
return StreamingResponse(
io.BytesIO(pdf_bytes),
media_type="application/pdf",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
else:
docx_bytes = await generate_docx(incident, articles, fact_checks, snapshots, scope, creator, exec_summary, sections=sections_set)
filename = f"{slug}_{scope_labels_key}_{date_str}.docx"
return StreamingResponse(
io.BytesIO(docx_bytes),
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)