"""Geoparsing-Modul: Haiku-basierte Ortsextraktion und Geocoding fuer Artikel.""" import asyncio import json import logging import re from typing import Optional from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.geoparsing") # Geocoding-Cache (in-memory, lebt solange der Prozess laeuft) _geocode_cache: dict[str, Optional[dict]] = {} # geonamescache-Instanz (lazy) _gc = None def _get_geonamescache(): """Laedt geonamescache lazy.""" global _gc if _gc is None: try: import geonamescache _gc = geonamescache.GeonamesCache() logger.info("geonamescache geladen") except ImportError: logger.error("geonamescache nicht installiert - pip install geonamescache") return None return _gc # Bekannte Laendernamen (deutsch/englisch/alternativ -> ISO-2 Code + Hauptstadt-Koordinaten) _COUNTRY_ALIASES = { "libanon": {"code": "LB", "name": "Lebanon", "lat": 33.8938, "lon": 35.5018}, "lebanon": {"code": "LB", "name": "Lebanon", "lat": 33.8938, "lon": 35.5018}, "jordan": {"code": "JO", "name": "Jordan", "lat": 31.9454, "lon": 35.9284}, "jordanien": {"code": "JO", "name": "Jordan", "lat": 31.9454, "lon": 35.9284}, "iran": {"code": "IR", "name": "Iran", "lat": 35.6892, "lon": 51.3890}, "irak": {"code": "IQ", "name": "Iraq", "lat": 33.3152, "lon": 44.3661}, "iraq": {"code": "IQ", "name": "Iraq", "lat": 33.3152, "lon": 44.3661}, "israel": {"code": "IL", "name": "Israel", "lat": 31.7683, "lon": 35.2137}, "syrien": {"code": "SY", "name": "Syria", "lat": 33.5138, "lon": 36.2765}, "syria": {"code": "SY", "name": "Syria", "lat": 33.5138, "lon": 36.2765}, "tuerkei": {"code": "TR", "name": "Turkey", "lat": 39.9334, "lon": 32.8597}, "turkey": {"code": "TR", "name": "Turkey", "lat": 39.9334, "lon": 32.8597}, "kuwait": {"code": "KW", "name": "Kuwait", "lat": 29.3759, "lon": 47.9774}, "bahrain": {"code": "BH", "name": "Bahrain", "lat": 26.0667, "lon": 50.5577}, "katar": {"code": "QA", "name": "Qatar", "lat": 25.2854, "lon": 51.5310}, "qatar": {"code": "QA", "name": "Qatar", "lat": 25.2854, "lon": 51.5310}, "jemen": {"code": "YE", "name": "Yemen", "lat": 15.3694, "lon": 44.1910}, "yemen": {"code": "YE", "name": "Yemen", "lat": 15.3694, "lon": 44.1910}, "oman": {"code": "OM", "name": "Oman", "lat": 23.5880, "lon": 58.3829}, "pakistan": {"code": "PK", "name": "Pakistan", "lat": 33.6844, "lon": 73.0479}, "afghanistan": {"code": "AF", "name": "Afghanistan", "lat": 34.5553, "lon": 69.2075}, "aegypten": {"code": "EG", "name": "Egypt", "lat": 30.0444, "lon": 31.2357}, "egypt": {"code": "EG", "name": "Egypt", "lat": 30.0444, "lon": 31.2357}, "saudi-arabien": {"code": "SA", "name": "Saudi Arabia", "lat": 24.7136, "lon": 46.6753}, "saudi arabia": {"code": "SA", "name": "Saudi Arabia", "lat": 24.7136, "lon": 46.6753}, "deutschland": {"code": "DE", "name": "Germany", "lat": 52.5200, "lon": 13.4050}, "germany": {"code": "DE", "name": "Germany", "lat": 52.5200, "lon": 13.4050}, "frankreich": {"code": "FR", "name": "France", "lat": 48.8566, "lon": 2.3522}, "france": {"code": "FR", "name": "France", "lat": 48.8566, "lon": 2.3522}, "russland": {"code": "RU", "name": "Russia", "lat": 55.7558, "lon": 37.6173}, "russia": {"code": "RU", "name": "Russia", "lat": 55.7558, "lon": 37.6173}, "china": {"code": "CN", "name": "China", "lat": 39.9042, "lon": 116.4074}, "indien": {"code": "IN", "name": "India", "lat": 28.6139, "lon": 77.2090}, "india": {"code": "IN", "name": "India", "lat": 28.6139, "lon": 77.2090}, "usa": {"code": "US", "name": "United States", "lat": 38.9072, "lon": -77.0369}, "vereinigte staaten": {"code": "US", "name": "United States", "lat": 38.9072, "lon": -77.0369}, "united states": {"code": "US", "name": "United States", "lat": 38.9072, "lon": -77.0369}, "grossbritannien": {"code": "GB", "name": "United Kingdom", "lat": 51.5074, "lon": -0.1278}, "united kingdom": {"code": "GB", "name": "United Kingdom", "lat": 51.5074, "lon": -0.1278}, "schweiz": {"code": "CH", "name": "Switzerland", "lat": 46.9480, "lon": 7.4474}, "switzerland": {"code": "CH", "name": "Switzerland", "lat": 46.9480, "lon": 7.4474}, "spanien": {"code": "ES", "name": "Spain", "lat": 40.4168, "lon": -3.7038}, "spain": {"code": "ES", "name": "Spain", "lat": 40.4168, "lon": -3.7038}, "italien": {"code": "IT", "name": "Italy", "lat": 41.9028, "lon": 12.4964}, "italy": {"code": "IT", "name": "Italy", "lat": 41.9028, "lon": 12.4964}, "zypern": {"code": "CY", "name": "Cyprus", "lat": 35.1856, "lon": 33.3823}, "cyprus": {"code": "CY", "name": "Cyprus", "lat": 35.1856, "lon": 33.3823}, "aserbaidschan": {"code": "AZ", "name": "Azerbaijan", "lat": 40.4093, "lon": 49.8671}, "azerbaijan": {"code": "AZ", "name": "Azerbaijan", "lat": 40.4093, "lon": 49.8671}, "griechenland": {"code": "GR", "name": "Greece", "lat": 37.9838, "lon": 23.7275}, "greece": {"code": "GR", "name": "Greece", "lat": 37.9838, "lon": 23.7275}, "niederlande": {"code": "NL", "name": "Netherlands", "lat": 52.3676, "lon": 4.9041}, "netherlands": {"code": "NL", "name": "Netherlands", "lat": 52.3676, "lon": 4.9041}, "ukraine": {"code": "UA", "name": "Ukraine", "lat": 50.4501, "lon": 30.5234}, } def _geocode_offline(name: str, country_code: str = "") -> Optional[dict]: """Geocoding ueber geonamescache (offline). Reihenfolge: 1. Bekannte Laender-Aliase, 2. geonamescache-Laender, 3. Staedte. Laender werden IMMER vor Staedten geprueft um Verwechslungen zu vermeiden (z.B. Lebanon/US vs Libanon, Jordan/HK vs Jordanien). """ gc = _get_geonamescache() if gc is None: return None name_lower = name.lower().strip() # 1. Bekannte Laender-Aliase (schnellster + sicherster Pfad) alias = _COUNTRY_ALIASES.get(name_lower) if alias: return { "lat": alias["lat"], "lon": alias["lon"], "country_code": alias["code"], "normalized_name": alias["name"], "confidence": 0.95, } # 2. geonamescache Laendersuche (vor Staedten!) countries = gc.get_countries() for code, country in countries.items(): if country.get("name", "").lower() == name_lower: capital = country.get("capital", "") if capital: # Hauptstadt geocoden, aber als Land benennen cap_alias = _COUNTRY_ALIASES.get(capital.lower()) if cap_alias: return { "lat": cap_alias["lat"], "lon": cap_alias["lon"], "country_code": code, "normalized_name": country["name"], "confidence": 0.9, } # Rekursiv die Hauptstadt suchen (nur Staedte-Pfad) cap_result = _geocode_city(capital, code) if cap_result: cap_result["normalized_name"] = country["name"] cap_result["confidence"] = 0.9 return cap_result # 3. Stadtsuche (nur wenn kein Land gefunden) return _geocode_city(name, country_code) def _geocode_city(name: str, country_code: str = "") -> Optional[dict]: """Sucht einen Stadtnamen in geonamescache.""" gc = _get_geonamescache() if gc is None: return None name_lower = name.lower().strip() cities = gc.get_cities() matches = [] for gid, city in cities.items(): city_name = city.get("name", "") alt_names = city.get("alternatenames", "") if isinstance(alt_names, list): alt_list = [n.strip().lower() for n in alt_names if n.strip()] else: alt_list = [n.strip().lower() for n in str(alt_names).split(",") if n.strip()] if city_name.lower() == name_lower or name_lower in alt_list: matches.append(city) if not matches: return None # Disambiguierung: country_code bevorzugen, dann Population if country_code: cc_matches = [c for c in matches if c.get("countrycode", "").upper() == (country_code or "").upper()] if cc_matches: matches = cc_matches best = max(matches, key=lambda c: c.get("population", 0)) return { "lat": float(best["latitude"]), "lon": float(best["longitude"]), "country_code": best.get("countrycode", ""), "normalized_name": best["name"], "confidence": min(1.0, 0.6 + (best.get("population", 0) / 10_000_000)), } def _geocode_location(name: str, country_code: str = "", haiku_coords: Optional[dict] = None) -> Optional[dict]: """Geocoded einen Ortsnamen. Prioritaet: geonamescache > Haiku-Koordinaten. Args: name: Ortsname country_code: ISO-2 Code (von Haiku) haiku_coords: {"lat": float, "lon": float} (Fallback von Haiku) """ cache_key = f"{name.lower().strip()}|{(country_code or '').upper()}" if cache_key in _geocode_cache: return _geocode_cache[cache_key] result = _geocode_offline(name, country_code) # Fallback: Haiku-Koordinaten nutzen if result is None and haiku_coords: lat = haiku_coords.get("lat") lon = haiku_coords.get("lon") if lat is not None and lon is not None: result = { "lat": float(lat), "lon": float(lon), "country_code": country_code.upper() if country_code else "", "normalized_name": name, "confidence": 0.45, } _geocode_cache[cache_key] = result return result # Default-Labels (Fallback wenn Haiku keine generiert) DEFAULT_CATEGORY_LABELS = { "primary": "Hauptgeschehen", "secondary": "Reaktionen", "tertiary": "Beteiligte", "mentioned": "Erwaehnt", } CATEGORY_LABELS_PROMPT = """Generiere kurze, praegnante Kategorie-Labels fuer Karten-Pins zu dieser Nachrichtenlage. Lage: "{incident_context}" Es gibt 4 Farbstufen fuer Orte auf der Karte: 1. primary (Rot): Wo das Hauptgeschehen stattfindet 2. secondary (Orange): Direkte Reaktionen/Gegenmassnahmen 3. tertiary (Blau): Entscheidungstraeger/Beteiligte 4. mentioned (Grau): Nur erwaehnt Generiere fuer jede Stufe ein kurzes Label (1-3 Woerter), das zum Thema passt. Wenn eine Stufe fuer dieses Thema nicht sinnvoll ist, setze null. Beispiele: - Militaerkonflikt Iran: {{"primary": "Kampfschauplätze", "secondary": "Vergeltungsschläge", "tertiary": "Strategische Akteure", "mentioned": "Erwähnt"}} - Erdbeben Tuerkei: {{"primary": "Katastrophenzone", "secondary": "Hilfsoperationen", "tertiary": "Geberländer", "mentioned": "Erwähnt"}} - Bundestagswahl: {{"primary": "Wahlkreise", "secondary": "Koalitionspartner", "tertiary": "Internationale Reaktionen", "mentioned": "Erwähnt"}} Antworte NUR als JSON-Objekt:""" async def generate_category_labels(incident_context: str) -> dict[str, str | None]: """Generiert kontextabhaengige Kategorie-Labels via Haiku. Args: incident_context: Lage-Titel + Beschreibung Returns: Dict mit Labels fuer primary/secondary/tertiary/mentioned (oder None wenn nicht passend) """ if not incident_context or not incident_context.strip(): return dict(DEFAULT_CATEGORY_LABELS) prompt = CATEGORY_LABELS_PROMPT.format(incident_context=incident_context[:500]) try: result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) parsed = None try: parsed = json.loads(result_text) except json.JSONDecodeError: match = re.search(r'\{.*\}', result_text, re.DOTALL) if match: try: parsed = json.loads(match.group()) except json.JSONDecodeError: pass if not parsed or not isinstance(parsed, dict): logger.warning("generate_category_labels: Kein gueltiges JSON erhalten") return dict(DEFAULT_CATEGORY_LABELS) # Validierung: Nur erlaubte Keys, Werte muessen str oder None sein valid_keys = {"primary", "secondary", "tertiary", "mentioned"} labels = {} for key in valid_keys: val = parsed.get(key) if val is None or val == "null": labels[key] = None elif isinstance(val, str) and val.strip(): labels[key] = val.strip() else: labels[key] = DEFAULT_CATEGORY_LABELS.get(key) # mentioned sollte immer einen Wert haben if not labels.get("mentioned"): labels["mentioned"] = "Erwaehnt" logger.info(f"Kategorie-Labels generiert: {labels}") return labels except Exception as e: logger.error(f"generate_category_labels fehlgeschlagen: {e}") return dict(DEFAULT_CATEGORY_LABELS) HAIKU_GEOPARSE_PROMPT = """Extrahiere alle geographischen Orte aus diesen Nachrichten-Headlines. Kontext der Lage: "{incident_context}" Regeln: - Nur echte Orte (Staedte, Laender, Regionen) - Keine Personen, Organisationen, Gebaeude, Alltagswoerter - Bei "US-Militaer" etc: Land (USA) extrahieren, nicht das Kompositum - HTML-Tags ignorieren - Jeder Ort nur einmal pro Headline - Regionen wie "Middle East", "Gulf", "Naher Osten" NICHT extrahieren (kein einzelner Punkt auf der Karte) Klassifiziere basierend auf dem Lage-Kontext: - "primary": Wo das Hauptgeschehen stattfindet (z.B. Angriffsziele, Katastrophenzone, Wahlkreise) - "secondary": Direkte Reaktionen oder Gegenmassnahmen (z.B. Vergeltung, Hilfsoperationen) - "tertiary": Entscheidungstraeger, Beteiligte (z.B. wo Entscheidungen getroffen werden) - "mentioned": Nur erwaehnt, kein direkter Bezug Headlines: {headlines} Antwort NUR als JSON-Array, kein anderer Text: [{{"headline_idx": 0, "locations": [ {{"name": "Teheran", "normalized": "Tehran", "country_code": "IR", "type": "city", "category": "primary", "lat": 35.69, "lon": 51.42}} ]}}]""" async def _extract_locations_haiku( headlines: list[dict], incident_context: str ) -> dict[int, list[dict]]: """Extrahiert Orte aus Headlines via Haiku. Args: headlines: [{"idx": article_id, "text": headline_text}, ...] incident_context: Lage-Kontext fuer Klassifizierung Returns: dict[article_id -> list[{name, normalized, country_code, type, category, lat, lon}]] """ if not headlines: return {} # Headlines formatieren headline_lines = [] for i, h in enumerate(headlines): headline_lines.append(f"[{i}] {h['text']}") prompt = HAIKU_GEOPARSE_PROMPT.format( incident_context=incident_context or "Allgemeine Nachrichtenlage", headlines="\n".join(headline_lines), ) try: result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) except Exception as e: logger.error(f"Haiku-Geoparsing fehlgeschlagen: {e}") return {} # JSON parsen (mit Regex-Fallback) parsed = None try: parsed = json.loads(result_text) except json.JSONDecodeError: match = re.search(r'\[.*\]', result_text, re.DOTALL) if match: try: parsed = json.loads(match.group()) except json.JSONDecodeError: logger.warning("Haiku-Geoparsing: JSON-Parse fehlgeschlagen auch mit Regex-Fallback") return {} if not parsed or not isinstance(parsed, list): logger.warning("Haiku-Geoparsing: Kein gueltiges JSON-Array erhalten") return {} # Ergebnisse den Artikeln zuordnen results = {} for entry in parsed: if not isinstance(entry, dict): continue headline_idx = entry.get("headline_idx") if headline_idx is None or headline_idx >= len(headlines): continue article_id = headlines[headline_idx]["idx"] locations = entry.get("locations", []) if not locations: continue article_locs = [] for loc in locations: if not isinstance(loc, dict): continue loc_type = loc.get("type", "city") # Regionen nicht speichern (kein sinnvoller Punkt auf der Karte) if loc_type == "region": continue name = loc.get("name", "") if not name: continue raw_cat = loc.get("category", "mentioned") # Alte Kategorien mappen (falls Haiku sie noch generiert) cat_map = {"target": "primary", "response": "secondary", "retaliation": "secondary", "actor": "tertiary", "context": "tertiary"} category = cat_map.get(raw_cat, raw_cat) if category not in ("primary", "secondary", "tertiary", "mentioned"): category = "mentioned" article_locs.append({ "name": name, "normalized": loc.get("normalized", name), "country_code": loc.get("country_code", ""), "type": loc_type, "category": category, "lat": loc.get("lat"), "lon": loc.get("lon"), }) if article_locs: results[article_id] = article_locs return results async def geoparse_articles( articles: list[dict], incident_context: str = "", ) -> tuple[dict[int, list[dict]], dict[str, str | None] | None]: """Geoparsing fuer eine Liste von Artikeln via Haiku + geonamescache. Args: articles: Liste von Artikel-Dicts (mit id, headline, headline_de, language) incident_context: Lage-Kontext (Titel + Beschreibung) fuer kontextbewusste Klassifizierung Returns: Tuple von (dict[article_id -> list[locations]], category_labels oder None) """ if not articles: return {}, None # Labels parallel zum Geoparsing generieren (nur wenn Kontext vorhanden) labels_task = None if incident_context: labels_task = asyncio.create_task(generate_category_labels(incident_context)) # Headlines sammeln headlines = [] for article in articles: article_id = article.get("id") if not article_id: continue # Deutsche Headline bevorzugen headline = article.get("headline_de") or article.get("headline") or "" headline = headline.strip() if not headline: continue headlines.append({"idx": article_id, "text": headline}) if not headlines: category_labels = None if labels_task: try: category_labels = await labels_task except Exception: pass return {}, category_labels # Batches bilden (max 50 Headlines pro Haiku-Call) batch_size = 50 all_haiku_results = {} for i in range(0, len(headlines), batch_size): batch = headlines[i:i + batch_size] batch_results = await _extract_locations_haiku(batch, incident_context) all_haiku_results.update(batch_results) if not all_haiku_results: category_labels = None if labels_task: try: category_labels = await labels_task except Exception: pass return {}, category_labels # Geocoding via geonamescache (mit Haiku-Koordinaten als Fallback) result = {} for article_id, haiku_locs in all_haiku_results.items(): locations = [] for loc in haiku_locs: haiku_coords = None if loc.get("lat") is not None and loc.get("lon") is not None: haiku_coords = {"lat": loc["lat"], "lon": loc["lon"]} geo = _geocode_location( loc["normalized"], loc.get("country_code", ""), haiku_coords, ) if geo: locations.append({ "location_name": loc["name"], "location_name_normalized": geo["normalized_name"], "country_code": geo["country_code"], "lat": geo["lat"], "lon": geo["lon"], "confidence": geo["confidence"], "source_text": loc["name"], "category": loc.get("category", "mentioned"), }) if locations: result[article_id] = locations # Category-Labels abwarten category_labels = None if labels_task: try: category_labels = await labels_task except Exception as e: logger.warning(f"Category-Labels konnten nicht generiert werden: {e}") return result, category_labels