feat: GEOINT-Toolkit mit 6 neuen Features
- EXIF-Extraktion: Automatische GPS/Kamera/Zeitstempel-Analyse bei Bildupload - Sonnenstand-Rechner: Azimut, Elevation, Schattenverhaeltnis fuer beliebige Position/Zeit - Reverse Geolocation: Erweiterte VLM-Analyse mit Landschaftsmerkmalen (Vegetation, Architektur, Strassen, Schilder) - Nachtlichter: NASA VIIRS Black Marble Layer - Hoehenprofil: Interaktives 2-Punkte-Tool mit SVG-Chart und Sichtlinienanalyse - Funkmasten: Mobilfunkinfrastruktur via Overpass (zoomabhaengig) Backend: data_geoint.py (EXIF, Sun, Elevation, Celltowers) Frontend: GEOINT Tools Section im Layer Panel
Dieser Commit ist enthalten in:
387
src/data_geoint.py
Normale Datei
387
src/data_geoint.py
Normale Datei
@@ -0,0 +1,387 @@
|
||||
"""GEOINT-Toolkit: EXIF-Extraktion, Sonnenstand, Hoehenprofil, Funkmasten."""
|
||||
import logging
|
||||
import math
|
||||
import time
|
||||
import hashlib
|
||||
import io
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import httpx
|
||||
from fastapi import APIRouter, HTTPException, UploadFile, File
|
||||
from pydantic import BaseModel, Field
|
||||
from PIL import Image, ExifTags
|
||||
|
||||
logger = logging.getLogger("globe.geoint")
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 1. EXIF / Metadaten-Extraktion
|
||||
# ============================================================
|
||||
|
||||
def _dms_to_decimal(dms, ref):
|
||||
"""Konvertiert GPS DMS zu Dezimalgrad."""
|
||||
try:
|
||||
degrees = float(dms[0])
|
||||
minutes = float(dms[1])
|
||||
seconds = float(dms[2])
|
||||
decimal = degrees + minutes / 60 + seconds / 3600
|
||||
if ref in ("S", "W"):
|
||||
decimal = -decimal
|
||||
return round(decimal, 6)
|
||||
except (TypeError, IndexError, ValueError, ZeroDivisionError):
|
||||
return None
|
||||
|
||||
|
||||
def extract_exif(image_bytes: bytes) -> dict:
|
||||
"""Extrahiert EXIF-Metadaten aus Bilddaten."""
|
||||
result = {
|
||||
"has_gps": False,
|
||||
"latitude": None,
|
||||
"longitude": None,
|
||||
"altitude": None,
|
||||
"timestamp": None,
|
||||
"camera_make": None,
|
||||
"camera_model": None,
|
||||
"focal_length": None,
|
||||
"compass_heading": None,
|
||||
"image_width": None,
|
||||
"image_height": None,
|
||||
}
|
||||
|
||||
try:
|
||||
img = Image.open(io.BytesIO(image_bytes))
|
||||
result["image_width"] = img.width
|
||||
result["image_height"] = img.height
|
||||
|
||||
exif_data = img._getexif()
|
||||
if not exif_data:
|
||||
return result
|
||||
|
||||
exif = {}
|
||||
for tag_id, value in exif_data.items():
|
||||
tag = ExifTags.TAGS.get(tag_id, tag_id)
|
||||
exif[tag] = value
|
||||
|
||||
result["camera_make"] = exif.get("Make", "").strip() if isinstance(exif.get("Make"), str) else None
|
||||
result["camera_model"] = exif.get("Model", "").strip() if isinstance(exif.get("Model"), str) else None
|
||||
|
||||
fl = exif.get("FocalLength")
|
||||
if fl:
|
||||
if isinstance(fl, tuple) and len(fl) == 2:
|
||||
result["focal_length"] = round(float(fl[0]) / float(fl[1]), 1)
|
||||
elif not isinstance(fl, tuple):
|
||||
result["focal_length"] = round(float(fl), 1)
|
||||
|
||||
dt_str = exif.get("DateTimeOriginal") or exif.get("DateTime")
|
||||
if dt_str and isinstance(dt_str, str):
|
||||
try:
|
||||
result["timestamp"] = datetime.strptime(dt_str, "%Y:%m:%d %H:%M:%S").isoformat()
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
gps_info = exif.get("GPSInfo")
|
||||
if gps_info and isinstance(gps_info, dict):
|
||||
gps_tags = {}
|
||||
for key, val in gps_info.items():
|
||||
tag = ExifTags.GPSTAGS.get(key, key)
|
||||
gps_tags[tag] = val
|
||||
|
||||
lat_dms = gps_tags.get("GPSLatitude")
|
||||
lat_ref = gps_tags.get("GPSLatitudeRef", "N")
|
||||
lon_dms = gps_tags.get("GPSLongitude")
|
||||
lon_ref = gps_tags.get("GPSLongitudeRef", "E")
|
||||
|
||||
if lat_dms and lon_dms:
|
||||
lat = _dms_to_decimal(lat_dms, lat_ref)
|
||||
lon = _dms_to_decimal(lon_dms, lon_ref)
|
||||
if lat is not None and lon is not None:
|
||||
result["has_gps"] = True
|
||||
result["latitude"] = lat
|
||||
result["longitude"] = lon
|
||||
|
||||
alt = gps_tags.get("GPSAltitude")
|
||||
if alt:
|
||||
try:
|
||||
result["altitude"] = round(float(alt), 1)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
heading = gps_tags.get("GPSImgDirection")
|
||||
if heading:
|
||||
try:
|
||||
result["compass_heading"] = round(float(heading), 1)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"EXIF-Extraktion fehlgeschlagen: {e}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@router.post("/geoint/exif")
|
||||
async def api_extract_exif(file: UploadFile = File(...)):
|
||||
"""Extrahiert EXIF/GPS-Metadaten aus einem Bild."""
|
||||
content = await file.read()
|
||||
if len(content) > 15 * 1024 * 1024:
|
||||
raise HTTPException(400, "Datei zu gross (max 15MB)")
|
||||
return extract_exif(content)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 2. Sonnenstand-Berechnung (Jean Meeus, vereinfacht)
|
||||
# ============================================================
|
||||
|
||||
class SunRequest(BaseModel):
|
||||
latitude: float = Field(..., ge=-90, le=90)
|
||||
longitude: float = Field(..., ge=-180, le=180)
|
||||
datetime_iso: str
|
||||
|
||||
|
||||
def _calc_sun_position(lat, lon, dt):
|
||||
"""Berechnet Sonnenazimut und -elevation."""
|
||||
y, m = dt.year, dt.month
|
||||
d = dt.day + dt.hour / 24 + dt.minute / 1440 + dt.second / 86400
|
||||
if m <= 2:
|
||||
y -= 1
|
||||
m += 12
|
||||
A = int(y / 100)
|
||||
B = 2 - A + int(A / 4)
|
||||
JD = int(365.25 * (y + 4716)) + int(30.6001 * (m + 1)) + d + B - 1524.5
|
||||
T = (JD - 2451545.0) / 36525.0
|
||||
|
||||
L0 = (280.46646 + T * (36000.76983 + 0.0003032 * T)) % 360
|
||||
M = (357.52911 + T * (35999.05029 - 0.0001537 * T)) % 360
|
||||
M_rad = math.radians(M)
|
||||
|
||||
C = ((1.914602 - T * (0.004817 + 0.000014 * T)) * math.sin(M_rad) +
|
||||
(0.019993 - 0.000101 * T) * math.sin(2 * M_rad) +
|
||||
0.000289 * math.sin(3 * M_rad))
|
||||
|
||||
sun_lon = L0 + C
|
||||
obliquity = 23.439291 - 0.0130042 * T
|
||||
obliquity_rad = math.radians(obliquity)
|
||||
sun_lon_rad = math.radians(sun_lon)
|
||||
|
||||
RA = math.atan2(math.cos(obliquity_rad) * math.sin(sun_lon_rad), math.cos(sun_lon_rad))
|
||||
dec = math.asin(math.sin(obliquity_rad) * math.sin(sun_lon_rad))
|
||||
|
||||
GMST = (280.46061837 + 360.98564736629 * (JD - 2451545.0) + 0.000387933 * T * T) % 360
|
||||
LST = math.radians(GMST + lon)
|
||||
HA = LST - RA
|
||||
|
||||
lat_rad = math.radians(lat)
|
||||
sin_alt = math.sin(lat_rad) * math.sin(dec) + math.cos(lat_rad) * math.cos(dec) * math.cos(HA)
|
||||
altitude = math.degrees(math.asin(max(-1, min(1, sin_alt))))
|
||||
|
||||
cos_alt = math.cos(math.asin(max(-1, min(1, sin_alt))))
|
||||
if cos_alt == 0:
|
||||
azimuth = 0
|
||||
else:
|
||||
cos_az = (math.sin(dec) - math.sin(lat_rad) * sin_alt) / (math.cos(lat_rad) * cos_alt)
|
||||
cos_az = max(-1, min(1, cos_az))
|
||||
azimuth = math.degrees(math.acos(cos_az))
|
||||
if math.sin(HA) > 0:
|
||||
azimuth = 360 - azimuth
|
||||
|
||||
shadow_ratio = None
|
||||
if altitude > 0:
|
||||
shadow_ratio = round(1.0 / math.tan(math.radians(altitude)), 2)
|
||||
|
||||
return {
|
||||
"azimuth": round(azimuth, 2),
|
||||
"elevation": round(altitude, 2),
|
||||
"shadow_ratio": shadow_ratio,
|
||||
"shadow_direction": round((azimuth + 180) % 360, 2),
|
||||
"is_daylight": altitude > -0.833,
|
||||
"golden_hour": 0 < altitude < 10,
|
||||
}
|
||||
|
||||
|
||||
@router.post("/geoint/sun-position")
|
||||
async def api_sun_position(req: SunRequest):
|
||||
"""Berechnet den Sonnenstand fuer eine Position und Zeit."""
|
||||
try:
|
||||
dt = datetime.fromisoformat(req.datetime_iso.replace("Z", "+00:00"))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
raise HTTPException(400, "Ungueltiges Datumsformat (ISO 8601)")
|
||||
|
||||
result = _calc_sun_position(req.latitude, req.longitude, dt)
|
||||
result["input"] = {"latitude": req.latitude, "longitude": req.longitude, "datetime": dt.isoformat()}
|
||||
return result
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 3. Hoehenprofil + Sichtlinienanalyse
|
||||
# ============================================================
|
||||
|
||||
class ElevationRequest(BaseModel):
|
||||
start_lat: float = Field(..., ge=-90, le=90)
|
||||
start_lon: float = Field(..., ge=-180, le=180)
|
||||
end_lat: float = Field(..., ge=-90, le=90)
|
||||
end_lon: float = Field(..., ge=-180, le=180)
|
||||
samples: int = Field(50, ge=10, le=100)
|
||||
|
||||
|
||||
def _haversine(lat1, lon1, lat2, lon2):
|
||||
R = 6371000
|
||||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlam = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
||||
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
||||
|
||||
|
||||
def _check_line_of_sight(elevations):
|
||||
if len(elevations) < 3:
|
||||
return {"clear": True, "blocked_at_index": None}
|
||||
start_h = elevations[0]
|
||||
end_h = elevations[-1]
|
||||
n = len(elevations) - 1
|
||||
for i in range(1, n):
|
||||
t = i / n
|
||||
expected = start_h + t * (end_h - start_h)
|
||||
if elevations[i] > expected + 2:
|
||||
return {"clear": False, "blocked_at_index": i}
|
||||
return {"clear": True, "blocked_at_index": None}
|
||||
|
||||
|
||||
@router.post("/geoint/elevation-profile")
|
||||
async def api_elevation_profile(req: ElevationRequest):
|
||||
"""Berechnet ein Hoehenprofil zwischen zwei Punkten."""
|
||||
lats, lons = [], []
|
||||
for i in range(req.samples):
|
||||
t = i / (req.samples - 1)
|
||||
lats.append(round(req.start_lat + t * (req.end_lat - req.start_lat), 6))
|
||||
lons.append(round(req.start_lon + t * (req.end_lon - req.start_lon), 6))
|
||||
|
||||
lat_str = ",".join(str(l) for l in lats)
|
||||
lon_str = ",".join(str(l) for l in lons)
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
r = await client.get(
|
||||
"https://api.open-meteo.com/v1/elevation",
|
||||
params={"latitude": lat_str, "longitude": lon_str},
|
||||
)
|
||||
r.raise_for_status()
|
||||
data = r.json()
|
||||
except Exception as e:
|
||||
logger.error(f"Elevation API Fehler: {e}")
|
||||
raise HTTPException(502, "Elevation API nicht erreichbar")
|
||||
|
||||
elevations = data.get("elevation", [])
|
||||
if not elevations:
|
||||
raise HTTPException(502, "Keine Hoehendaten erhalten")
|
||||
|
||||
distances = [0.0]
|
||||
total_dist = 0.0
|
||||
for i in range(1, len(lats)):
|
||||
d = _haversine(lats[i - 1], lons[i - 1], lats[i], lons[i])
|
||||
total_dist += d
|
||||
distances.append(round(total_dist, 1))
|
||||
|
||||
elev_min = min(elevations)
|
||||
elev_max = max(elevations)
|
||||
total_ascent = sum(max(0, elevations[i] - elevations[i - 1]) for i in range(1, len(elevations)))
|
||||
total_descent = sum(max(0, elevations[i - 1] - elevations[i]) for i in range(1, len(elevations)))
|
||||
los = _check_line_of_sight(elevations)
|
||||
|
||||
points = []
|
||||
for i in range(len(lats)):
|
||||
points.append({
|
||||
"lat": lats[i], "lon": lons[i],
|
||||
"elevation": elevations[i] if i < len(elevations) else 0,
|
||||
"distance_m": distances[i],
|
||||
})
|
||||
|
||||
return {
|
||||
"points": points,
|
||||
"stats": {
|
||||
"distance_m": round(total_dist, 1),
|
||||
"distance_km": round(total_dist / 1000, 2),
|
||||
"elevation_min": elev_min,
|
||||
"elevation_max": elev_max,
|
||||
"elevation_diff": round(elev_max - elev_min, 1),
|
||||
"total_ascent": round(total_ascent, 1),
|
||||
"total_descent": round(total_descent, 1),
|
||||
},
|
||||
"line_of_sight": los,
|
||||
}
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 4. Funkmasten (via Overpass API)
|
||||
# ============================================================
|
||||
|
||||
class CelltowerRequest(BaseModel):
|
||||
south: float = Field(..., ge=-90, le=90)
|
||||
west: float = Field(..., ge=-180, le=180)
|
||||
north: float = Field(..., ge=-90, le=90)
|
||||
east: float = Field(..., ge=-180, le=180)
|
||||
|
||||
|
||||
_CT_CACHE = {}
|
||||
_CT_CACHE_TTL = 600
|
||||
|
||||
|
||||
@router.post("/geoint/celltowers")
|
||||
async def api_celltowers(req: CelltowerRequest):
|
||||
"""Liefert Funkmasten im Viewport via Overpass API."""
|
||||
from data_overpass import _OVERPASS_URLS, _TIMEOUT
|
||||
|
||||
bbox = f"{req.south},{req.west},{req.north},{req.east}"
|
||||
cache_key = hashlib.md5(bbox.encode()).hexdigest()
|
||||
|
||||
if cache_key in _CT_CACHE:
|
||||
ts, cached = _CT_CACHE[cache_key]
|
||||
if time.time() - ts < _CT_CACHE_TTL:
|
||||
return cached
|
||||
|
||||
query = (
|
||||
"[out:json][timeout:25];"
|
||||
"("
|
||||
f'node["man_made"="mast"]["tower:type"="communication"]({bbox});'
|
||||
f'node["man_made"="tower"]["tower:type"="communication"]({bbox});'
|
||||
f'node["telecom"="antenna"]({bbox});'
|
||||
f'node["communication:mobile_phone"="yes"]({bbox});'
|
||||
");"
|
||||
"out body;"
|
||||
)
|
||||
|
||||
data = None
|
||||
for url in _OVERPASS_URLS:
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_TIMEOUT) as client:
|
||||
r = await client.post(url, data={"data": query})
|
||||
r.raise_for_status()
|
||||
data = r.json()
|
||||
break
|
||||
except Exception as e:
|
||||
logger.warning(f"Celltower Overpass {url}: {e}")
|
||||
continue
|
||||
|
||||
if data is None:
|
||||
raise HTTPException(502, "Overpass API nicht erreichbar")
|
||||
|
||||
towers = []
|
||||
for el in data.get("elements", []):
|
||||
if not el.get("lat") or not el.get("lon"):
|
||||
continue
|
||||
tags = el.get("tags", {})
|
||||
towers.append({
|
||||
"lat": el["lat"],
|
||||
"lon": el["lon"],
|
||||
"operator": tags.get("operator", tags.get("communication:mobile_phone:operator", "")),
|
||||
"ref": tags.get("ref", ""),
|
||||
"height": tags.get("height", ""),
|
||||
"name": tags.get("name", ""),
|
||||
})
|
||||
|
||||
result = {"towers": towers, "total": len(towers)}
|
||||
_CT_CACHE[cache_key] = (time.time(), result)
|
||||
logger.info(f"Funkmasten: {len(towers)} im Viewport")
|
||||
return result
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren