Kartenfeature: Geoparsing + Leaflet-Karte im Dashboard
- Neues Geoparsing-Modul (spaCy NER + geonamescache/Nominatim)
- article_locations-Tabelle mit Migration
- Pipeline-Integration nach Artikel-Speicherung
- API-Endpunkt GET /incidents/{id}/locations
- Leaflet.js + MarkerCluster im Dashboard-Grid
- Theme-aware Kartenkacheln (CartoDB dark / OSM light)
- Gold-Akzent MarkerCluster, Popup mit Artikelliste
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dieser Commit ist enthalten in:
287
src/agents/geoparsing.py
Normale Datei
287
src/agents/geoparsing.py
Normale Datei
@@ -0,0 +1,287 @@
|
||||
"""Geoparsing-Modul: NER-basierte Ortsextraktion und Geocoding fuer Artikel."""
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger("osint.geoparsing")
|
||||
|
||||
# Lazy-loaded spaCy-Modelle (erst beim ersten Aufruf geladen)
|
||||
_nlp_de = None
|
||||
_nlp_en = None
|
||||
|
||||
# Stopwords: Entitaeten die von spaCy faelschlicherweise als Orte erkannt werden
|
||||
LOCATION_STOPWORDS = {
|
||||
"EU", "UN", "NATO", "WHO", "OSZE", "OPEC", "G7", "G20", "BRICS",
|
||||
"Nato", "Eu", "Un", "Onu",
|
||||
"Bundesregierung", "Bundestag", "Bundesrat", "Bundeskanzler",
|
||||
"Kreml", "Weisses Haus", "White House", "Pentagon", "Elysee",
|
||||
"Twitter", "Facebook", "Telegram", "Signal", "WhatsApp",
|
||||
"Reuters", "AP", "AFP", "DPA", "dpa",
|
||||
"Internet", "Online", "Web",
|
||||
}
|
||||
|
||||
# Maximale Textlaenge fuer NER-Verarbeitung
|
||||
MAX_TEXT_LENGTH = 10000
|
||||
|
||||
|
||||
def _load_spacy_model(lang: str):
    """Load a spaCy model lazily (only on first call).

    Args:
        lang: Language code. "de" loads the German model; any other value
            (including unknown languages) falls back to the English model.

    Returns:
        The loaded spaCy pipeline, or None if spaCy or the model is missing.
    """
    global _nlp_de, _nlp_en
    try:
        import spacy
    except ImportError:
        logger.error("spaCy nicht installiert - pip install spacy")
        return None

    # Fix: previously an unknown language returned the (possibly never
    # loaded) English model variable without attempting to load it.
    if lang != "de":
        lang = "en"
    model_name = "de_core_news_sm" if lang == "de" else "en_core_web_sm"

    cached = _nlp_de if lang == "de" else _nlp_en
    if cached is None:
        try:
            # Only NER is needed; disable unused pipeline components for speed.
            cached = spacy.load(model_name, disable=["parser", "lemmatizer", "textcat"])
        except OSError:
            logger.warning(f"spaCy-Modell {model_name} nicht gefunden - python -m spacy download {model_name}")
            return None
        logger.info(f"spaCy-Modell {model_name} geladen")
        if lang == "de":
            _nlp_de = cached
        else:
            _nlp_en = cached

    return cached
|
||||
|
||||
|
||||
def _extract_locations_from_text(text: str, language: str = "de") -> list[dict]:
|
||||
"""Extrahiert Ortsnamen aus Text via spaCy NER.
|
||||
|
||||
Returns:
|
||||
Liste von dicts: [{name: str, source_text: str}]
|
||||
"""
|
||||
if not text:
|
||||
return []
|
||||
|
||||
text = text[:MAX_TEXT_LENGTH]
|
||||
|
||||
nlp = _load_spacy_model(language)
|
||||
if nlp is None:
|
||||
# Fallback: anderes Modell versuchen
|
||||
fallback = "en" if language == "de" else "de"
|
||||
nlp = _load_spacy_model(fallback)
|
||||
if nlp is None:
|
||||
return []
|
||||
|
||||
doc = nlp(text)
|
||||
|
||||
locations = []
|
||||
seen = set()
|
||||
for ent in doc.ents:
|
||||
if ent.label_ in ("LOC", "GPE"):
|
||||
name = ent.text.strip()
|
||||
# Filter: zu kurz, Stopword, oder nur Zahlen/Sonderzeichen
|
||||
if len(name) < 2:
|
||||
continue
|
||||
if name in LOCATION_STOPWORDS:
|
||||
continue
|
||||
if re.match(r'^[\d\W]+$', name):
|
||||
continue
|
||||
|
||||
name_lower = name.lower()
|
||||
if name_lower not in seen:
|
||||
seen.add(name_lower)
|
||||
# Kontext: 50 Zeichen um die Entitaet herum
|
||||
start = max(0, ent.start_char - 25)
|
||||
end = min(len(text), ent.end_char + 25)
|
||||
source_text = text[start:end].strip()
|
||||
locations.append({"name": name, "source_text": source_text})
|
||||
|
||||
return locations
|
||||
|
||||
|
||||
# Geocoding cache (in-memory, lives for the lifetime of the process);
# stores None for names that could not be geocoded so misses are cached too.
_geocode_cache: dict[str, Optional[dict]] = {}

# geonamescache instance (lazy, created on first use)
_gc = None
|
||||
|
||||
|
||||
def _get_geonamescache():
    """Return the process-wide GeonamesCache instance, creating it on demand."""
    global _gc
    if _gc is not None:
        return _gc
    try:
        import geonamescache
    except ImportError:
        logger.error("geonamescache nicht installiert - pip install geonamescache")
        return None
    _gc = geonamescache.GeonamesCache()
    logger.info("geonamescache geladen")
    return _gc
|
||||
|
||||
|
||||
def _geocode_location(name: str) -> Optional[dict]:
    """Geocode a place name: offline via geonamescache, Nominatim as fallback.

    Results (including misses, stored as None) are memoized in-process.

    Returns:
        dict with {lat, lon, country_code, normalized_name, confidence} or None
    """
    cache_key = name.lower().strip()
    try:
        return _geocode_cache[cache_key]
    except KeyError:
        pass

    hit = _geocode_offline(name)
    if hit is None:
        hit = _geocode_nominatim(name)

    _geocode_cache[cache_key] = hit
    return hit
|
||||
|
||||
|
||||
def _geocode_offline(name: str) -> Optional[dict]:
    """Try to geocode a name offline via geonamescache.

    City matches are disambiguated by population (largest wins). A country
    name resolves to its capital's coordinates with reduced confidence.

    Returns:
        dict with {lat, lon, country_code, normalized_name, confidence} or None
    """
    gc = _get_geonamescache()
    if gc is None:
        return None

    name_lower = name.lower().strip()

    # 1. City lookup: exact name match or alternate-name match.
    matches = []
    for city in gc.get_cities().values():
        if city.get("name", "").lower() == name_lower:
            matches.append(city)
            continue
        alt_names = city.get("alternatenames", "")
        # Perf: generator instead of materializing a list per city — this
        # loop runs over tens of thousands of cities per lookup.
        if alt_names and any(
            part.strip().lower() == name_lower
            for part in alt_names.split(",")
            if part.strip()
        ):
            matches.append(city)

    if matches:
        # Disambiguation: the most populous match wins.
        best = max(matches, key=lambda c: c.get("population", 0))
        return {
            "lat": float(best["latitude"]),
            "lon": float(best["longitude"]),
            "country_code": best.get("countrycode", ""),
            "normalized_name": best["name"],
            # Population nudges confidence upward, capped at 1.0.
            "confidence": min(1.0, 0.6 + (best.get("population", 0) / 10_000_000)),
        }

    # 2. Country lookup: use the capital's coordinates as a stand-in.
    for country in gc.get_countries().values():
        if country.get("name", "").lower() != name_lower:
            continue
        capital = country.get("capital", "")
        if capital:
            cap_result = _geocode_offline(capital)
            if cap_result:
                cap_result["normalized_name"] = country["name"]
                cap_result["confidence"] = 0.5  # country-level, not city-level
                return cap_result

    return None
|
||||
|
||||
|
||||
def _geocode_nominatim(name: str) -> Optional[dict]:
|
||||
"""Fallback-Geocoding ueber Nominatim (1 Request/Sekunde)."""
|
||||
try:
|
||||
from geopy.geocoders import Nominatim
|
||||
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
try:
|
||||
geocoder = Nominatim(user_agent="aegissight-monitor/1.0", timeout=5)
|
||||
location = geocoder.geocode(name, language="de", exactly_one=True)
|
||||
if location:
|
||||
# Country-Code aus Address extrahieren falls verfuegbar
|
||||
raw = location.raw or {}
|
||||
country_code = ""
|
||||
if "address" in raw:
|
||||
country_code = raw["address"].get("country_code", "").upper()
|
||||
|
||||
return {
|
||||
"lat": float(location.latitude),
|
||||
"lon": float(location.longitude),
|
||||
"country_code": country_code,
|
||||
"normalized_name": location.address.split(",")[0] if location.address else name,
|
||||
"confidence": 0.4, # Nominatim-Ergebnis = niedrigere Konfidenz
|
||||
}
|
||||
except (GeocoderTimedOut, GeocoderServiceError) as e:
|
||||
logger.debug(f"Nominatim-Fehler fuer '{name}': {e}")
|
||||
except Exception as e:
|
||||
logger.debug(f"Geocoding-Fehler fuer '{name}': {e}")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _article_text(article: dict, language: str) -> str:
    """Assemble headline + body text for NER, preferring German fields for "de"."""
    parts = []
    if language == "de":
        if article.get("headline_de"):
            parts.append(article["headline_de"])
        elif article.get("headline"):
            parts.append(article["headline"])
        if article.get("content_de"):
            parts.append(article["content_de"])
        elif article.get("content_original"):
            parts.append(article["content_original"])
    else:
        if article.get("headline"):
            parts.append(article["headline"])
        if article.get("content_original"):
            parts.append(article["content_original"])
    return "\n".join(parts)


async def geoparse_articles(articles: list[dict]) -> dict[int, list[dict]]:
    """Geoparse a list of articles.

    Args:
        articles: list of article dicts (with id, content_de, content_original,
            language, headline, headline_de)

    Returns:
        dict[article_id -> list[{location_name, location_name_normalized,
        country_code, lat, lon, confidence, source_text}]]
    """
    if not articles:
        return {}

    result: dict[int, list[dict]] = {}

    for article in articles:
        article_id = article.get("id")
        # Fix: "is None" instead of truthiness — an id of 0 is a valid key
        # and was previously skipped.
        if article_id is None:
            continue

        language = article.get("language", "de")
        text = _article_text(article, language)
        if not text.strip():
            continue

        # NER extraction is CPU-bound -> run in a worker thread.
        locations_raw = await asyncio.to_thread(
            _extract_locations_from_text, text, language
        )
        if not locations_raw:
            continue

        # Geocoding may perform network calls -> also off the event loop.
        locations = []
        for loc in locations_raw:
            geo = await asyncio.to_thread(_geocode_location, loc["name"])
            if geo is None:
                continue
            locations.append({
                "location_name": loc["name"],
                "location_name_normalized": geo["normalized_name"],
                "country_code": geo["country_code"],
                "lat": geo["lat"],
                "lon": geo["lon"],
                "confidence": geo["confidence"],
                "source_text": loc.get("source_text", ""),
            })

        if locations:
            result[article_id] = locations

    return result
|
||||
@@ -708,6 +708,31 @@ class AgentOrchestrator:
|
||||
|
||||
await db.commit()
|
||||
|
||||
# Geoparsing: Orte aus neuen Artikeln extrahieren und speichern
|
||||
if new_articles_for_analysis:
|
||||
try:
|
||||
from agents.geoparsing import geoparse_articles
|
||||
logger.info(f"Geoparsing fuer {len(new_articles_for_analysis)} neue Artikel...")
|
||||
geo_results = await geoparse_articles(new_articles_for_analysis)
|
||||
geo_count = 0
|
||||
for art_id, locations in geo_results.items():
|
||||
for loc in locations:
|
||||
await db.execute(
|
||||
"""INSERT INTO article_locations
|
||||
(article_id, incident_id, location_name, location_name_normalized,
|
||||
country_code, latitude, longitude, confidence, source_text, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
|
||||
loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
|
||||
loc.get("source_text", ""), tenant_id),
|
||||
)
|
||||
geo_count += 1
|
||||
if geo_count > 0:
|
||||
await db.commit()
|
||||
logger.info(f"Geoparsing: {geo_count} Orte aus {len(geo_results)} Artikeln gespeichert")
|
||||
except Exception as e:
|
||||
logger.warning(f"Geoparsing fehlgeschlagen (Pipeline laeuft weiter): {e}")
|
||||
|
||||
# Quellen-Statistiken aktualisieren
|
||||
if new_count > 0:
|
||||
try:
|
||||
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren