# Changelog (last commit):
# - geoparsing.py: Komplett-Rewrite (spaCy NER + Nominatim -> Haiku + geonamescache)
# - orchestrator.py: incident_context an geoparse_articles, category in INSERT
# - incidents.py: incident_context aus DB laden und an Geoparsing uebergeben
# - public_api.py: Locations aggregiert im Lagebild-Endpoint
# - components.js: response-Kategorie neben retaliation (beide akzeptiert)
# - requirements.txt: spaCy und geopy entfernt
"""Incidents-Router: Lagen verwalten (Multi-Tenant)."""
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
|
|
from fastapi.responses import StreamingResponse
|
|
from models import IncidentCreate, IncidentUpdate, IncidentResponse, SubscriptionUpdate, SubscriptionResponse
|
|
from auth import get_current_user
|
|
from middleware.license_check import require_writable_license
|
|
from database import db_dependency, get_db
|
|
from datetime import datetime
|
|
from config import TIMEZONE
|
|
import asyncio
|
|
import aiosqlite
|
|
import json
|
|
import logging
|
|
import re
|
|
import unicodedata
|
|
|
|
_geoparse_logger = logging.getLogger("osint.geoparse_bg")
|
|
|
|
router = APIRouter(prefix="/api/incidents", tags=["incidents"])
|
|
|
|
# Whitelist of columns a client may change via PUT /api/incidents/{id}.
# update_incident silently ignores any payload field not listed here, which
# protects immutable columns (id, tenant_id, created_by, timestamps).
INCIDENT_UPDATE_COLUMNS = {
    "title", "description", "type", "status", "refresh_mode",
    "refresh_interval", "retention_days", "international_sources", "visibility",
}
|
|
|
|
|
|
async def _check_incident_access(
    db: aiosqlite.Connection, incident_id: int, user_id: int, tenant_id: int
) -> aiosqlite.Row:
    """Load an incident row, enforcing tenant scoping and visibility.

    Raises 404 when the incident does not exist in this tenant, 403 when it
    is private and the caller is not its creator.
    """
    result = await db.execute(
        "SELECT * FROM incidents WHERE id = ? AND tenant_id = ?",
        (incident_id, tenant_id),
    )
    incident = await result.fetchone()
    if incident is None:
        raise HTTPException(status_code=404, detail="Lage nicht gefunden")
    is_private = incident["visibility"] == "private"
    if is_private and incident["created_by"] != user_id:
        raise HTTPException(status_code=403, detail="Kein Zugriff auf private Lage")
    return incident
|
|
|
|
|
|
async def _enrich_incident(db: aiosqlite.Connection, row: aiosqlite.Row) -> dict:
    """Augment an incident row with article/source counts and creator e-mail."""
    enriched = dict(row)

    async def _fetch_one(sql: str, params: tuple):
        # Small helper: run a query and return its first row.
        cur = await db.execute(sql, params)
        return await cur.fetchone()

    articles = await _fetch_one(
        "SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
        (enriched["id"],),
    )
    enriched["article_count"] = articles["cnt"]

    sources = await _fetch_one(
        "SELECT COUNT(DISTINCT source) as cnt FROM articles WHERE incident_id = ?",
        (enriched["id"],),
    )
    enriched["source_count"] = sources["cnt"]

    creator = await _fetch_one(
        "SELECT email FROM users WHERE id = ?",
        (enriched["created_by"],),
    )
    enriched["created_by_username"] = creator["email"] if creator else "Unbekannt"
    return enriched
|
|
|
|
|
|
@router.get("", response_model=list[IncidentResponse])
async def list_incidents(
    status_filter: str | None = None,  # fixed annotation: default None requires Optional
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List the tenant's incidents (public ones plus the caller's private ones).

    Optionally filters by status; results are ordered by last update, newest first.
    """
    tenant_id = current_user.get("tenant_id")
    user_id = current_user["id"]

    # Visibility rule: public incidents of the tenant, or private ones the caller created.
    query = "SELECT * FROM incidents WHERE tenant_id = ? AND (visibility = 'public' OR created_by = ?)"
    params = [tenant_id, user_id]

    if status_filter:
        query += " AND status = ?"
        params.append(status_filter)

    query += " ORDER BY updated_at DESC"
    cursor = await db.execute(query, params)
    rows = await cursor.fetchall()

    return [await _enrich_incident(db, row) for row in rows]
|
|
|
|
|
|
@router.post("", response_model=IncidentResponse, status_code=status.HTTP_201_CREATED)
async def create_incident(
    data: IncidentCreate,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create a new incident for the caller's tenant and return it enriched."""
    tenant_id = current_user.get("tenant_id")
    timestamp = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')

    # created_at and updated_at start out identical.
    values = (
        data.title,
        data.description,
        data.type,
        data.refresh_mode,
        data.refresh_interval,
        data.retention_days,
        1 if data.international_sources else 0,
        data.visibility,
        tenant_id,
        current_user["id"],
        timestamp,
        timestamp,
    )
    cursor = await db.execute(
        """INSERT INTO incidents (title, description, type, refresh_mode, refresh_interval,
                                  retention_days, international_sources, visibility,
                                  tenant_id, created_by, created_at, updated_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        values,
    )
    await db.commit()

    new_id = cursor.lastrowid
    cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (new_id,))
    return await _enrich_incident(db, await cursor.fetchone())
|
|
|
|
|
|
@router.get("/refreshing")
async def get_refreshing_incidents(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return IDs of all incidents with a refresh in progress (own tenant only)."""
    tenant_id = current_user.get("tenant_id")
    cursor = await db.execute(
        """SELECT rl.incident_id, rl.started_at FROM refresh_log rl
           JOIN incidents i ON i.id = rl.incident_id
           WHERE rl.status = 'running'
           AND i.tenant_id = ?
           AND (i.visibility = 'public' OR i.created_by = ?)""",
        (tenant_id, current_user["id"]),
    )
    running = await cursor.fetchall()

    ids = []
    details = {}
    for entry in running:
        ids.append(entry["incident_id"])
        details[str(entry["incident_id"])] = {"started_at": entry["started_at"]}
    return {"refreshing": ids, "details": details}
|
|
|
|
|
|
@router.get("/{incident_id}", response_model=IncidentResponse)
async def get_incident(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Fetch a single incident (access-checked, enriched with statistics)."""
    incident = await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    return await _enrich_incident(db, incident)
|
|
|
|
|
|
@router.put("/{incident_id}", response_model=IncidentResponse)
async def update_incident(
    incident_id: int,
    data: IncidentUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Update an incident; only whitelisted columns are applied."""
    tenant_id = current_user.get("tenant_id")
    row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # Filter the payload against the column whitelist; unknown fields are dropped.
    payload = data.model_dump(exclude_none=True)
    updates = {k: v for k, v in payload.items() if k in INCIDENT_UPDATE_COLUMNS}

    # Nothing to change: return the current state untouched.
    if not updates:
        return await _enrich_incident(db, row)

    updates["updated_at"] = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
    # Safe to interpolate: keys come only from the whitelist plus "updated_at".
    assignments = ", ".join(f"{col} = ?" for col in updates)
    await db.execute(
        f"UPDATE incidents SET {assignments} WHERE id = ?",
        [*updates.values(), incident_id],
    )
    await db.commit()

    cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
    return await _enrich_incident(db, await cursor.fetchone())
|
|
|
|
|
|
@router.delete("/{incident_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_incident(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete an incident; only its creator may do so."""
    cursor = await db.execute(
        "SELECT id, created_by FROM incidents WHERE id = ? AND tenant_id = ?",
        (incident_id, current_user.get("tenant_id")),
    )
    target = await cursor.fetchone()
    if target is None:
        raise HTTPException(status_code=404, detail="Lage nicht gefunden")

    if target["created_by"] != current_user["id"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Nur der Ersteller kann diese Lage loeschen",
        )

    await db.execute("DELETE FROM incidents WHERE id = ?", (incident_id,))
    await db.commit()
|
|
|
|
|
|
@router.get("/{incident_id}/articles")
async def get_articles(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return every article of an incident, newest first."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
        (incident_id,),
    )
    return [dict(r) for r in await cursor.fetchall()]
|
|
|
|
|
|
@router.get("/{incident_id}/snapshots")
async def get_snapshots(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the situation-report snapshots of an incident, newest first."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        """SELECT id, incident_id, summary, sources_json,
                  article_count, fact_check_count, created_at
           FROM incident_snapshots WHERE incident_id = ?
           ORDER BY created_at DESC""",
        (incident_id,),
    )
    return [dict(r) for r in await cursor.fetchall()]
|
|
|
|
|
|
@router.get("/{incident_id}/factchecks")
async def get_factchecks(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the fact checks of an incident, newest first."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        "SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
        (incident_id,),
    )
    return [dict(r) for r in await cursor.fetchall()]
|
|
|
|
|
|
@router.get("/{incident_id}/locations")
async def get_locations(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the geocoded locations of an incident, aggregated per place.

    Each aggregate carries coordinates, per-category counts collapsed into a
    dominant category, a total article count and up to 10 sample articles.
    """
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
    cursor = await db.execute(
        """SELECT al.location_name, al.location_name_normalized, al.country_code,
                  al.latitude, al.longitude, al.confidence, al.category,
                  a.id as article_id, a.headline, a.headline_de, a.source, a.source_url
           FROM article_locations al
           JOIN articles a ON a.id = al.article_id
           WHERE al.incident_id = ?
           ORDER BY al.location_name_normalized, a.collected_at DESC""",
        (incident_id,),
    )
    rows = await cursor.fetchall()

    # Aggregate by normalized place name + coordinates rounded to 2 decimals
    # (~1 km), so near-identical geocodings of the same place merge.
    loc_map = {}
    for row in rows:
        row = dict(row)
        name = row["location_name_normalized"] or row["location_name"]
        key = (name, round(row["latitude"], 2), round(row["longitude"], 2))
        if key not in loc_map:
            loc_map[key] = {
                "location_name": name,
                "lat": row["latitude"],
                "lon": row["longitude"],
                "country_code": row["country_code"],
                "confidence": row["confidence"],
                "article_count": 0,
                "articles": [],
                "categories": {},
            }
        entry = loc_map[key]
        entry["article_count"] += 1
        cat = row["category"] or "mentioned"
        entry["categories"][cat] = entry["categories"].get(cat, 0) + 1
        # Ship at most 10 sample articles per place.
        if len(entry["articles"]) < 10:
            entry["articles"].append({
                "id": row["article_id"],
                "headline": row["headline_de"] or row["headline"],
                "source": row["source"],
                "source_url": row["source_url"],
            })

    # Determine the dominant category per place
    # (priority: target > retaliation/response > actor > mentioned).
    # FIX: "response" is accepted by the frontend as a synonym for
    # "retaliation" (see changelog); without an entry here it would fall to
    # priority 0 via priority.get(c, 0) and rank below "mentioned".
    priority = {"target": 4, "retaliation": 3, "response": 3, "actor": 2, "mentioned": 1}
    result = []
    for loc in loc_map.values():
        cats = loc.pop("categories")
        if cats:
            # Tie-break on the raw occurrence count.
            best_cat = max(cats, key=lambda c: (priority.get(c, 0), cats[c]))
        else:
            best_cat = "mentioned"
        loc["category"] = best_cat
        result.append(loc)
    return result
|
|
|
|
|
|
# In-memory geoparse progress keyed by incident id. Value shape (see
# _run_geoparse_background): {"status": "running"|"done"|"error",
# "processed": int, "total": int, "locations": int} or {"status": "error",
# "error": str}. NOTE(review): process-local — with multiple worker
# processes each one sees only its own state; confirm deployment model.
_geoparse_status: dict[int, dict] = {}
|
|
|
|
|
|
async def _run_geoparse_background(incident_id: int, tenant_id: int | None):
    """Background task: geoparse all not-yet-parsed articles of an incident.

    Progress is published via the module-level _geoparse_status dict. The task
    opens (and closes) its own DB connection because it outlives the request
    that started it. Errors are recorded in the status dict, never raised.
    """
    _geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": 0, "locations": 0}
    db = None
    try:
        # Lazy import keeps the geoparsing stack off the module-import path.
        from agents.geoparsing import geoparse_articles
        db = await get_db()

        # Load incident context (title + description) for the Haiku geoparser.
        cursor = await db.execute(
            "SELECT title, description FROM incidents WHERE id = ?", (incident_id,)
        )
        inc_row = await cursor.fetchone()
        incident_context = ""
        if inc_row:
            incident_context = f"{inc_row['title']} - {inc_row['description'] or ''}"

        # Only articles without any stored locations for this incident.
        cursor = await db.execute(
            """SELECT a.* FROM articles a
            WHERE a.incident_id = ?
            AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
            (incident_id, incident_id),
        )
        articles = [dict(row) for row in await cursor.fetchall()]

        if not articles:
            # Nothing to do: mark as done immediately.
            _geoparse_status[incident_id] = {"status": "done", "processed": 0, "total": 0, "locations": 0}
            return

        total = len(articles)
        _geoparse_status[incident_id]["total"] = total
        _geoparse_logger.info(f"Geoparsing Hintergrund: {total} Artikel fuer Lage {incident_id}")

        # Process in batches (50 articles per batch); commit after each batch
        # so partial progress survives a crash mid-run.
        batch_size = 50
        geo_count = 0
        processed = 0
        for i in range(0, total, batch_size):
            batch = articles[i:i + batch_size]
            geo_results = await geoparse_articles(batch, incident_context)
            # geo_results maps article id -> list of location dicts.
            for art_id, locations in geo_results.items():
                for loc in locations:
                    await db.execute(
                        """INSERT INTO article_locations
                        (article_id, incident_id, location_name, location_name_normalized,
                        country_code, latitude, longitude, confidence, source_text, tenant_id, category)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                        (art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
                         loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
                         loc.get("source_text", ""), tenant_id, loc.get("category", "mentioned")),
                    )
                    geo_count += 1
            await db.commit()
            processed += len(batch)
            # Publish progress after every committed batch.
            _geoparse_status[incident_id] = {"status": "running", "processed": processed, "total": total, "locations": geo_count}

        _geoparse_status[incident_id] = {"status": "done", "processed": processed, "total": total, "locations": geo_count}
        _geoparse_logger.info(f"Geoparsing fertig: {geo_count} Orte aus {processed} Artikeln (Lage {incident_id})")
    except Exception as e:
        # Surface the failure through the status dict; the caller polls it.
        _geoparse_status[incident_id] = {"status": "error", "error": str(e)}
        _geoparse_logger.error(f"Geoparsing Fehler (Lage {incident_id}): {e}")
    finally:
        if db:
            await db.close()
|
|
|
|
|
|
@router.post("/{incident_id}/geoparse")
async def trigger_geoparse(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Start geoparsing of all articles of an incident as a background task."""
    tenant_id = current_user.get("tenant_id")
    await _check_incident_access(db, incident_id, current_user["id"], tenant_id)

    # Refuse to start a second run while one is already in flight.
    current = _geoparse_status.get(incident_id, {})
    if current.get("status") == "running":
        done = current.get('processed', 0)
        total = current.get('total', 0)
        return {"status": "running", "message": f"Läuft bereits ({done}/{total} Artikel)"}

    # Is there anything left to parse at all?
    cursor = await db.execute(
        """SELECT COUNT(*) as cnt FROM articles a
        WHERE a.incident_id = ?
        AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
        (incident_id, incident_id),
    )
    pending = (await cursor.fetchone())["cnt"]
    if pending == 0:
        return {"status": "done", "message": "Alle Artikel wurden bereits verarbeitet", "locations": 0}

    # Fire-and-forget background task; progress is polled via geoparse-status.
    asyncio.create_task(_run_geoparse_background(incident_id, tenant_id))
    return {"status": "started", "message": f"Geoparsing gestartet für {pending} Artikel"}
|
|
|
|
|
|
@router.get("/{incident_id}/geoparse-status")
async def get_geoparse_status(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Report progress of the geoparse task; "idle" when none was started."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    return _geoparse_status.get(incident_id, {"status": "idle"})
|
|
|
|
|
|
@router.get("/{incident_id}/refresh-log")
async def get_refresh_log(
    incident_id: int,
    limit: int = 20,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the refresh history of an incident (capped at 100 entries)."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )
    cursor = await db.execute(
        """SELECT id, started_at, completed_at, articles_found, status,
                  trigger_type, retry_count, error_message
           FROM refresh_log WHERE incident_id = ?
           ORDER BY started_at DESC LIMIT ?""",
        (incident_id, min(limit, 100)),
    )

    def _duration(started, completed):
        # Derived field: wall-clock runtime in seconds; None when either
        # timestamp is missing or not ISO-parseable.
        if not (started and completed):
            return None
        try:
            delta = datetime.fromisoformat(completed) - datetime.fromisoformat(started)
        except Exception:
            return None
        return round(delta.total_seconds(), 1)

    results = []
    for row in await cursor.fetchall():
        entry = dict(row)
        entry["duration_seconds"] = _duration(entry["started_at"], entry["completed_at"])
        results.append(entry)
    return results
|
|
|
|
|
|
@router.get("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def get_subscription(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the caller's e-mail subscription settings for an incident."""
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))
    cursor = await db.execute(
        """SELECT notify_email_summary, notify_email_new_articles, notify_email_status_change
           FROM incident_subscriptions WHERE user_id = ? AND incident_id = ?""",
        (user_id, incident_id),
    )
    settings = await cursor.fetchone()
    if settings is None:
        # No subscription row yet: all notifications default to off.
        return {"notify_email_summary": False, "notify_email_new_articles": False, "notify_email_status_change": False}
    return dict(settings)
|
|
|
|
|
|
@router.put("/{incident_id}/subscription", response_model=SubscriptionResponse)
async def update_subscription(
    incident_id: int,
    data: SubscriptionUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Upsert the caller's e-mail subscription settings for an incident."""
    user_id = current_user["id"]
    await _check_incident_access(db, incident_id, user_id, current_user.get("tenant_id"))

    flags = (
        1 if data.notify_email_summary else 0,
        1 if data.notify_email_new_articles else 0,
        1 if data.notify_email_status_change else 0,
    )
    # SQLite UPSERT: one row per (user, incident), overwritten on re-submit.
    await db.execute(
        """INSERT INTO incident_subscriptions (user_id, incident_id, notify_email_summary, notify_email_new_articles, notify_email_status_change)
           VALUES (?, ?, ?, ?, ?)
           ON CONFLICT(user_id, incident_id) DO UPDATE SET
           notify_email_summary = excluded.notify_email_summary,
           notify_email_new_articles = excluded.notify_email_new_articles,
           notify_email_status_change = excluded.notify_email_status_change""",
        (user_id, incident_id, *flags),
    )
    await db.commit()
    return {
        "notify_email_summary": data.notify_email_summary,
        "notify_email_new_articles": data.notify_email_new_articles,
        "notify_email_status_change": data.notify_email_status_change,
    }
|
|
|
|
|
|
@router.post("/{incident_id}/refresh")
async def trigger_refresh(
    incident_id: int,
    current_user: dict = Depends(require_writable_license),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Queue a manual refresh for an incident via the orchestrator."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )

    # Local import — presumably avoids an import cycle at module load; keep it here.
    from agents.orchestrator import orchestrator
    enqueued = await orchestrator.enqueue_refresh(incident_id)

    return {"status": "queued" if enqueued else "skipped", "incident_id": incident_id}
|
|
|
|
|
|
@router.post("/{incident_id}/cancel-refresh")
async def cancel_refresh(
    incident_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Abort a running refresh of an incident."""
    await _check_incident_access(
        db, incident_id, current_user["id"], current_user.get("tenant_id")
    )

    from agents.orchestrator import orchestrator  # lazy import, as in trigger_refresh
    cancelled = await orchestrator.cancel_refresh(incident_id)

    return {"status": "cancelling" if cancelled else "not_running"}
|
|
|
|
|
|
|
|
def _slugify(text: str) -> str:
|
|
"""Dateinamen-sicherer Slug aus Titel."""
|
|
replacements = {
|
|
"\u00e4": "ae", "\u00f6": "oe", "\u00fc": "ue", "\u00df": "ss",
|
|
"\u00c4": "Ae", "\u00d6": "Oe", "\u00dc": "Ue",
|
|
}
|
|
for src, dst in replacements.items():
|
|
text = text.replace(src, dst)
|
|
text = unicodedata.normalize("NFKD", text)
|
|
text = re.sub(r"[^\w\s-]", "", text)
|
|
text = re.sub(r"[\s_]+", "-", text).strip("-")
|
|
return text[:80].lower()
|
|
|
|
|
|
def _build_markdown_export(
    incident: dict, articles: list, fact_checks: list,
    snapshots: list, scope: str, creator: str
) -> str:
    """Assemble the Markdown export document for an incident.

    Args:
        incident: incident row as dict (title, type, summary, sources_json, ...).
        articles: article dicts; rendered as a table only when scope == "full".
        fact_checks: fact-check dicts rendered as a bullet list.
        snapshots: snapshot dicts; rendered only when scope == "full".
        scope: "report" (summary only) or "full" (adds articles + snapshots).
        creator: display name of the incident creator.

    Returns:
        The complete Markdown document as a single newline-joined string.
    """
    typ = "Hintergrundrecherche" if incident.get("type") == "research" else "Breaking News"
    # "YYYY-MM-DDTHH:MM..." -> "YYYY-MM-DD HH:MM" for display.
    updated = (incident.get("updated_at") or "")[:16].replace("T", " ")

    lines = []
    lines.append(f"# {incident['title']}")
    lines.append(f"> {typ} | Erstellt von {creator} | Stand: {updated}")
    lines.append("")

    # Situation report section (falls back to a placeholder when empty).
    summary = incident.get("summary") or "*Noch kein Lagebild verf\u00fcgbar.*"
    lines.append("## Lagebild")
    lines.append("")
    lines.append(summary)
    lines.append("")

    # Source index built from sources_json (JSON string or pre-parsed list).
    sources_json = incident.get("sources_json")
    if sources_json:
        try:
            sources = json.loads(sources_json) if isinstance(sources_json, str) else sources_json
            if sources:
                lines.append("## Quellenverzeichnis")
                lines.append("")
                for i, src in enumerate(sources, 1):
                    # Prefer a human-readable name; fall back to the URL itself.
                    name = src.get("name") or src.get("title") or src.get("url", "")
                    url = src.get("url", "")
                    if url:
                        lines.append(f"{i}. [{name}]({url})")
                    else:
                        lines.append(f"{i}. {name}")
                lines.append("")
        except (json.JSONDecodeError, TypeError):
            # Malformed sources_json: silently skip the source index section.
            pass

    # Fact-check section.
    if fact_checks:
        lines.append("## Faktencheck")
        lines.append("")
        for fc in fact_checks:
            claim = fc.get("claim", "")
            fc_status = fc.get("status", "")
            sources_count = fc.get("sources_count", 0)
            evidence = fc.get("evidence", "")
            # Map internal status codes to German display labels; unknown
            # codes pass through unchanged.
            status_label = {
                "confirmed": "Best\u00e4tigt", "unconfirmed": "Unbest\u00e4tigt",
                "disputed": "Umstritten", "false": "Falsch",
            }.get(fc_status, fc_status)
            line = f"- **{claim}** \u2014 {status_label} ({sources_count} Quellen)"
            if evidence:
                line += f"\n {evidence}"
            lines.append(line)
        lines.append("")

    # scope == "full": article overview table.
    if scope == "full" and articles:
        lines.append("## Artikel\u00fcbersicht")
        lines.append("")
        lines.append("| Headline | Quelle | Sprache | Datum |")
        lines.append("|----------|--------|---------|-------|")
        for art in articles:
            # "|" inside cell text would break the Markdown table layout.
            headline = (art.get("headline_de") or art.get("headline") or "").replace("|", "/")
            source = (art.get("source") or "").replace("|", "/")
            lang = art.get("language", "")
            pub = (art.get("published_at") or art.get("collected_at") or "")[:16]
            lines.append(f"| {headline} | {source} | {lang} | {pub} |")
        lines.append("")

    # scope == "full": snapshot history.
    if scope == "full" and snapshots:
        lines.append("## Snapshot-Verlauf")
        lines.append("")
        for snap in snapshots:
            snap_date = (snap.get("created_at") or "")[:16].replace("T", " ")
            art_count = snap.get("article_count", 0)
            fc_count = snap.get("fact_check_count", 0)
            lines.append(f"### Snapshot vom {snap_date}")
            lines.append(f"Artikel: {art_count} | Faktenchecks: {fc_count}")
            lines.append("")
            snap_summary = snap.get("summary", "")
            if snap_summary:
                lines.append(snap_summary)
                lines.append("")

    # Footer with export timestamp.
    now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M Uhr")
    lines.append("---")
    lines.append(f"*Exportiert am {now} aus AegisSight Monitor*")
    return "\n".join(lines)
|
|
|
|
|
|
def _build_json_export(
    incident: dict, articles: list, fact_checks: list,
    snapshots: list, scope: str, creator: str
) -> dict:
    """Build the structured JSON export payload for an incident."""
    exported_at = datetime.now(TIMEZONE).strftime("%Y-%m-%dT%H:%M:%SZ")

    # sources_json may be a JSON string or an already-parsed list.
    raw_sources = incident.get("sources_json")
    sources = []
    if raw_sources:
        try:
            sources = json.loads(raw_sources) if isinstance(raw_sources, str) else raw_sources
        except (json.JSONDecodeError, TypeError):
            pass

    incident_block = {
        "id": incident["id"],
        "title": incident["title"],
        "description": incident.get("description"),
        "type": incident.get("type"),
        "status": incident.get("status"),
        "visibility": incident.get("visibility"),
        "created_by": creator,
        "created_at": incident.get("created_at"),
        "updated_at": incident.get("updated_at"),
        "summary": incident.get("summary"),
        "international_sources": bool(incident.get("international_sources")),
    }

    def _fc_entry(fc: dict) -> dict:
        # Project a fact-check row onto the exported field subset.
        return {
            "claim": fc.get("claim"),
            "status": fc.get("status"),
            "sources_count": fc.get("sources_count"),
            "evidence": fc.get("evidence"),
            "checked_at": fc.get("checked_at"),
        }

    export = {
        "export_version": "1.0",
        "exported_at": exported_at,
        "scope": scope,
        "incident": incident_block,
        "sources": sources,
        "fact_checks": [_fc_entry(fc) for fc in fact_checks],
    }

    # Full export additionally includes articles and snapshot history.
    if scope == "full":
        article_keys = ("headline", "headline_de", "source", "source_url",
                        "language", "published_at", "collected_at", "verification_status")
        export["articles"] = [{k: art.get(k) for k in article_keys} for art in articles]
        snapshot_keys = ("created_at", "article_count", "fact_check_count", "summary")
        export["snapshots"] = [{k: snap.get(k) for k in snapshot_keys} for snap in snapshots]

    return export
|
|
|
|
|
|
@router.get("/{incident_id}/export")
async def export_incident(
    incident_id: int,
    format: str = Query(..., pattern="^(md|json)$"),
    scope: str = Query("report", pattern="^(report|full)$"),
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Export an incident as a Markdown or JSON download.

    scope="report" exports the situation report only; scope="full" also
    includes the article table and the snapshot history.
    """
    tenant_id = current_user.get("tenant_id")
    row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
    incident = dict(row)

    # Creator display name.
    cursor = await db.execute("SELECT email FROM users WHERE id = ?", (incident["created_by"],))
    user_row = await cursor.fetchone()
    creator = user_row["email"] if user_row else "Unbekannt"

    # Articles.
    cursor = await db.execute(
        "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
        (incident_id,),
    )
    articles = [dict(r) for r in await cursor.fetchall()]

    # Fact checks.
    cursor = await db.execute(
        "SELECT * FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
        (incident_id,),
    )
    fact_checks = [dict(r) for r in await cursor.fetchall()]

    # Snapshots (full export only).
    snapshots = []
    if scope == "full":
        cursor = await db.execute(
            "SELECT * FROM incident_snapshots WHERE incident_id = ? ORDER BY created_at DESC",
            (incident_id,),
        )
        snapshots = [dict(r) for r in await cursor.fetchall()]

    # File name: <slug>[_vollexport]_<yyyymmdd>.<ext>
    date_str = datetime.now(TIMEZONE).strftime("%Y%m%d")
    slug = _slugify(incident["title"])
    scope_suffix = "_vollexport" if scope == "full" else ""

    if format == "md":
        body = _build_markdown_export(incident, articles, fact_checks, snapshots, scope, creator)
        filename = f"{slug}{scope_suffix}_{date_str}.md"
        media_type = "text/markdown; charset=utf-8"
    else:
        export_data = _build_json_export(incident, articles, fact_checks, snapshots, scope, creator)
        body = json.dumps(export_data, ensure_ascii=False, indent=2)
        filename = f"{slug}{scope_suffix}_{date_str}.json"
        media_type = "application/json; charset=utf-8"

    # BUG FIX: the Content-Disposition header previously hard-coded the
    # literal string "(unknown)" instead of interpolating the computed
    # filename, so every download was saved as "(unknown)". The slug is
    # ASCII-safe (see _slugify), so plain quoting is sufficient here.
    return StreamingResponse(
        iter([body]),
        media_type=media_type,
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
|