diff --git a/src/routers/incidents.py b/src/routers/incidents.py index 52770e5..ee8dd7a 100644 --- a/src/routers/incidents.py +++ b/src/routers/incidents.py @@ -1,16 +1,20 @@ """Incidents-Router: Lagen verwalten (Multi-Tenant).""" -from fastapi import APIRouter, Depends, HTTPException, Query, status +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status from fastapi.responses import StreamingResponse from models import IncidentCreate, IncidentUpdate, IncidentResponse, SubscriptionUpdate, SubscriptionResponse from auth import get_current_user from middleware.license_check import require_writable_license -from database import db_dependency +from database import db_dependency, get_db from datetime import datetime, timezone +import asyncio import aiosqlite import json +import logging import re import unicodedata +_geoparse_logger = logging.getLogger("osint.geoparse_bg") + router = APIRouter(prefix="/api/incidents", tags=["incidents"]) INCIDENT_UPDATE_COLUMNS = { @@ -320,48 +324,108 @@ async def get_locations( return list(loc_map.values()) +# Geoparse-Status pro Incident (in-memory) +_geoparse_status: dict[int, dict] = {} + + +async def _run_geoparse_background(incident_id: int, tenant_id: int | None): + """Hintergrund-Task: Geoparsing fuer alle Artikel einer Lage.""" + _geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": 0, "locations": 0} + db = None + try: + from agents.geoparsing import geoparse_articles + db = await get_db() + + cursor = await db.execute( + """SELECT a.* FROM articles a + WHERE a.incident_id = ? + AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""", + (incident_id, incident_id), + ) + articles = [dict(row) for row in await cursor.fetchall()] + + if not articles: + _geoparse_status[incident_id] = {"status": "done", "processed": 0, "total": 0, "locations": 0} + return + + total = len(articles) + _geoparse_status[incident_id]["total"] = total + _geoparse_logger.info(f"Geoparsing Hintergrund: {total} Artikel fuer Lage {incident_id}") + + # In Batches verarbeiten (50 Artikel pro Batch) + batch_size = 50 + geo_count = 0 + processed = 0 + for i in range(0, total, batch_size): + batch = articles[i:i + batch_size] + geo_results = await geoparse_articles(batch) + for art_id, locations in geo_results.items(): + for loc in locations: + await db.execute( + """INSERT INTO article_locations + (article_id, incident_id, location_name, location_name_normalized, + country_code, latitude, longitude, confidence, source_text, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (art_id, incident_id, loc["location_name"], loc["location_name_normalized"], + loc["country_code"], loc["lat"], loc["lon"], loc["confidence"], + loc.get("source_text", ""), tenant_id), + ) + geo_count += 1 + await db.commit() + processed += len(batch) + _geoparse_status[incident_id] = {"status": "running", "processed": processed, "total": total, "locations": geo_count} + + _geoparse_status[incident_id] = {"status": "done", "processed": processed, "total": total, "locations": geo_count} + _geoparse_logger.info(f"Geoparsing fertig: {geo_count} Orte aus {processed} Artikeln (Lage {incident_id})") + except Exception as e: + _geoparse_status[incident_id] = {"status": "error", "error": str(e)} + _geoparse_logger.error(f"Geoparsing Fehler (Lage {incident_id}): {e}") + finally: + if db: + await db.close() + + @router.post("/{incident_id}/geoparse") async def trigger_geoparse( incident_id: int, current_user: dict = Depends(get_current_user), db: aiosqlite.Connection = Depends(db_dependency), ): - """Geoparsing fuer alle Artikel einer Lage nachholen (bestehende Orte werden uebersprungen).""" + """Geoparsing fuer alle Artikel einer Lage als Hintergrund-Task starten.""" tenant_id = current_user.get("tenant_id") await _check_incident_access(db, incident_id, current_user["id"], tenant_id) - # Artikel laden, die noch keine Locations haben + # Bereits laufend? + existing = _geoparse_status.get(incident_id, {}) + if existing.get("status") == "running": + return {"status": "running", "message": f"Läuft bereits ({existing.get('processed', 0)}/{existing.get('total', 0)} Artikel)"} + + # Pruefen ob es ueberhaupt ungeparsete Artikel gibt cursor = await db.execute( - """SELECT a.* FROM articles a + """SELECT COUNT(*) as cnt FROM articles a WHERE a.incident_id = ? AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""", (incident_id, incident_id), ) - articles = [dict(row) for row in await cursor.fetchall()] + count = (await cursor.fetchone())["cnt"] + if count == 0: + return {"status": "done", "message": "Alle Artikel wurden bereits verarbeitet", "locations": 0} - if not articles: - return {"message": "Alle Artikel wurden bereits geoparsed", "new_locations": 0} + # Hintergrund-Task starten + asyncio.create_task(_run_geoparse_background(incident_id, tenant_id)) + return {"status": "started", "message": f"Geoparsing gestartet für {count} Artikel"} - try: - from agents.geoparsing import geoparse_articles - geo_results = await geoparse_articles(articles) - geo_count = 0 - for art_id, locations in geo_results.items(): - for loc in locations: - await db.execute( - """INSERT INTO article_locations - (article_id, incident_id, location_name, location_name_normalized, - country_code, latitude, longitude, confidence, source_text, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (art_id, incident_id, loc["location_name"], loc["location_name_normalized"], - loc["country_code"], loc["lat"], loc["lon"], loc["confidence"], - loc.get("source_text", ""), tenant_id), - ) - geo_count += 1 - await db.commit() - return {"message": f"{geo_count} Orte aus {len(geo_results)} Artikeln extrahiert", "new_locations": geo_count} - except Exception as e: - raise HTTPException(status_code=500, detail=f"Geoparsing fehlgeschlagen: {str(e)}") + +@router.get("/{incident_id}/geoparse-status") +async def get_geoparse_status( + incident_id: int, + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), +): + """Status des laufenden Geoparsing-Tasks abfragen.""" + tenant_id = current_user.get("tenant_id") + await _check_incident_access(db, incident_id, current_user["id"], tenant_id) + return _geoparse_status.get(incident_id, {"status": "idle"}) @router.get("/{incident_id}/refresh-log") diff --git a/src/static/js/api.js b/src/static/js/api.js index 0a2abde..2994cee 100644 --- a/src/static/js/api.js +++ b/src/static/js/api.js @@ -110,6 +110,10 @@ const API = { return this._request('POST', `/incidents/${incidentId}/geoparse`); }, + getGeoparseStatus(incidentId) { + return this._request('GET', `/incidents/${incidentId}/geoparse-status`); + }, + refreshIncident(id) { return this._request('POST', `/incidents/${id}/refresh`); }, diff --git a/src/static/js/app.js b/src/static/js/app.js index 59a9427..27eee87 100644 --- a/src/static/js/app.js +++ b/src/static/js/app.js @@ -1429,24 +1429,57 @@ const App = { } }, + _geoparsePolling: null, + async triggerGeoparse() { if (!this.currentIncidentId) return; const btn = document.getElementById('geoparse-btn'); - if (btn) { btn.disabled = true; btn.textContent = 'Erkennung...'; } + if (btn) { btn.disabled = true; btn.textContent = 'Wird gestartet...'; } try { const result = await API.triggerGeoparse(this.currentIncidentId); - UI.showToast(result.message, result.new_locations > 0 ? 'success' : 'info'); - if (result.new_locations > 0) { - const locations = await API.getLocations(this.currentIncidentId).catch(() => []); - UI.renderMap(locations); + if (result.status === 'done') { + UI.showToast(result.message, 'info'); + if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; } + return; } + UI.showToast(result.message, 'info'); + this._pollGeoparse(this.currentIncidentId); } catch (err) { UI.showToast('Geoparsing fehlgeschlagen: ' + err.message, 'error'); - } finally { if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; } } }, + _pollGeoparse(incidentId) { + if (this._geoparsePolling) clearInterval(this._geoparsePolling); + const btn = document.getElementById('geoparse-btn'); + this._geoparsePolling = setInterval(async () => { + try { + const st = await API.getGeoparseStatus(incidentId); + if (st.status === 'running') { + if (btn) btn.textContent = `${st.processed}/${st.total} Artikel...`; + } else { + clearInterval(this._geoparsePolling); + this._geoparsePolling = null; + if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; } + if (st.status === 'done' && st.locations > 0) { + UI.showToast(`${st.locations} Orte aus ${st.processed} Artikeln erkannt`, 'success'); + const locations = await API.getLocations(incidentId).catch(() => []); + UI.renderMap(locations); + } else if (st.status === 'done') { + UI.showToast('Keine neuen Orte gefunden', 'info'); + } else if (st.status === 'error') { + UI.showToast('Geoparsing fehlgeschlagen: ' + (st.error || ''), 'error'); + } + } + } catch { + clearInterval(this._geoparsePolling); + this._geoparsePolling = null; + if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; } + } + }, 3000); + }, + _formatInterval(minutes) { if (minutes >= 10080 && minutes % 10080 === 0) { const w = minutes / 10080;