GEOINT: Globaler Schiffsverkehr via AISStream.io

Digitraffic (nur Nordeuropa) ersetzt durch AISStream.io WebSocket:
- Globale Echtzeit-AIS-Daten (tausende Schiffe weltweit)
- Dauerhafter WebSocket-Client im Backend, auto-reconnect
- Schiffsnamen im Popup (MMSI, SOG, COG)
- Binary-Frame Parsing fuer WebSocket-Nachrichten
- Auto-Start bei Server-Hochfahren
- Stale-Cleanup (>15 Min alte Positionen entfernt)
Dieser Commit ist enthalten in:
Claude Dev
2026-03-24 10:49:23 +01:00
Ursprung 381313ef12
Commit b88b305716
2 geänderte Dateien mit 106 neuen und 77 gelöschten Zeilen

Datei anzeigen

@@ -1,10 +1,12 @@
"""GEOINT-Router: Proxy fuer externe Echtzeit-Datenquellen (Flugverkehr, GDELT).""" """GEOINT-Router: Proxy fuer externe Echtzeit-Datenquellen (Flugverkehr, Schiffsverkehr, GDELT)."""
import asyncio import asyncio
import json as _json
import logging import logging
import time import time
from typing import Optional from typing import Optional
import httpx import httpx
import websockets
from fastapi import APIRouter, Depends, Query from fastapi import APIRouter, Depends, Query
from auth import get_current_user from auth import get_current_user
@@ -38,8 +40,6 @@ def _set_cache(key: str, data: dict):
# Flugverkehr: Globaler Snapshot (airplanes.live) # Flugverkehr: Globaler Snapshot (airplanes.live)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Stuetzpunkte fuer globale Abdeckung (je 250nm Radius ≈ 460km)
# Abdeckt: Europa, Naher Osten, Nordafrika, Nordamerika, Ostasien
_FLIGHT_GRID = [ _FLIGHT_GRID = [
# Europa # Europa
(48.0, 2.0), # Westeuropa (Paris) (48.0, 2.0), # Westeuropa (Paris)
@@ -127,7 +127,6 @@ async def _fetch_global_flights() -> dict:
return cached return cached
async with _flight_lock: async with _flight_lock:
# Doppelcheck nach Lock
cached = _get_cached("flights_global", ttl=30) cached = _get_cached("flights_global", ttl=30)
if cached: if cached:
return cached return cached
@@ -136,14 +135,10 @@ async def _fetch_global_flights() -> dict:
errors = 0 errors = 0
async with httpx.AsyncClient(timeout=10) as client: async with httpx.AsyncClient(timeout=10) as client:
# In Batches von 8 um Rate-Limits zu vermeiden
for i in range(0, len(_FLIGHT_GRID), 8): for i in range(0, len(_FLIGHT_GRID), 8):
batch = _FLIGHT_GRID[i:i + 8] batch = _FLIGHT_GRID[i:i + 8]
tasks = [] tasks = [client.get(f"https://api.airplanes.live/v2/point/{lat:.2f}/{lon:.2f}/250")
for lat, lon in batch: for lat, lon in batch]
url = f"https://api.airplanes.live/v2/point/{lat:.2f}/{lon:.2f}/250"
tasks.append(client.get(url))
results = await asyncio.gather(*tasks, return_exceptions=True) results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results: for r in results:
if isinstance(r, Exception): if isinstance(r, Exception):
@@ -157,95 +152,119 @@ async def _fetch_global_flights() -> dict:
seen[hex_id] = ac seen[hex_id] = ac
except Exception: except Exception:
errors += 1 errors += 1
# Kurze Pause zwischen Batches
if i + 8 < len(_FLIGHT_GRID): if i + 8 < len(_FLIGHT_GRID):
await asyncio.sleep(0.3) await asyncio.sleep(0.3)
result = {"ac": list(seen.values()), "total": len(seen), "errors": errors} result = {"ac": list(seen.values()), "total": len(seen), "errors": errors}
logger.info( logger.info(f"GEOINT Flights: {len(seen)} Flugzeuge ({errors} Fehler)")
f"GEOINT Flights: {len(seen)} Flugzeuge aus {len(_FLIGHT_GRID)} Punkten"
f" ({errors} Fehler)"
)
_set_cache("flights_global", result) _set_cache("flights_global", result)
return result return result
@router.get("/flights") @router.get("/flights")
async def get_flights( async def get_flights(_user: dict = Depends(get_current_user)):
_user: dict = Depends(get_current_user), """Globaler Flugverkehr-Snapshot. 30s Cache."""
):
"""Globaler Flugverkehr-Snapshot. 30s Cache, dedupliziert."""
return await _fetch_global_flights() return await _fetch_global_flights()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Schiffsverkehr: Digitraffic AIS (kostenlos, global, kein API-Key) # Schiffsverkehr: AISStream.io (globales Echtzeit-AIS via WebSocket)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_AISSTREAM_KEY = "1a56b078db829727abd4d617937bae51c6f9973e"
_AISSTREAM_URL = "wss://stream.aisstream.io/v0/stream"
# Globaler Schiffs-Store: {mmsi: {lat, lon, sog, cog, heading, name, ship_type, ts}}
_ships_store: dict[int, dict] = {}
_ships_lock = asyncio.Lock() _ships_lock = asyncio.Lock()
_ships_ws_task: Optional[asyncio.Task] = None
_ships_connected = False
async def _fetch_global_ships() -> dict: async def _aisstream_listener():
"""Holt globale AIS-Schiffspositionen von Digitraffic.""" """Dauerhafter WebSocket-Client fuer AISStream. Sammelt Schiffspositionen."""
cached = _get_cached("ships_global", ttl=60) global _ships_connected
if cached: while True:
return cached
async with _ships_lock:
cached = _get_cached("ships_global", ttl=60)
if cached:
return cached
url = "https://meri.digitraffic.fi/api/ais/v1/locations"
try: try:
async with httpx.AsyncClient(timeout=20) as client: logger.info("AISStream: Verbinde...")
resp = await client.get( async with websockets.connect(_AISSTREAM_URL, ping_interval=30, ping_timeout=10,
url, close_timeout=5) as ws:
headers={ # Subscription: globale BoundingBox, nur Positionsberichte
"Digitraffic-User": "AegisSight-GEOINT", sub = {
"Accept-Encoding": "gzip", "APIKey": _AISSTREAM_KEY,
}, "BoundingBoxes": [[[-90, -180], [90, 180]]],
) "FilterMessageTypes": ["PositionReport"],
resp.raise_for_status() }
data = resp.json() await ws.send(_json.dumps(sub))
_ships_connected = True
logger.info("AISStream: Verbunden, empfange Schiffsdaten...")
async for raw in ws:
try:
text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
msg = _json.loads(text)
meta = msg.get("MetaData", {})
mmsi = meta.get("MMSI")
if not mmsi:
continue
pos = msg.get("Message", {}).get("PositionReport", {})
lat = meta.get("latitude") or pos.get("Latitude")
lon = meta.get("longitude") or pos.get("Longitude")
if not lat or not lon or not (-90 <= lat <= 90 and -180 <= lon <= 180):
continue
_ships_store[mmsi] = {
"mmsi": mmsi,
"lat": round(lat, 5),
"lon": round(lon, 5),
"sog": round(pos.get("Sog", 0), 1),
"cog": round(pos.get("Cog", 0), 1),
"heading": pos.get("TrueHeading", 0),
"name": (meta.get("ShipName") or "").strip(),
"ts": time.time(),
}
if len(_ships_store) % 1000 == 0:
logger.info(f"AISStream: {len(_ships_store)} Schiffe gesammelt")
# Alte Eintraege alle 60s bereinigen (>15 Min alt)
if len(_ships_store) % 500 == 0:
cutoff = time.time() - 900
stale = [k for k, v in _ships_store.items() if v["ts"] < cutoff]
for k in stale:
del _ships_store[k]
except Exception as parse_err:
if len(_ships_store) < 5:
logger.warning(f"AISStream Parse-Fehler: {parse_err}, raw type: {type(raw)}, first 100: {str(raw)[:100]}")
continue
except Exception as e: except Exception as e:
logger.warning(f"Digitraffic AIS Fehler: {e}") _ships_connected = False
return {"features": [], "total": 0} logger.warning(f"AISStream Fehler: {e}. Reconnect in 10s...")
await asyncio.sleep(10)
features = data.get("features", [])
# Nur Schiffe mit gueltigen Koordinaten und Bewegung (sog > 0.5 kn)
ships = []
for f in features:
geom = f.get("geometry")
props = f.get("properties", {})
if not geom or not geom.get("coordinates"):
continue
lon, lat = geom["coordinates"]
if not (-180 <= lon <= 180 and -90 <= lat <= 90):
continue
ships.append({
"mmsi": props.get("mmsi"),
"lat": lat,
"lon": lon,
"sog": props.get("sog", 0),
"cog": props.get("cog", 0),
"heading": props.get("heading", 0),
"navStat": props.get("navStat", 0),
})
result = {"ships": ships, "total": len(ships)} def _start_aisstream():
logger.info(f"GEOINT Ships: {len(ships)} Schiffe geladen") """Startet den AISStream-Listener als Background-Task."""
_set_cache("ships_global", result) global _ships_ws_task
return result if _ships_ws_task is None or _ships_ws_task.done():
_ships_ws_task = asyncio.create_task(_aisstream_listener())
logger.info("AISStream Background-Task gestartet")
@router.get("/ships") @router.get("/ships")
async def get_ships( async def get_ships(_user: dict = Depends(get_current_user)):
_user: dict = Depends(get_current_user), """Globaler Schiffsverkehr aus AISStream. Echtzeit-Positionen."""
): # Lazy-Start: WebSocket-Listener beim ersten Abruf starten
"""Globaler Schiffsverkehr-Snapshot. 60s Cache.""" _start_aisstream()
return await _fetch_global_ships()
ships = list(_ships_store.values())
return {
"ships": ships,
"total": len(ships),
"connected": _ships_connected,
}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -278,3 +297,11 @@ async def get_gdelt(
_set_cache(cache_key, data) _set_cache(cache_key, data)
return data return data
# Auto-Start: AISStream bei erstem Request starten
@router.on_event("startup")
async def _auto_start_aisstream():
import asyncio
await asyncio.sleep(2)
_start_aisstream()

Datei anzeigen

@@ -87,7 +87,7 @@ const GEOINT = {
var self = this; var self = this;
var items = [ var items = [
['flights', 'Flugverkehr', 'flights'], ['flights', 'Flugverkehr', 'flights'],
['ships', 'Schiffe (Nordeuropa)', 'ships'], ['ships', 'Schiffsverkehr', 'ships'],
['quakes', 'Erdbeben', 'quakes'], ['quakes', 'Erdbeben', 'quakes'],
['gdelt', 'Nachrichten', 'gdelt'], ['gdelt', 'Nachrichten', 'gdelt'],
['_sep'], ['_sep'],
@@ -250,9 +250,11 @@ const GEOINT = {
count++; count++;
var color = (s.sog || 0) > 0.5 ? '#4499ff' : '#556688'; var color = (s.sog || 0) > 0.5 ? '#4499ff' : '#556688';
var navLabels = {0:'Motor',1:'Anker',2:'N.steuerb.',3:'Eingeschr.',5:'Festgemacht',7:'Fischfang',8:'Segel'}; var navLabels = {0:'Motor',1:'Anker',2:'N.steuerb.',3:'Eingeschr.',5:'Festgemacht',7:'Fischfang',8:'Segel'};
var popup = '<div class="geoint-popup"><strong>MMSI ' + (s.mmsi||'?') + '</strong>' + var shipName = s.name || ('MMSI ' + (s.mmsi||'?'));
var popup = '<div class="geoint-popup"><strong>' + shipName + '</strong>' +
(s.name ? '<br><span class="geoint-popup-key">MMSI</span> ' + (s.mmsi||'?') : '') +
'<br><span class="geoint-popup-key">SOG</span> ' + (s.sog||0).toFixed(1) + ' kn' + '<br><span class="geoint-popup-key">SOG</span> ' + (s.sog||0).toFixed(1) + ' kn' +
'<br><span class="geoint-popup-key">NAV</span> ' + (navLabels[s.navStat] || s.navStat) + '</div>'; '<br><span class="geoint-popup-key">COG</span> ' + Math.round(s.cog||0) + '\u00b0' + '</div>';
L.circleMarker([s.lat, s.lon], { L.circleMarker([s.lat, s.lon], {
radius: r, fillColor: color, color: '#223355', radius: r, fillColor: color, color: '#223355',
fillOpacity: 0.85, weight: 0.5, renderer: this._canvasRenderer fillOpacity: 0.85, weight: 0.5, renderer: this._canvasRenderer