Dateien
AegisSight-Globe/src/data_geoint.py
Claude Dev c7cb19d584 feat: GEOINT-Toolkit mit 6 neuen Features
- EXIF-Extraktion: Automatische GPS/Kamera/Zeitstempel-Analyse bei Bildupload
- Sonnenstand-Rechner: Azimut, Elevation, Schattenverhaeltnis fuer beliebige Position/Zeit
- Reverse Geolocation: Erweiterte VLM-Analyse mit Landschaftsmerkmalen (Vegetation, Architektur, Strassen, Schilder)
- Nachtlichter: NASA VIIRS Black Marble Layer
- Hoehenprofil: Interaktives 2-Punkte-Tool mit SVG-Chart und Sichtlinienanalyse
- Funkmasten: Mobilfunkinfrastruktur via Overpass (zoomabhaengig)

Backend: data_geoint.py (EXIF, Sun, Elevation, Celltowers)
Frontend: GEOINT Tools Section im Layer Panel
2026-03-26 08:58:05 +01:00

388 Zeilen
13 KiB
Python

"""GEOINT-Toolkit: EXIF-Extraktion, Sonnenstand, Hoehenprofil, Funkmasten."""
import logging
import math
import time
import hashlib
import io
from datetime import datetime, timezone
import httpx
from fastapi import APIRouter, HTTPException, UploadFile, File
from pydantic import BaseModel, Field
from PIL import Image, ExifTags
# Module-level logger; records are emitted under the "globe.geoint" name.
logger = logging.getLogger("globe.geoint")
# Router on which the /geoint/* endpoints of this module are registered.
router = APIRouter()
# ============================================================
# 1. EXIF / Metadaten-Extraktion
# ============================================================
def _dms_to_decimal(dms, ref):
    """Convert a GPS degrees/minutes/seconds triple to signed decimal degrees.

    Args:
        dms: Indexable holding degrees, minutes and seconds at positions
            0, 1 and 2; each item only has to be accepted by ``float()``.
        ref: Hemisphere reference. ``"S"`` and ``"W"`` (case-insensitive,
            ``str`` or ASCII ``bytes``, surrounding whitespace/NUL ignored)
            give a negative result; any other value leaves the sign positive.

    Returns:
        The coordinate rounded to 6 decimal places, or ``None`` when the
        input is malformed or does not yield a finite number.
    """
    try:
        degrees = float(dms[0])
        minutes = float(dms[1])
        seconds = float(dms[2])
    except (TypeError, IndexError, KeyError, ValueError, ZeroDivisionError):
        return None

    decimal = degrees + minutes / 60 + seconds / 3600
    # NaN/inf components (e.g. a rational with a zero denominator may convert
    # to NaN) must not be reported as a coordinate.
    if not math.isfinite(decimal):
        return None

    if isinstance(ref, bytes):
        ref = ref.decode("ascii", errors="ignore")
    hemisphere = ref.strip(" \t\r\n\x00").upper() if isinstance(ref, str) else ""
    if hemisphere in ("S", "W"):
        decimal = -decimal
    return round(decimal, 6)
def _exif_number(value, ndigits=1):
    """Return an EXIF numeric value as a rounded finite float, else ``None``.

    Accepts anything ``float()`` understands as well as a two-item
    ``(numerator, denominator)`` tuple.
    """
    try:
        if isinstance(value, tuple):
            if len(value) != 2:
                return None
            number = float(value[0]) / float(value[1])
        else:
            number = float(value)
    except (TypeError, ValueError, ZeroDivisionError, OverflowError):
        return None
    # NaN/inf are not valid JSON numbers, so they are treated as "missing".
    return round(number, ndigits) if math.isfinite(number) else None


def extract_exif(image_bytes: bytes) -> dict:
    """Extract EXIF metadata from raw image bytes on a best-effort basis.

    Args:
        image_bytes: Encoded image file content.

    Returns:
        A dict with the keys ``has_gps``, ``latitude``, ``longitude``,
        ``altitude``, ``timestamp``, ``camera_make``, ``camera_model``,
        ``focal_length``, ``compass_heading``, ``image_width`` and
        ``image_height``. Values that are absent or unreadable stay ``None``
        (``has_gps`` stays ``False``). Failures are logged as warnings and
        never raised; whatever was extracted up to that point is returned.
    """
    result = {
        "has_gps": False,
        "latitude": None,
        "longitude": None,
        "altitude": None,
        "timestamp": None,
        "camera_make": None,
        "camera_model": None,
        "focal_length": None,
        "compass_heading": None,
        "image_width": None,
        "image_height": None,
    }
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            result["image_width"] = img.width
            result["image_height"] = img.height
            # _getexif() is a private Pillow helper that not every image
            # plugin provides; its absence simply means "no EXIF available".
            read_exif = getattr(img, "_getexif", None)
            exif_data = read_exif() if callable(read_exif) else None
        if not exif_data:
            return result

        # Map numeric tag ids to their names; unknown ids stay numeric keys.
        exif = {
            ExifTags.TAGS.get(tag_id, tag_id): value
            for tag_id, value in exif_data.items()
        }

        for key, tag in (("camera_make", "Make"), ("camera_model", "Model")):
            text = exif.get(tag)
            result[key] = text.strip() if isinstance(text, str) else None

        raw_focal = exif.get("FocalLength")
        if raw_focal:
            # A malformed value (e.g. zero denominator) yields None here
            # instead of aborting the remaining extraction steps.
            result["focal_length"] = _exif_number(raw_focal)

        dt_str = exif.get("DateTimeOriginal") or exif.get("DateTime")
        if dt_str and isinstance(dt_str, str):
            try:
                result["timestamp"] = datetime.strptime(
                    dt_str, "%Y:%m:%d %H:%M:%S"
                ).isoformat()
            except ValueError:
                # Non-standard timestamp format: leave the field unset.
                pass

        gps_info = exif.get("GPSInfo")
        if gps_info and isinstance(gps_info, dict):
            gps_tags = {
                ExifTags.GPSTAGS.get(key, key): val
                for key, val in gps_info.items()
            }

            lat_dms = gps_tags.get("GPSLatitude")
            lon_dms = gps_tags.get("GPSLongitude")
            if lat_dms and lon_dms:
                lat = _dms_to_decimal(lat_dms, gps_tags.get("GPSLatitudeRef", "N"))
                lon = _dms_to_decimal(lon_dms, gps_tags.get("GPSLongitudeRef", "E"))
                if (
                    lat is not None
                    and lon is not None
                    and math.isfinite(lat)
                    and math.isfinite(lon)
                    and abs(lat) <= 90
                    and abs(lon) <= 180
                ):
                    result["has_gps"] = True
                    result["latitude"] = lat
                    result["longitude"] = lon

            # 0 m and 0 degrees are valid readings, so test for None rather
            # than truthiness.
            altitude = _exif_number(gps_tags.get("GPSAltitude"))
            if altitude is not None:
                # EXIF GPSAltitudeRef 1 means "below sea level". The value is
                # expected as int or single byte depending on the Pillow
                # version — TODO confirm against the deployed version.
                if gps_tags.get("GPSAltitudeRef") in (1, b"\x01") and altitude:
                    altitude = -altitude
                result["altitude"] = altitude

            heading = _exif_number(gps_tags.get("GPSImgDirection"))
            if heading is not None:
                result["compass_heading"] = heading
    except Exception as e:
        # Deliberate best-effort boundary: a broken image must not fail the
        # request, so log and return what has been collected.
        logger.warning(f"EXIF-Extraktion fehlgeschlagen: {e}")
    return result
@router.post("/geoint/exif")
async def api_extract_exif(file: UploadFile = File(...)):
    """Extract EXIF/GPS metadata from an uploaded image.

    Args:
        file: The uploaded image (multipart form field ``file``).

    Returns:
        The metadata dict produced by ``extract_exif``.

    Raises:
        HTTPException: 400 when the upload is larger than 15 MiB.
    """
    max_bytes = 15 * 1024 * 1024
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without first loading an arbitrarily large file into memory.
    content = await file.read(max_bytes + 1)
    if len(content) > max_bytes:
        raise HTTPException(400, "Datei zu gross (max 15MB)")
    return extract_exif(content)
# ============================================================
# 2. Sonnenstand-Berechnung (Jean Meeus, vereinfacht)
# ============================================================
class SunRequest(BaseModel):
    # Request body of the POST /geoint/sun-position endpoint.
    # Observer latitude in degrees; required, validated to -90..90.
    latitude: float = Field(..., ge=-90, le=90)
    # Observer longitude in degrees; required, validated to -180..180.
    longitude: float = Field(..., ge=-180, le=180)
    # Moment of observation as text; the endpoint parses it with
    # datetime.fromisoformat() after replacing a trailing "Z" by "+00:00".
    datetime_iso: str
def _calc_sun_position(lat, lon, dt):
    """Compute the sun's azimuth and elevation for a place and moment.

    Args:
        lat: Observer latitude in degrees (north positive).
        lon: Observer longitude in degrees (east positive).
        dt: ``datetime`` of the observation. Aware values are converted to
            UTC first; naive values are interpreted as UTC.

    Returns:
        A dict with:
        ``azimuth`` (degrees clockwise from north, 2 decimals),
        ``elevation`` (degrees above the horizon, 2 decimals),
        ``shadow_ratio`` (shadow length per unit of object height, or
        ``None`` when the sun is not above the horizon),
        ``shadow_direction`` (azimuth + 180 degrees, wrapped to 0..360),
        ``is_daylight`` (elevation above -0.833 degrees) and
        ``golden_hour`` (elevation strictly between 0 and 10 degrees).
    """
    # The Julian date below is built from the clock fields, which must be UT.
    # Without this conversion an input such as 14:00+02:00 would be computed
    # as 14:00 UTC instead of 12:00 UTC.
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)

    # Julian date.
    y, m = dt.year, dt.month
    d = dt.day + dt.hour / 24 + dt.minute / 1440 + dt.second / 86400
    if m <= 2:
        y -= 1
        m += 12
    A = int(y / 100)
    B = 2 - A + int(A / 4)
    JD = int(365.25 * (y + 4716)) + int(30.6001 * (m + 1)) + d + B - 1524.5
    # Julian centuries since J2000.0.
    T = (JD - 2451545.0) / 36525.0

    # Mean longitude, mean anomaly and equation of centre of the sun.
    L0 = (280.46646 + T * (36000.76983 + 0.0003032 * T)) % 360
    M = (357.52911 + T * (35999.05029 - 0.0001537 * T)) % 360
    M_rad = math.radians(M)
    C = ((1.914602 - T * (0.004817 + 0.000014 * T)) * math.sin(M_rad) +
         (0.019993 - 0.000101 * T) * math.sin(2 * M_rad) +
         0.000289 * math.sin(3 * M_rad))
    sun_lon = L0 + C

    # Ecliptic -> equatorial coordinates.
    obliquity = 23.439291 - 0.0130042 * T
    obliquity_rad = math.radians(obliquity)
    sun_lon_rad = math.radians(sun_lon)
    RA = math.atan2(math.cos(obliquity_rad) * math.sin(sun_lon_rad), math.cos(sun_lon_rad))
    dec = math.asin(math.sin(obliquity_rad) * math.sin(sun_lon_rad))

    # Local hour angle from Greenwich mean sidereal time.
    GMST = (280.46061837 + 360.98564736629 * (JD - 2451545.0) + 0.000387933 * T * T) % 360
    LST = math.radians(GMST + lon)
    HA = LST - RA

    lat_rad = math.radians(lat)
    sin_alt = math.sin(lat_rad) * math.sin(dec) + math.cos(lat_rad) * math.cos(dec) * math.cos(HA)
    # Clamp against rounding before asin.
    alt_rad = math.asin(max(-1, min(1, sin_alt)))
    altitude = math.degrees(alt_rad)
    cos_alt = math.cos(alt_rad)

    denominator = math.cos(lat_rad) * cos_alt
    if abs(denominator) < 1e-12:
        # Sun at the zenith/nadir or observer at a pole: azimuth is undefined.
        azimuth = 0.0
    else:
        cos_az = (math.sin(dec) - math.sin(lat_rad) * sin_alt) / denominator
        cos_az = max(-1, min(1, cos_az))
        azimuth = math.degrees(math.acos(cos_az))
        # acos only covers 0..180; a positive hour-angle sine puts the sun in
        # the western half of the sky.
        if math.sin(HA) > 0:
            azimuth = 360 - azimuth

    shadow_ratio = None
    if altitude > 0:
        shadow_ratio = round(1.0 / math.tan(math.radians(altitude)), 2)
    return {
        "azimuth": round(azimuth, 2),
        "elevation": round(altitude, 2),
        "shadow_ratio": shadow_ratio,
        "shadow_direction": round((azimuth + 180) % 360, 2),
        "is_daylight": altitude > -0.833,
        "golden_hour": 0 < altitude < 10,
    }
@router.post("/geoint/sun-position")
async def api_sun_position(req: SunRequest):
    """Compute the sun position for a location and time.

    Args:
        req: Latitude, longitude and an ISO 8601 timestamp. A trailing ``Z``
            is accepted; a timestamp without offset is taken as UTC.

    Returns:
        The dict from ``_calc_sun_position`` plus an ``input`` entry echoing
        latitude, longitude and the parsed timestamp.

    Raises:
        HTTPException: 400 when the timestamp cannot be parsed.
    """
    try:
        dt = datetime.fromisoformat(req.datetime_iso.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        # The solar calculation reads the clock fields as UTC, so an offset
        # such as +02:00 has to be resolved before it is handed over.
        # astimezone() can overflow at the edges of the supported date range.
        dt_utc = dt.astimezone(timezone.utc)
    except (ValueError, OverflowError) as exc:
        raise HTTPException(400, "Ungueltiges Datumsformat (ISO 8601)") from exc
    result = _calc_sun_position(req.latitude, req.longitude, dt_utc)
    # Echo the timestamp with the offset the caller supplied.
    result["input"] = {"latitude": req.latitude, "longitude": req.longitude, "datetime": dt.isoformat()}
    return result
# ============================================================
# 3. Hoehenprofil + Sichtlinienanalyse
# ============================================================
class ElevationRequest(BaseModel):
    # Request body of the POST /geoint/elevation-profile endpoint.
    # Start point in degrees; latitude validated to -90..90, longitude to
    # -180..180.
    start_lat: float = Field(..., ge=-90, le=90)
    start_lon: float = Field(..., ge=-180, le=180)
    # End point, same ranges as the start point.
    end_lat: float = Field(..., ge=-90, le=90)
    end_lon: float = Field(..., ge=-180, le=180)
    # Number of sample points along the segment, endpoints included;
    # defaults to 50 and is validated to 10..100.
    samples: int = Field(50, ge=10, le=100)
def _haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two points.

    Uses the haversine formula on a sphere of radius 6,371,000 m.

    Args:
        lat1, lon1: First point, in degrees.
        lat2, lon2: Second point, in degrees.

    Returns:
        Distance along the sphere's surface in metres.
    """
    earth_radius_m = 6371000
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Rounding can push ``a`` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    return earth_radius_m * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _check_line_of_sight(elevations):
    """Test whether terrain blocks the straight line between the end samples.

    The sight line runs from the first to the last elevation and is compared
    against every interior sample at its proportional position. A sample
    blocks the view when it rises more than 2 above the line.

    Args:
        elevations: Sequence of terrain heights along the profile.

    Returns:
        ``{"clear": bool, "blocked_at_index": int | None}`` where the index
        is that of the first blocking sample. Profiles with fewer than three
        samples are always reported as clear.
    """
    last = len(elevations) - 1
    if last < 2:
        return {"clear": True, "blocked_at_index": None}

    first_h = elevations[0]
    rise = elevations[-1] - first_h
    # First interior sample standing more than 2 above the interpolated line.
    blocked = next(
        (
            idx
            for idx in range(1, last)
            if elevations[idx] > first_h + (idx / last) * rise + 2
        ),
        None,
    )
    return {"clear": blocked is None, "blocked_at_index": blocked}
def _sample_track(start_lat, start_lon, end_lat, end_lon, samples):
    """Return (lats, lons) of evenly spaced points between two coordinates.

    Longitude is interpolated along the shorter way round, so a segment that
    crosses the antimeridian (e.g. 179 -> -179) spans 2 degrees instead of
    358. Values are rounded to 6 decimals.
    """
    delta_lat = end_lat - start_lat
    delta_lon = end_lon - start_lon
    if delta_lon > 180:
        delta_lon -= 360
    elif delta_lon < -180:
        delta_lon += 360

    steps = max(samples - 1, 1)
    lats, lons = [], []
    for i in range(samples):
        t = i / steps
        lon = start_lon + t * delta_lon
        # Bring points that stepped over the antimeridian back into range.
        if lon > 180:
            lon -= 360
        elif lon < -180:
            lon += 360
        lats.append(round(start_lat + t * delta_lat, 6))
        lons.append(round(lon, 6))
    return lats, lons


@router.post("/geoint/elevation-profile")
async def api_elevation_profile(req: ElevationRequest):
    """Compute an elevation profile between two points.

    Samples the segment, fetches the terrain height of every sample from the
    Open-Meteo elevation API and derives distance and ascent statistics plus
    a simple line-of-sight check.

    Args:
        req: Start/end coordinates and the number of samples.

    Returns:
        A dict with ``points`` (lat, lon, elevation, cumulative
        ``distance_m`` per sample), ``stats`` and ``line_of_sight``.

    Raises:
        HTTPException: 502 when the elevation API cannot be reached or
            returns no usable elevation list.
    """
    lats, lons = _sample_track(
        req.start_lat, req.start_lon, req.end_lat, req.end_lon, req.samples
    )
    lat_str = ",".join(str(value) for value in lats)
    lon_str = ",".join(str(value) for value in lons)
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            r = await client.get(
                "https://api.open-meteo.com/v1/elevation",
                params={"latitude": lat_str, "longitude": lon_str},
            )
            r.raise_for_status()
            data = r.json()
    except Exception as e:
        logger.error(f"Elevation API Fehler: {e}")
        raise HTTPException(502, "Elevation API nicht erreichbar") from e

    # Guard against payloads that are not the expected JSON object/list.
    elevations = data.get("elevation") if isinstance(data, dict) else None
    if not elevations or not isinstance(elevations, list):
        raise HTTPException(502, "Keine Hoehendaten erhalten")

    # Cumulative ground distance at every sample.
    distances = [0.0]
    total_dist = 0.0
    for i in range(1, len(lats)):
        total_dist += _haversine(lats[i - 1], lons[i - 1], lats[i], lons[i])
        distances.append(round(total_dist, 1))

    elev_min = min(elevations)
    elev_max = max(elevations)
    steps = list(zip(elevations, elevations[1:]))
    total_ascent = sum(max(0, nxt - prev) for prev, nxt in steps)
    total_descent = sum(max(0, prev - nxt) for prev, nxt in steps)
    los = _check_line_of_sight(elevations)

    points = [
        {
            "lat": lat, "lon": lon,
            # Best effort: samples the API did not answer for are reported
            # with elevation 0.
            "elevation": elevations[i] if i < len(elevations) else 0,
            "distance_m": dist,
        }
        for i, (lat, lon, dist) in enumerate(zip(lats, lons, distances))
    ]
    return {
        "points": points,
        "stats": {
            "distance_m": round(total_dist, 1),
            "distance_km": round(total_dist / 1000, 2),
            "elevation_min": elev_min,
            "elevation_max": elev_max,
            "elevation_diff": round(elev_max - elev_min, 1),
            "total_ascent": round(total_ascent, 1),
            "total_descent": round(total_descent, 1),
        },
        "line_of_sight": los,
    }
# ============================================================
# 4. Funkmasten (via Overpass API)
# ============================================================
class CelltowerRequest(BaseModel):
    # Request body of the POST /geoint/celltowers endpoint: a bounding box.
    # The endpoint formats it as "south,west,north,east" for the Overpass
    # query. Latitudes are validated to -90..90, longitudes to -180..180.
    south: float = Field(..., ge=-90, le=90)
    west: float = Field(..., ge=-180, le=180)
    north: float = Field(..., ge=-90, le=90)
    east: float = Field(..., ge=-180, le=180)
# In-memory response cache of the celltower endpoint:
# md5 hex digest of the bbox string -> (time.time() at insertion, response dict).
_CT_CACHE = {}
# Age in seconds up to which a cached response is returned instead of
# querying Overpass again.
_CT_CACHE_TTL = 600
@router.post("/geoint/celltowers")
async def api_celltowers(req: CelltowerRequest):
    """Return cell towers inside the viewport via the Overpass API.

    Responses are cached in memory per bounding box for ``_CT_CACHE_TTL``
    seconds. The Overpass endpoints are tried in order until one answers.

    Args:
        req: Bounding box of the viewport.

    Returns:
        ``{"towers": [...], "total": int}``; every tower carries ``lat``,
        ``lon``, ``operator``, ``ref``, ``height`` and ``name``.

    Raises:
        HTTPException: 502 when no Overpass endpoint returned a response.
    """
    # Imported at call time rather than at module import — presumably to
    # avoid an import cycle; TODO confirm.
    from data_overpass import _OVERPASS_URLS, _TIMEOUT

    # Upper bound for cached viewports so the cache cannot grow indefinitely.
    max_cache_entries = 256

    bbox = f"{req.south},{req.west},{req.north},{req.east}"
    cache_key = hashlib.md5(bbox.encode()).hexdigest()
    entry = _CT_CACHE.get(cache_key)
    if entry is not None:
        ts, cached = entry
        if time.time() - ts < _CT_CACHE_TTL:
            return cached

    query = (
        "[out:json][timeout:25];"
        "("
        f'node["man_made"="mast"]["tower:type"="communication"]({bbox});'
        f'node["man_made"="tower"]["tower:type"="communication"]({bbox});'
        f'node["telecom"="antenna"]({bbox});'
        f'node["communication:mobile_phone"="yes"]({bbox});'
        ");"
        "out body;"
    )

    data = None
    for url in _OVERPASS_URLS:
        try:
            async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
                r = await client.post(url, data={"data": query})
                r.raise_for_status()
                data = r.json()
        except Exception as e:
            # Best effort: log and fall through to the next mirror.
            logger.warning(f"Celltower Overpass {url}: {e}")
            continue
        break
    if data is None:
        raise HTTPException(502, "Overpass API nicht erreichbar")

    elements = data.get("elements", []) if isinstance(data, dict) else []
    towers = []
    for el in elements:
        lat, lon = el.get("lat"), el.get("lon")
        # Compare against None: 0.0 is a valid coordinate (equator / prime
        # meridian) and must not be discarded as "missing".
        if lat is None or lon is None:
            continue
        tags = el.get("tags", {})
        towers.append({
            "lat": lat,
            "lon": lon,
            "operator": tags.get("operator", tags.get("communication:mobile_phone:operator", "")),
            "ref": tags.get("ref", ""),
            "height": tags.get("height", ""),
            "name": tags.get("name", ""),
        })
    result = {"towers": towers, "total": len(towers)}

    # Drop expired entries, then the oldest insertions, before storing.
    now = time.time()
    expired = [key for key, (ts, _) in _CT_CACHE.items() if now - ts >= _CT_CACHE_TTL]
    for key in expired:
        _CT_CACHE.pop(key, None)
    while len(_CT_CACHE) >= max_cache_entries:
        _CT_CACHE.pop(next(iter(_CT_CACHE)), None)
    _CT_CACHE[cache_key] = (now, result)

    logger.info(f"Funkmasten: {len(towers)} im Viewport")
    return result