From b88b3057168634d8a1c8d853ab3597ce0770ecce Mon Sep 17 00:00:00 2001 From: Claude Dev Date: Tue, 24 Mar 2026 10:49:23 +0100 Subject: [PATCH] GEOINT: Globaler Schiffsverkehr via AISStream.io Digitraffic (nur Nordeuropa) ersetzt durch AISStream.io WebSocket: - Globale Echtzeit-AIS-Daten (tausende Schiffe weltweit) - Dauerhafter WebSocket-Client im Backend, auto-reconnect - Schiffsnamen im Popup (MMSI, SOG, COG) - Binary-Frame Parsing fuer WebSocket-Nachrichten - Auto-Start bei Server-Hochfahren - Stale-Cleanup (>15 Min alte Positionen entfernt) --- src/routers/geoint.py | 175 +++++++++++++++++++++++----------------- src/static/js/geoint.js | 8 +- 2 files changed, 106 insertions(+), 77 deletions(-) diff --git a/src/routers/geoint.py b/src/routers/geoint.py index 756a06d..9c38b71 100644 --- a/src/routers/geoint.py +++ b/src/routers/geoint.py @@ -1,10 +1,12 @@ -"""GEOINT-Router: Proxy fuer externe Echtzeit-Datenquellen (Flugverkehr, GDELT).""" +"""GEOINT-Router: Proxy fuer externe Echtzeit-Datenquellen (Flugverkehr, Schiffsverkehr, GDELT).""" import asyncio +import json as _json import logging import time from typing import Optional import httpx +import websockets from fastapi import APIRouter, Depends, Query from auth import get_current_user @@ -38,8 +40,6 @@ def _set_cache(key: str, data: dict): # Flugverkehr: Globaler Snapshot (airplanes.live) # --------------------------------------------------------------------------- -# Stuetzpunkte fuer globale Abdeckung (je 250nm Radius ≈ 460km) -# Abdeckt: Europa, Naher Osten, Nordafrika, Nordamerika, Ostasien _FLIGHT_GRID = [ # Europa (48.0, 2.0), # Westeuropa (Paris) @@ -127,7 +127,6 @@ async def _fetch_global_flights() -> dict: return cached async with _flight_lock: - # Doppelcheck nach Lock cached = _get_cached("flights_global", ttl=30) if cached: return cached @@ -136,14 +135,10 @@ async def _fetch_global_flights() -> dict: errors = 0 async with httpx.AsyncClient(timeout=10) as client: - # In Batches von 8 um Rate-Limits zu vermeiden for i in range(0, len(_FLIGHT_GRID), 8): batch = _FLIGHT_GRID[i:i + 8] - tasks = [] - for lat, lon in batch: - url = f"https://api.airplanes.live/v2/point/{lat:.2f}/{lon:.2f}/250" - tasks.append(client.get(url)) - + tasks = [client.get(f"https://api.airplanes.live/v2/point/{lat:.2f}/{lon:.2f}/250") + for lat, lon in batch] results = await asyncio.gather(*tasks, return_exceptions=True) for r in results: if isinstance(r, Exception): @@ -157,95 +152,119 @@ async def _fetch_global_flights() -> dict: seen[hex_id] = ac except Exception: errors += 1 - - # Kurze Pause zwischen Batches if i + 8 < len(_FLIGHT_GRID): await asyncio.sleep(0.3) result = {"ac": list(seen.values()), "total": len(seen), "errors": errors} - logger.info( - f"GEOINT Flights: {len(seen)} Flugzeuge aus {len(_FLIGHT_GRID)} Punkten" - f" ({errors} Fehler)" - ) + logger.info(f"GEOINT Flights: {len(seen)} Flugzeuge ({errors} Fehler)") _set_cache("flights_global", result) return result @router.get("/flights") -async def get_flights( - _user: dict = Depends(get_current_user), -): - """Globaler Flugverkehr-Snapshot. 30s Cache, dedupliziert.""" +async def get_flights(_user: dict = Depends(get_current_user)): + """Globaler Flugverkehr-Snapshot. 30s Cache.""" return await _fetch_global_flights() # --------------------------------------------------------------------------- -# Schiffsverkehr: Digitraffic AIS (kostenlos, global, kein API-Key) +# Schiffsverkehr: AISStream.io (globales Echtzeit-AIS via WebSocket) # --------------------------------------------------------------------------- +_AISSTREAM_KEY = "1a56b078db829727abd4d617937bae51c6f9973e" +_AISSTREAM_URL = "wss://stream.aisstream.io/v0/stream" + +# Globaler Schiffs-Store: {mmsi: {lat, lon, sog, cog, heading, name, ship_type, ts}} +_ships_store: dict[int, dict] = {} _ships_lock = asyncio.Lock() +_ships_ws_task: Optional[asyncio.Task] = None +_ships_connected = False -async def _fetch_global_ships() -> dict: - """Holt globale AIS-Schiffspositionen von Digitraffic.""" - cached = _get_cached("ships_global", ttl=60) - if cached: - return cached - - async with _ships_lock: - cached = _get_cached("ships_global", ttl=60) - if cached: - return cached - - url = "https://meri.digitraffic.fi/api/ais/v1/locations" +async def _aisstream_listener(): + """Dauerhafter WebSocket-Client fuer AISStream. Sammelt Schiffspositionen.""" + global _ships_connected + while True: try: - async with httpx.AsyncClient(timeout=20) as client: - resp = await client.get( - url, - headers={ - "Digitraffic-User": "AegisSight-GEOINT", - "Accept-Encoding": "gzip", - }, - ) - resp.raise_for_status() - data = resp.json() + logger.info("AISStream: Verbinde...") + async with websockets.connect(_AISSTREAM_URL, ping_interval=30, ping_timeout=10, + close_timeout=5) as ws: + # Subscription: globale BoundingBox, nur Positionsberichte + sub = { + "APIKey": _AISSTREAM_KEY, + "BoundingBoxes": [[[-90, -180], [90, 180]]], + "FilterMessageTypes": ["PositionReport"], + } + await ws.send(_json.dumps(sub)) + _ships_connected = True + logger.info("AISStream: Verbunden, empfange Schiffsdaten...") + + async for raw in ws: + try: + text = raw.decode("utf-8") if isinstance(raw, bytes) else raw + msg = _json.loads(text) + meta = msg.get("MetaData", {}) + mmsi = meta.get("MMSI") + if not mmsi: + continue + + pos = msg.get("Message", {}).get("PositionReport", {}) + lat = meta.get("latitude") or pos.get("Latitude") + lon = meta.get("longitude") or pos.get("Longitude") + if not lat or not lon or not (-90 <= lat <= 90 and -180 <= lon <= 180): + continue + + _ships_store[mmsi] = { + "mmsi": mmsi, + "lat": round(lat, 5), + "lon": round(lon, 5), + "sog": round(pos.get("Sog", 0), 1), + "cog": round(pos.get("Cog", 0), 1), + "heading": pos.get("TrueHeading", 0), + "name": (meta.get("ShipName") or "").strip(), + "ts": time.time(), + } + if len(_ships_store) % 1000 == 0: + logger.info(f"AISStream: {len(_ships_store)} Schiffe gesammelt") + + # Alte Eintraege alle 60s bereinigen (>15 Min alt) + if len(_ships_store) % 500 == 0: + cutoff = time.time() - 900 + stale = [k for k, v in _ships_store.items() if v["ts"] < cutoff] + for k in stale: + del _ships_store[k] + + except Exception as parse_err: + if len(_ships_store) < 5: + logger.warning(f"AISStream Parse-Fehler: {parse_err}, raw type: {type(raw)}, first 100: {str(raw)[:100]}") + continue + except Exception as e: - logger.warning(f"Digitraffic AIS Fehler: {e}") - return {"features": [], "total": 0} + _ships_connected = False + logger.warning(f"AISStream Fehler: {e}. Reconnect in 10s...") + await asyncio.sleep(10) - features = data.get("features", []) - # Nur Schiffe mit gueltigen Koordinaten und Bewegung (sog > 0.5 kn) - ships = [] - for f in features: - geom = f.get("geometry") - props = f.get("properties", {}) - if not geom or not geom.get("coordinates"): - continue - lon, lat = geom["coordinates"] - if not (-180 <= lon <= 180 and -90 <= lat <= 90): - continue - ships.append({ - "mmsi": props.get("mmsi"), - "lat": lat, - "lon": lon, - "sog": props.get("sog", 0), - "cog": props.get("cog", 0), - "heading": props.get("heading", 0), - "navStat": props.get("navStat", 0), - }) - result = {"ships": ships, "total": len(ships)} - logger.info(f"GEOINT Ships: {len(ships)} Schiffe geladen") - _set_cache("ships_global", result) - return result +def _start_aisstream(): + """Startet den AISStream-Listener als Background-Task.""" + global _ships_ws_task + if _ships_ws_task is None or _ships_ws_task.done(): + _ships_ws_task = asyncio.create_task(_aisstream_listener()) + logger.info("AISStream Background-Task gestartet") @router.get("/ships") -async def get_ships( - _user: dict = Depends(get_current_user), -): - """Globaler Schiffsverkehr-Snapshot. 60s Cache.""" - return await _fetch_global_ships() +async def get_ships(_user: dict = Depends(get_current_user)): + """Globaler Schiffsverkehr aus AISStream. Echtzeit-Positionen.""" + # Lazy-Start: WebSocket-Listener beim ersten Abruf starten + _start_aisstream() + + ships = list(_ships_store.values()) + return { + "ships": ships, + "total": len(ships), + "connected": _ships_connected, + } # --------------------------------------------------------------------------- @@ -278,3 +297,11 @@ async def get_gdelt( _set_cache(cache_key, data) return data + + +# Auto-Start: AISStream bei erstem Request starten +@router.on_event("startup") +async def _auto_start_aisstream(): + import asyncio + await asyncio.sleep(2) + _start_aisstream() diff --git a/src/static/js/geoint.js b/src/static/js/geoint.js index 35dbea2..ecfb49e 100644 --- a/src/static/js/geoint.js +++ b/src/static/js/geoint.js @@ -87,7 +87,7 @@ const GEOINT = { var self = this; var items = [ ['flights', 'Flugverkehr', 'flights'], - ['ships', 'Schiffe (Nordeuropa)', 'ships'], + ['ships', 'Schiffsverkehr', 'ships'], ['quakes', 'Erdbeben', 'quakes'], ['gdelt', 'Nachrichten', 'gdelt'], ['_sep'], @@ -250,9 +250,11 @@ const GEOINT = { count++; var color = (s.sog || 0) > 0.5 ? '#4499ff' : '#556688'; var navLabels = {0:'Motor',1:'Anker',2:'N.steuerb.',3:'Eingeschr.',5:'Festgemacht',7:'Fischfang',8:'Segel'}; - var popup = '
MMSI ' + (s.mmsi||'?') + '' + + var shipName = s.name || ('MMSI ' + (s.mmsi||'?')); + var popup = '
' + shipName + '' + + (s.name ? '
MMSI ' + (s.mmsi||'?') : '') + '
SOG ' + (s.sog||0).toFixed(1) + ' kn' + - '
NAV ' + (navLabels[s.navStat] || s.navStat) + '
'; + '
COG ' + Math.round(s.cog||0) + '\u00b0' + '
'; L.circleMarker([s.lat, s.lon], { radius: r, fillColor: color, color: '#223355', fillOpacity: 0.85, weight: 0.5, renderer: this._canvasRenderer