AegisSight Globe: Initiales Release
Eigenstaendige GEOINT-Anwendung mit CesiumJS 3D-Globus. Echtzeit-Datenlayer: Flugverkehr (airplanes.live, 64 Stuetzpunkte), Schiffsverkehr (AISStream.io WebSocket), Erdbeben (USGS), Nachrichten (GDELT GEO). FastAPI Backend, taktisches Dark-UI.
Dieser Commit ist enthalten in:
89
src/data_ships.py
Normale Datei
89
src/data_ships.py
Normale Datei
@@ -0,0 +1,89 @@
|
||||
"""Schiffsverkehr-Collector: AISStream.io WebSocket (global, Echtzeit)."""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
import websockets
|
||||
from fastapi import APIRouter
|
||||
|
||||
logger = logging.getLogger("globe.ships")
|
||||
router = APIRouter()
|
||||
|
||||
_AISSTREAM_KEY = os.getenv("AISSTREAM_KEY", "1a56b078db829727abd4d617937bae51c6f9973e")
|
||||
_AISSTREAM_URL = "wss://stream.aisstream.io/v0/stream"
|
||||
|
||||
# {mmsi: {mmsi, lat, lon, sog, cog, heading, name, ts}}
|
||||
_store: dict[int, dict] = {}
|
||||
_connected = False
|
||||
_task = None
|
||||
|
||||
|
||||
async def _listener():
|
||||
"""Dauerhafter WebSocket-Client fuer AISStream."""
|
||||
global _connected
|
||||
while True:
|
||||
try:
|
||||
logger.info("AISStream: Verbinde...")
|
||||
async with websockets.connect(
|
||||
_AISSTREAM_URL, ping_interval=30, ping_timeout=10, close_timeout=5
|
||||
) as ws:
|
||||
sub = {
|
||||
"APIKey": _AISSTREAM_KEY,
|
||||
"BoundingBoxes": [[[-90, -180], [90, 180]]],
|
||||
"FilterMessageTypes": ["PositionReport"],
|
||||
}
|
||||
await ws.send(json.dumps(sub))
|
||||
_connected = True
|
||||
logger.info("AISStream: Verbunden")
|
||||
|
||||
async for raw in ws:
|
||||
try:
|
||||
text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
|
||||
msg = json.loads(text)
|
||||
meta = msg.get("MetaData", {})
|
||||
mmsi = meta.get("MMSI")
|
||||
if not mmsi:
|
||||
continue
|
||||
pos = msg.get("Message", {}).get("PositionReport", {})
|
||||
lat = meta.get("latitude") or pos.get("Latitude")
|
||||
lon = meta.get("longitude") or pos.get("Longitude")
|
||||
if not lat or not lon or not (-90 <= lat <= 90 and -180 <= lon <= 180):
|
||||
continue
|
||||
_store[mmsi] = {
|
||||
"mmsi": mmsi,
|
||||
"lat": round(lat, 5),
|
||||
"lon": round(lon, 5),
|
||||
"sog": round(pos.get("Sog", 0), 1),
|
||||
"cog": round(pos.get("Cog", 0), 1),
|
||||
"heading": pos.get("TrueHeading", 0),
|
||||
"name": (meta.get("ShipName") or "").strip(),
|
||||
"ts": time.time(),
|
||||
}
|
||||
# Stale-Cleanup alle 1000 Updates
|
||||
if len(_store) % 1000 == 0:
|
||||
cutoff = time.time() - 900
|
||||
stale = [k for k, v in _store.items() if v["ts"] < cutoff]
|
||||
for k in stale:
|
||||
del _store[k]
|
||||
if len(_store) % 5000 == 0:
|
||||
logger.info(f"AISStream: {len(_store)} Schiffe")
|
||||
except Exception:
|
||||
continue
|
||||
except Exception as e:
|
||||
_connected = False
|
||||
logger.warning(f"AISStream Fehler: {e}. Reconnect in 10s...")
|
||||
await asyncio.sleep(10)
|
||||
|
||||
|
||||
def start_ais_collector():
|
||||
global _task
|
||||
if _task is None or _task.done():
|
||||
_task = asyncio.create_task(_listener())
|
||||
logger.info("AIS collector gestartet")
|
||||
|
||||
|
||||
@router.get("/ships")
|
||||
async def get_ships():
|
||||
return {"ships": list(_store.values()), "total": len(_store), "connected": _connected}
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren