Geoparsing als Hintergrund-Task mit Fortschrittsanzeige
- Endpunkt startet async Background-Task statt synchron zu warten
- Batch-Verarbeitung (50 Artikel pro Batch)
- Neuer Status-Endpunkt GET /incidents/{id}/geoparse-status
- Frontend pollt alle 3s und zeigt Fortschritt im Button (z.B. "150/427 Artikel...")
- Kein Timeout-Problem mehr bei grossen Lagen
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dieser Commit ist enthalten in:
@@ -1,16 +1,20 @@
|
||||
"""Incidents-Router: Lagen verwalten (Multi-Tenant)."""
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, status
|
||||
from fastapi.responses import StreamingResponse
|
||||
from models import IncidentCreate, IncidentUpdate, IncidentResponse, SubscriptionUpdate, SubscriptionResponse
|
||||
from auth import get_current_user
|
||||
from middleware.license_check import require_writable_license
|
||||
from database import db_dependency
|
||||
from database import db_dependency, get_db
|
||||
from datetime import datetime, timezone
|
||||
import asyncio
|
||||
import aiosqlite
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import unicodedata
|
||||
|
||||
_geoparse_logger = logging.getLogger("osint.geoparse_bg")
|
||||
|
||||
router = APIRouter(prefix="/api/incidents", tags=["incidents"])
|
||||
|
||||
INCIDENT_UPDATE_COLUMNS = {
|
||||
@@ -320,48 +324,108 @@ async def get_locations(
|
||||
return list(loc_map.values())
|
||||
|
||||
|
||||
# Geoparse-Status pro Incident (in-memory)
|
||||
_geoparse_status: dict[int, dict] = {}
|
||||
|
||||
|
||||
async def _run_geoparse_background(incident_id: int, tenant_id: int | None):
|
||||
"""Hintergrund-Task: Geoparsing fuer alle Artikel einer Lage."""
|
||||
_geoparse_status[incident_id] = {"status": "running", "processed": 0, "total": 0, "locations": 0}
|
||||
db = None
|
||||
try:
|
||||
from agents.geoparsing import geoparse_articles
|
||||
db = await get_db()
|
||||
|
||||
cursor = await db.execute(
|
||||
"""SELECT a.* FROM articles a
|
||||
WHERE a.incident_id = ?
|
||||
AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
|
||||
(incident_id, incident_id),
|
||||
)
|
||||
articles = [dict(row) for row in await cursor.fetchall()]
|
||||
|
||||
if not articles:
|
||||
_geoparse_status[incident_id] = {"status": "done", "processed": 0, "total": 0, "locations": 0}
|
||||
return
|
||||
|
||||
total = len(articles)
|
||||
_geoparse_status[incident_id]["total"] = total
|
||||
_geoparse_logger.info(f"Geoparsing Hintergrund: {total} Artikel fuer Lage {incident_id}")
|
||||
|
||||
# In Batches verarbeiten (50 Artikel pro Batch)
|
||||
batch_size = 50
|
||||
geo_count = 0
|
||||
processed = 0
|
||||
for i in range(0, total, batch_size):
|
||||
batch = articles[i:i + batch_size]
|
||||
geo_results = await geoparse_articles(batch)
|
||||
for art_id, locations in geo_results.items():
|
||||
for loc in locations:
|
||||
await db.execute(
|
||||
"""INSERT INTO article_locations
|
||||
(article_id, incident_id, location_name, location_name_normalized,
|
||||
country_code, latitude, longitude, confidence, source_text, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
|
||||
loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
|
||||
loc.get("source_text", ""), tenant_id),
|
||||
)
|
||||
geo_count += 1
|
||||
await db.commit()
|
||||
processed += len(batch)
|
||||
_geoparse_status[incident_id] = {"status": "running", "processed": processed, "total": total, "locations": geo_count}
|
||||
|
||||
_geoparse_status[incident_id] = {"status": "done", "processed": processed, "total": total, "locations": geo_count}
|
||||
_geoparse_logger.info(f"Geoparsing fertig: {geo_count} Orte aus {processed} Artikeln (Lage {incident_id})")
|
||||
except Exception as e:
|
||||
_geoparse_status[incident_id] = {"status": "error", "error": str(e)}
|
||||
_geoparse_logger.error(f"Geoparsing Fehler (Lage {incident_id}): {e}")
|
||||
finally:
|
||||
if db:
|
||||
await db.close()
|
||||
|
||||
|
||||
@router.post("/{incident_id}/geoparse")
|
||||
async def trigger_geoparse(
|
||||
incident_id: int,
|
||||
current_user: dict = Depends(get_current_user),
|
||||
db: aiosqlite.Connection = Depends(db_dependency),
|
||||
):
|
||||
"""Geoparsing fuer alle Artikel einer Lage nachholen (bestehende Orte werden uebersprungen)."""
|
||||
"""Geoparsing fuer alle Artikel einer Lage als Hintergrund-Task starten."""
|
||||
tenant_id = current_user.get("tenant_id")
|
||||
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
|
||||
|
||||
# Artikel laden, die noch keine Locations haben
|
||||
# Bereits laufend?
|
||||
existing = _geoparse_status.get(incident_id, {})
|
||||
if existing.get("status") == "running":
|
||||
return {"status": "running", "message": f"Läuft bereits ({existing.get('processed', 0)}/{existing.get('total', 0)} Artikel)"}
|
||||
|
||||
# Pruefen ob es ueberhaupt ungeparsete Artikel gibt
|
||||
cursor = await db.execute(
|
||||
"""SELECT a.* FROM articles a
|
||||
"""SELECT COUNT(*) as cnt FROM articles a
|
||||
WHERE a.incident_id = ?
|
||||
AND a.id NOT IN (SELECT DISTINCT article_id FROM article_locations WHERE incident_id = ?)""",
|
||||
(incident_id, incident_id),
|
||||
)
|
||||
articles = [dict(row) for row in await cursor.fetchall()]
|
||||
count = (await cursor.fetchone())["cnt"]
|
||||
if count == 0:
|
||||
return {"status": "done", "message": "Alle Artikel wurden bereits verarbeitet", "locations": 0}
|
||||
|
||||
if not articles:
|
||||
return {"message": "Alle Artikel wurden bereits geoparsed", "new_locations": 0}
|
||||
# Hintergrund-Task starten
|
||||
asyncio.create_task(_run_geoparse_background(incident_id, tenant_id))
|
||||
return {"status": "started", "message": f"Geoparsing gestartet für {count} Artikel"}
|
||||
|
||||
try:
|
||||
from agents.geoparsing import geoparse_articles
|
||||
geo_results = await geoparse_articles(articles)
|
||||
geo_count = 0
|
||||
for art_id, locations in geo_results.items():
|
||||
for loc in locations:
|
||||
await db.execute(
|
||||
"""INSERT INTO article_locations
|
||||
(article_id, incident_id, location_name, location_name_normalized,
|
||||
country_code, latitude, longitude, confidence, source_text, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
|
||||
loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
|
||||
loc.get("source_text", ""), tenant_id),
|
||||
)
|
||||
geo_count += 1
|
||||
await db.commit()
|
||||
return {"message": f"{geo_count} Orte aus {len(geo_results)} Artikeln extrahiert", "new_locations": geo_count}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Geoparsing fehlgeschlagen: {str(e)}")
|
||||
|
||||
@router.get("/{incident_id}/geoparse-status")
|
||||
async def get_geoparse_status(
|
||||
incident_id: int,
|
||||
current_user: dict = Depends(get_current_user),
|
||||
db: aiosqlite.Connection = Depends(db_dependency),
|
||||
):
|
||||
"""Status des laufenden Geoparsing-Tasks abfragen."""
|
||||
tenant_id = current_user.get("tenant_id")
|
||||
await _check_incident_access(db, incident_id, current_user["id"], tenant_id)
|
||||
return _geoparse_status.get(incident_id, {"status": "idle"})
|
||||
|
||||
|
||||
@router.get("/{incident_id}/refresh-log")
|
||||
|
||||
@@ -110,6 +110,10 @@ const API = {
|
||||
return this._request('POST', `/incidents/${incidentId}/geoparse`);
|
||||
},
|
||||
|
||||
getGeoparseStatus(incidentId) {
|
||||
return this._request('GET', `/incidents/${incidentId}/geoparse-status`);
|
||||
},
|
||||
|
||||
refreshIncident(id) {
|
||||
return this._request('POST', `/incidents/${id}/refresh`);
|
||||
},
|
||||
|
||||
@@ -1429,24 +1429,57 @@ const App = {
|
||||
}
|
||||
},
|
||||
|
||||
_geoparsePolling: null,
|
||||
|
||||
async triggerGeoparse() {
|
||||
if (!this.currentIncidentId) return;
|
||||
const btn = document.getElementById('geoparse-btn');
|
||||
if (btn) { btn.disabled = true; btn.textContent = 'Erkennung...'; }
|
||||
if (btn) { btn.disabled = true; btn.textContent = 'Wird gestartet...'; }
|
||||
try {
|
||||
const result = await API.triggerGeoparse(this.currentIncidentId);
|
||||
UI.showToast(result.message, result.new_locations > 0 ? 'success' : 'info');
|
||||
if (result.new_locations > 0) {
|
||||
const locations = await API.getLocations(this.currentIncidentId).catch(() => []);
|
||||
UI.renderMap(locations);
|
||||
if (result.status === 'done') {
|
||||
UI.showToast(result.message, 'info');
|
||||
if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; }
|
||||
return;
|
||||
}
|
||||
UI.showToast(result.message, 'info');
|
||||
this._pollGeoparse(this.currentIncidentId);
|
||||
} catch (err) {
|
||||
UI.showToast('Geoparsing fehlgeschlagen: ' + err.message, 'error');
|
||||
} finally {
|
||||
if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; }
|
||||
}
|
||||
},
|
||||
|
||||
_pollGeoparse(incidentId) {
|
||||
if (this._geoparsePolling) clearInterval(this._geoparsePolling);
|
||||
const btn = document.getElementById('geoparse-btn');
|
||||
this._geoparsePolling = setInterval(async () => {
|
||||
try {
|
||||
const st = await API.getGeoparseStatus(incidentId);
|
||||
if (st.status === 'running') {
|
||||
if (btn) btn.textContent = `${st.processed}/${st.total} Artikel...`;
|
||||
} else {
|
||||
clearInterval(this._geoparsePolling);
|
||||
this._geoparsePolling = null;
|
||||
if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; }
|
||||
if (st.status === 'done' && st.locations > 0) {
|
||||
UI.showToast(`${st.locations} Orte aus ${st.processed} Artikeln erkannt`, 'success');
|
||||
const locations = await API.getLocations(incidentId).catch(() => []);
|
||||
UI.renderMap(locations);
|
||||
} else if (st.status === 'done') {
|
||||
UI.showToast('Keine neuen Orte gefunden', 'info');
|
||||
} else if (st.status === 'error') {
|
||||
UI.showToast('Geoparsing fehlgeschlagen: ' + (st.error || ''), 'error');
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
clearInterval(this._geoparsePolling);
|
||||
this._geoparsePolling = null;
|
||||
if (btn) { btn.disabled = false; btn.textContent = 'Orte erkennen'; }
|
||||
}
|
||||
}, 3000);
|
||||
},
|
||||
|
||||
_formatInterval(minutes) {
|
||||
if (minutes >= 10080 && minutes % 10080 === 0) {
|
||||
const w = minutes / 10080;
|
||||
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren