Dateien
Hetzner-Backup/v2_adminpanel/routes/session_routes.py
2025-07-03 20:38:33 +00:00

473 Zeilen
17 KiB
Python

import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from flask import Blueprint, render_template, request, redirect, session, url_for, flash
import config
from auth.decorators import login_required
from utils.audit import log_audit
from utils.network import get_client_ip
from db import get_connection, get_db_connection, get_db_cursor
from db_license import get_license_db_cursor
from models import get_active_sessions
# Create Blueprint
session_bp = Blueprint('sessions', __name__)
@session_bp.route("/sessions")
@login_required
def sessions():
# Use regular DB for customer/license info
conn = get_connection()
cur = conn.cursor()
try:
# First get license mapping from admin DB
cur.execute("SELECT id, license_key FROM licenses")
license_map = {row[0]: row[1] for row in cur.fetchall()}
# Get customer mapping
cur.execute("SELECT l.id, c.name FROM licenses l JOIN customers c ON l.customer_id = c.id")
customer_map = {row[0]: row[1] for row in cur.fetchall()}
cur.close()
conn.close()
# Now get sessions from license server DB
with get_license_db_cursor() as license_cur:
# Get active sessions
license_cur.execute("""
SELECT id, license_id, session_token, ip_address, client_version,
started_at, last_heartbeat, hardware_id,
EXTRACT(EPOCH FROM (NOW() - last_heartbeat))/60 as minutes_inactive
FROM license_sessions
WHERE ended_at IS NULL
ORDER BY last_heartbeat DESC
""")
active_sessions = []
for row in license_cur.fetchall():
active_sessions.append((
row[0], # id
row[2], # session_token
license_map.get(row[1], 'Unknown'), # license_key
customer_map.get(row[1], 'Unknown'), # customer name
row[3], # ip_address
row[4], # client_version
row[5], # started_at
row[6], # last_heartbeat
row[8] # minutes_inactive
))
# Get recent ended sessions
license_cur.execute("""
SELECT id, license_id, session_token, ip_address,
started_at, ended_at,
EXTRACT(EPOCH FROM (ended_at - started_at))/60 as duration_minutes
FROM license_sessions
WHERE ended_at IS NOT NULL
AND ended_at > NOW() - INTERVAL '24 hours'
ORDER BY ended_at DESC
LIMIT 50
""")
recent_sessions = []
for row in license_cur.fetchall():
recent_sessions.append((
row[0], # id
row[2], # session_token
license_map.get(row[1], 'Unknown'), # license_key
customer_map.get(row[1], 'Unknown'), # customer name
row[3], # ip_address
row[4], # started_at
row[5], # ended_at
row[6] # duration_minutes
))
return render_template("sessions.html",
active_sessions=active_sessions,
recent_sessions=recent_sessions)
except Exception as e:
logging.error(f"Error loading sessions: {str(e)}")
flash('Fehler beim Laden der Sessions!', 'error')
return redirect(url_for('admin.dashboard'))
@session_bp.route("/sessions/history")
@login_required
def session_history():
"""Zeigt die Session-Historie"""
conn = get_connection()
cur = conn.cursor()
try:
# Query parameters
license_key = request.args.get('license_key', '')
username = request.args.get('username', '')
days = int(request.args.get('days', 7))
# Base query
query = """
SELECT
ls.id,
l.license_key,
ls.machine_name as username,
ls.hardware_id,
ls.started_at,
ls.ended_at,
ls.last_heartbeat,
CASE WHEN ls.ended_at IS NULL THEN true ELSE false END as is_active,
c.name as customer_name,
l.license_type,
l.is_test
FROM license_sessions ls
JOIN licenses l ON ls.license_id = l.id
LEFT JOIN customers c ON l.customer_id = c.id
WHERE 1=1
"""
params = []
# Apply filters
if license_key:
query += " AND l.license_key = %s"
params.append(license_key)
if username:
query += " AND ls.machine_name ILIKE %s"
params.append(f'%{username}%')
# Time filter
query += " AND ls.started_at >= CURRENT_TIMESTAMP - INTERVAL '%s days'"
params.append(days)
query += " ORDER BY ls.started_at DESC LIMIT 1000"
cur.execute(query, params)
sessions_list = []
for row in cur.fetchall():
session_duration = None
if row[4] and row[5]: # started_at and ended_at
duration = row[5] - row[4]
hours = int(duration.total_seconds() // 3600)
minutes = int((duration.total_seconds() % 3600) // 60)
session_duration = f"{hours}h {minutes}m"
elif row[4] and row[7]: # started_at and is_active
duration = datetime.now(ZoneInfo("UTC")) - row[4]
hours = int(duration.total_seconds() // 3600)
minutes = int((duration.total_seconds() % 3600) // 60)
session_duration = f"{hours}h {minutes}m (aktiv)"
sessions_list.append({
'id': row[0],
'license_key': row[1],
'username': row[2],
'hardware_id': row[3],
'started_at': row[4],
'ended_at': row[5],
'last_heartbeat': row[6],
'is_active': row[7],
'customer_name': row[8],
'license_type': row[9],
'is_test': row[10],
'duration': session_duration
})
# Get unique license keys for filter dropdown
cur.execute("""
SELECT DISTINCT l.license_key, c.name as customer_name
FROM license_sessions ls
JOIN licenses l ON ls.license_id = l.id
LEFT JOIN customers c ON l.customer_id = c.id
WHERE ls.started_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
ORDER BY c.name, l.license_key
""")
available_licenses = []
for row in cur.fetchall():
available_licenses.append({
'license_key': row[0],
'customer_name': row[1] or 'Unbekannt'
})
return render_template("session_history.html",
sessions=sessions_list,
available_licenses=available_licenses,
filters={
'license_key': license_key,
'username': username,
'days': days
})
except Exception as e:
logging.error(f"Fehler beim Laden der Session-Historie: {str(e)}")
flash('Fehler beim Laden der Session-Historie!', 'error')
return redirect(url_for('sessions.sessions'))
finally:
cur.close()
conn.close()
@session_bp.route("/session/end/<int:session_id>", methods=["POST"])
@login_required
def terminate_session(session_id):
"""Beendet eine aktive Session"""
try:
session_info = None
# Get session info from license server DB
with get_license_db_cursor() as license_cur:
license_cur.execute("""
SELECT license_id, hardware_id, machine_name
FROM license_sessions
WHERE id = %s AND ended_at IS NULL
""", (session_id,))
result = license_cur.fetchone()
if not result:
flash('Session nicht gefunden oder bereits beendet!', 'error')
return redirect(url_for('sessions.sessions'))
license_id = result[0]
# Terminate session in license server DB
license_cur.execute("""
UPDATE license_sessions
SET ended_at = CURRENT_TIMESTAMP, end_reason = 'admin_terminated'
WHERE id = %s
""", (session_id,))
# Get license key from admin DB for audit log
conn = get_connection()
cur = conn.cursor()
cur.execute("SELECT license_key FROM licenses WHERE id = %s", (license_id,))
license_key = cur.fetchone()[0] if cur.fetchone() else 'Unknown'
cur.close()
conn.close()
# Audit log
log_audit('SESSION_TERMINATE', 'session', session_id,
additional_info=f"Session beendet für Lizenz {license_key}")
flash('Session erfolgreich beendet!', 'success')
except Exception as e:
logging.error(f"Fehler beim Beenden der Session: {str(e)}")
flash('Fehler beim Beenden der Session!', 'error')
return redirect(url_for('sessions.sessions'))
@session_bp.route("/sessions/terminate-all/<license_key>", methods=["POST"])
@login_required
def terminate_all_sessions(license_key):
"""Beendet alle aktiven Sessions einer Lizenz"""
conn = get_connection()
cur = conn.cursor()
try:
# Count active sessions
cur.execute("""
SELECT COUNT(*) FROM license_sessions ls
JOIN licenses l ON ls.license_id = l.id
WHERE l.license_key = %s AND ls.ended_at IS NULL
""", (license_key,))
active_count = cur.fetchone()[0]
if active_count == 0:
flash('Keine aktiven Sessions gefunden!', 'info')
return redirect(url_for('sessions.sessions'))
# Terminate all sessions
cur.execute("""
UPDATE license_sessions
SET ended_at = CURRENT_TIMESTAMP, end_reason = 'admin_terminated_all'
WHERE license_id IN (
SELECT id FROM licenses WHERE license_key = %s
) AND ended_at IS NULL
""", (license_key,))
conn.commit()
# Audit log
log_audit('SESSION_TERMINATE_ALL', 'license', None,
additional_info=f"{active_count} Sessions beendet für Lizenz {license_key}")
flash(f'{active_count} Sessions erfolgreich beendet!', 'success')
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Beenden der Sessions: {str(e)}")
flash('Fehler beim Beenden der Sessions!', 'error')
finally:
cur.close()
conn.close()
return redirect(url_for('sessions.sessions'))
@session_bp.route("/sessions/cleanup", methods=["POST"])
@login_required
def cleanup_sessions():
"""Bereinigt alte inaktive Sessions"""
conn = get_connection()
cur = conn.cursor()
try:
days = int(request.form.get('days', 30))
# Delete old inactive sessions
cur.execute("""
DELETE FROM license_sessions
WHERE ended_at IS NOT NULL
AND ended_at < CURRENT_TIMESTAMP - INTERVAL '%s days'
RETURNING id
""", (days,))
deleted_ids = [row[0] for row in cur.fetchall()]
deleted_count = len(deleted_ids)
conn.commit()
# Audit log
if deleted_count > 0:
log_audit('SESSION_CLEANUP', 'system', None,
additional_info=f"{deleted_count} Sessions älter als {days} Tage gelöscht")
flash(f'{deleted_count} alte Sessions bereinigt!', 'success')
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Bereinigen der Sessions: {str(e)}")
flash('Fehler beim Bereinigen der Sessions!', 'error')
finally:
cur.close()
conn.close()
return redirect(url_for('sessions.session_history'))
@session_bp.route("/sessions/statistics")
@login_required
def session_statistics():
"""Zeigt Session-Statistiken"""
conn = get_connection()
cur = conn.cursor()
try:
# Aktuelle Statistiken
cur.execute("""
SELECT
COUNT(DISTINCT l.license_key) as active_licenses,
COUNT(DISTINCT ls.machine_name) as unique_users,
COUNT(DISTINCT ls.hardware_id) as unique_devices,
COUNT(*) as total_active_sessions
FROM license_sessions ls
JOIN licenses l ON ls.license_id = l.id
WHERE ls.ended_at IS NULL
""")
current_stats = cur.fetchone()
# Sessions nach Lizenztyp
cur.execute("""
SELECT
l.license_type,
COUNT(*) as session_count
FROM license_sessions ls
JOIN licenses l ON ls.license_id = l.id
WHERE ls.ended_at IS NULL
GROUP BY l.license_type
ORDER BY session_count DESC
""")
sessions_by_type = []
for row in cur.fetchall():
sessions_by_type.append({
'license_type': row[0],
'count': row[1]
})
# Top 10 Lizenzen nach aktiven Sessions
cur.execute("""
SELECT
l.license_key,
c.name as customer_name,
COUNT(*) as session_count,
l.device_limit
FROM license_sessions ls
JOIN licenses l ON ls.license_id = l.id
JOIN customers c ON l.customer_id = c.id
WHERE ls.ended_at IS NULL
GROUP BY l.license_key, c.name, l.device_limit
ORDER BY session_count DESC
LIMIT 10
""")
top_licenses = []
for row in cur.fetchall():
top_licenses.append({
'license_key': row[0],
'customer_name': row[1],
'session_count': row[2],
'device_limit': row[3]
})
# Session-Verlauf (letzte 7 Tage)
cur.execute("""
SELECT
DATE(ls.started_at) as date,
COUNT(*) as login_count,
COUNT(DISTINCT l.license_key) as unique_licenses,
COUNT(DISTINCT ls.machine_name) as unique_users
FROM license_sessions ls
JOIN licenses l ON ls.license_id = l.id
WHERE ls.started_at >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY DATE(ls.started_at)
ORDER BY date
""")
session_history = []
for row in cur.fetchall():
session_history.append({
'date': row[0].strftime('%Y-%m-%d'),
'login_count': row[1],
'unique_licenses': row[2],
'unique_users': row[3]
})
# Durchschnittliche Session-Dauer
cur.execute("""
SELECT
AVG(EXTRACT(EPOCH FROM (ended_at - started_at))/3600) as avg_duration_hours
FROM license_sessions
WHERE ended_at IS NOT NULL
AND ended_at - started_at < INTERVAL '24 hours'
AND started_at >= CURRENT_DATE - INTERVAL '30 days'
""")
avg_duration = cur.fetchone()[0] or 0
return render_template("session_statistics.html",
current_stats={
'active_licenses': current_stats[0],
'unique_users': current_stats[1],
'unique_devices': current_stats[2],
'total_sessions': current_stats[3]
},
sessions_by_type=sessions_by_type,
top_licenses=top_licenses,
session_history=session_history,
avg_duration=round(avg_duration, 1))
except Exception as e:
logging.error(f"Fehler beim Laden der Session-Statistiken: {str(e)}")
flash('Fehler beim Laden der Statistiken!', 'error')
return redirect(url_for('sessions.sessions'))
finally:
cur.close()
conn.close()