import os from datetime import datetime, timedelta from zoneinfo import ZoneInfo from pathlib import Path from flask import Blueprint, render_template, request, redirect, session, url_for, flash, send_file, jsonify import config from auth.decorators import login_required from utils.audit import log_audit from utils.backup import create_backup, restore_backup from utils.network import get_client_ip from db import get_connection, get_db_connection, get_db_cursor, execute_query from utils.export import create_excel_export, prepare_audit_export_data # Create Blueprint admin_bp = Blueprint('admin', __name__) @admin_bp.route("/") @login_required def dashboard(): conn = get_connection() cur = conn.cursor() try: # Hole Statistiken # Anzahl aktiver Lizenzen cur.execute("SELECT COUNT(*) FROM licenses WHERE active = true") active_licenses = cur.fetchone()[0] # Anzahl Kunden cur.execute("SELECT COUNT(*) FROM customers") total_customers = cur.fetchone()[0] # Anzahl aktiver Sessions cur.execute("SELECT COUNT(*) FROM sessions WHERE active = true") active_sessions = cur.fetchone()[0] # Top 10 Lizenzen nach Nutzung (letzte 30 Tage) cur.execute(""" SELECT l.license_key, c.name as customer_name, COUNT(DISTINCT s.id) as session_count, COUNT(DISTINCT s.username) as unique_users, MAX(s.last_activity) as last_activity FROM licenses l LEFT JOIN customers c ON l.customer_id = c.id LEFT JOIN sessions s ON l.license_key = s.license_key AND s.login_time >= CURRENT_TIMESTAMP - INTERVAL '30 days' GROUP BY l.license_key, c.name ORDER BY session_count DESC LIMIT 10 """) top_licenses = cur.fetchall() # Letzte 10 Aktivitäten aus dem Audit Log cur.execute(""" SELECT id, timestamp AT TIME ZONE 'Europe/Berlin' as timestamp, username, action, entity_type, entity_id, additional_info FROM audit_log ORDER BY timestamp DESC LIMIT 10 """) recent_activities = cur.fetchall() # Lizenztyp-Verteilung cur.execute(""" SELECT CASE WHEN is_test_license THEN 'Test' ELSE 'Full' END as license_type, COUNT(*) as count FROM licenses GROUP BY is_test_license """) license_distribution = cur.fetchall() # Sessions nach Stunden (letzte 24h) cur.execute(""" WITH hours AS ( SELECT generate_series( CURRENT_TIMESTAMP - INTERVAL '23 hours', CURRENT_TIMESTAMP, INTERVAL '1 hour' ) AS hour ) SELECT TO_CHAR(hours.hour AT TIME ZONE 'Europe/Berlin', 'HH24:00') as hour_label, COUNT(DISTINCT s.id) as session_count FROM hours LEFT JOIN sessions s ON s.login_time >= hours.hour AND s.login_time < hours.hour + INTERVAL '1 hour' GROUP BY hours.hour ORDER BY hours.hour """) hourly_sessions = cur.fetchall() # System-Status cur.execute("SELECT pg_database_size(current_database())") db_size = cur.fetchone()[0] # Letzte Backup-Info cur.execute(""" SELECT filename, created_at, filesize, status FROM backup_history WHERE status = 'success' ORDER BY created_at DESC LIMIT 1 """) last_backup = cur.fetchone() # Resource Statistiken cur.execute(""" SELECT COUNT(*) FILTER (WHERE status = 'available') as available, COUNT(*) FILTER (WHERE status = 'in_use') as in_use, COUNT(*) FILTER (WHERE status = 'quarantine') as quarantine, COUNT(*) as total FROM resources """) resource_stats = cur.fetchone() return render_template('dashboard.html', active_licenses=active_licenses, total_customers=total_customers, active_sessions=active_sessions, top_licenses=top_licenses, recent_activities=recent_activities, license_distribution=license_distribution, hourly_sessions=hourly_sessions, db_size=db_size, last_backup=last_backup, resource_stats=resource_stats, username=session.get('username')) finally: cur.close() conn.close() @admin_bp.route("/audit") @login_required def audit_log(): page = request.args.get('page', 1, type=int) per_page = 50 search = request.args.get('search', '') action_filter = request.args.get('action', '') entity_filter = request.args.get('entity', '') conn = get_connection() cur = conn.cursor() try: # Base query query = """ SELECT id, timestamp AT TIME ZONE 'Europe/Berlin' as timestamp, username, action, entity_type, entity_id, old_values::text, new_values::text, ip_address, user_agent, additional_info FROM audit_log WHERE 1=1 """ params = [] # Suchfilter if search: query += """ AND ( username ILIKE %s OR action ILIKE %s OR entity_type ILIKE %s OR additional_info ILIKE %s OR ip_address ILIKE %s )""" search_param = f"%{search}%" params.extend([search_param] * 5) # Action Filter if action_filter: query += " AND action = %s" params.append(action_filter) # Entity Filter if entity_filter: query += " AND entity_type = %s" params.append(entity_filter) # Count total count_query = f"SELECT COUNT(*) FROM ({query}) as filtered" cur.execute(count_query, params) total_count = cur.fetchone()[0] # Add pagination query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s" params.extend([per_page, (page - 1) * per_page]) cur.execute(query, params) logs = cur.fetchall() # Get unique actions and entities for filters cur.execute("SELECT DISTINCT action FROM audit_log ORDER BY action") actions = [row[0] for row in cur.fetchall()] cur.execute("SELECT DISTINCT entity_type FROM audit_log ORDER BY entity_type") entities = [row[0] for row in cur.fetchall()] # Pagination info total_pages = (total_count + per_page - 1) // per_page # Convert to dictionaries for easier template access audit_logs = [] for log in logs: audit_logs.append({ 'id': log[0], 'timestamp': log[1], 'username': log[2], 'action': log[3], 'entity_type': log[4], 'entity_id': log[5], 'old_values': log[6], 'new_values': log[7], 'ip_address': log[8], 'user_agent': log[9], 'additional_info': log[10] }) return render_template('audit_log.html', logs=audit_logs, page=page, total_pages=total_pages, total_count=total_count, search=search, action_filter=action_filter, entity_filter=entity_filter, actions=actions, entities=entities, username=session.get('username')) finally: cur.close() conn.close() @admin_bp.route("/backups") @login_required def backups(): conn = get_connection() cur = conn.cursor() try: # Hole alle Backups cur.execute(""" SELECT id, filename, created_at AT TIME ZONE 'Europe/Berlin' as created_at, filesize, backup_type, status, created_by, duration_seconds, tables_count, records_count, error_message, is_encrypted FROM backup_history ORDER BY created_at DESC """) backups = cur.fetchall() # Prüfe ob Dateien noch existieren backups_with_status = [] for backup in backups: backup_dict = { 'id': backup[0], 'filename': backup[1], 'created_at': backup[2], 'filesize': backup[3], 'backup_type': backup[4], 'status': backup[5], 'created_by': backup[6], 'duration_seconds': backup[7], 'tables_count': backup[8], 'records_count': backup[9], 'error_message': backup[10], 'is_encrypted': backup[11], 'file_exists': False } # Prüfe ob Datei existiert if backup[1]: # filename filepath = config.BACKUP_DIR / backup[1] backup_dict['file_exists'] = filepath.exists() backups_with_status.append(backup_dict) return render_template('backups.html', backups=backups_with_status, username=session.get('username')) finally: cur.close() conn.close() @admin_bp.route("/backup/create", methods=["POST"]) @login_required def create_backup_route(): """Manuelles Backup erstellen""" success, result = create_backup(backup_type="manual", created_by=session.get('username')) if success: flash(f'Backup erfolgreich erstellt: {result}', 'success') else: flash(f'Backup fehlgeschlagen: {result}', 'error') return redirect(url_for('admin.backups')) @admin_bp.route("/backup/restore/", methods=["POST"]) @login_required def restore_backup_route(backup_id): """Backup wiederherstellen""" encryption_key = request.form.get('encryption_key') success, message = restore_backup(backup_id, encryption_key) if success: flash(message, 'success') else: flash(f'Wiederherstellung fehlgeschlagen: {message}', 'error') return redirect(url_for('admin.backups')) @admin_bp.route("/backup/download/") @login_required def download_backup(backup_id): """Backup herunterladen""" conn = get_connection() cur = conn.cursor() try: # Hole Backup-Info cur.execute("SELECT filename, filepath FROM backup_history WHERE id = %s", (backup_id,)) result = cur.fetchone() if not result: flash('Backup nicht gefunden', 'error') return redirect(url_for('admin.backups')) filename, filepath = result filepath = Path(filepath) if not filepath.exists(): flash('Backup-Datei nicht gefunden', 'error') return redirect(url_for('admin.backups')) # Audit-Log log_audit('BACKUP_DOWNLOAD', 'backup', backup_id, additional_info=f"Backup heruntergeladen: {filename}") return send_file(filepath, as_attachment=True, download_name=filename) finally: cur.close() conn.close() @admin_bp.route("/backup/delete/", methods=["DELETE"]) @login_required def delete_backup(backup_id): """Backup löschen""" conn = get_connection() cur = conn.cursor() try: # Hole Backup-Info cur.execute("SELECT filename, filepath FROM backup_history WHERE id = %s", (backup_id,)) result = cur.fetchone() if not result: return jsonify({'success': False, 'message': 'Backup nicht gefunden'}), 404 filename, filepath = result filepath = Path(filepath) # Lösche Datei wenn vorhanden if filepath.exists(): try: filepath.unlink() except Exception as e: return jsonify({'success': False, 'message': f'Fehler beim Löschen der Datei: {str(e)}'}), 500 # Lösche Datenbank-Eintrag cur.execute("DELETE FROM backup_history WHERE id = %s", (backup_id,)) conn.commit() # Audit-Log log_audit('BACKUP_DELETE', 'backup', backup_id, additional_info=f"Backup gelöscht: {filename}") return jsonify({'success': True, 'message': 'Backup erfolgreich gelöscht'}) except Exception as e: conn.rollback() return jsonify({'success': False, 'message': str(e)}), 500 finally: cur.close() conn.close() @admin_bp.route("/security/blocked-ips") @login_required def blocked_ips(): """Zeigt gesperrte IP-Adressen""" conn = get_connection() cur = conn.cursor() try: cur.execute(""" SELECT ip_address, attempt_count, last_attempt AT TIME ZONE 'Europe/Berlin' as last_attempt, blocked_until AT TIME ZONE 'Europe/Berlin' as blocked_until, last_username_tried, last_error_message FROM login_attempts WHERE blocked_until IS NOT NULL AND blocked_until > CURRENT_TIMESTAMP ORDER BY blocked_until DESC """) blocked = cur.fetchall() # Alle Login-Versuche (auch nicht gesperrte) cur.execute(""" SELECT ip_address, attempt_count, last_attempt AT TIME ZONE 'Europe/Berlin' as last_attempt, blocked_until AT TIME ZONE 'Europe/Berlin' as blocked_until, last_username_tried, last_error_message FROM login_attempts ORDER BY last_attempt DESC LIMIT 100 """) all_attempts = cur.fetchall() return render_template('blocked_ips.html', blocked_ips=blocked, all_attempts=all_attempts, username=session.get('username')) finally: cur.close() conn.close() @admin_bp.route("/security/unblock-ip", methods=["POST"]) @login_required def unblock_ip(): """Entsperrt eine IP-Adresse""" ip_address = request.form.get('ip_address') if not ip_address: flash('Keine IP-Adresse angegeben', 'error') return redirect(url_for('admin.blocked_ips')) conn = get_connection() cur = conn.cursor() try: cur.execute(""" UPDATE login_attempts SET blocked_until = NULL WHERE ip_address = %s """, (ip_address,)) if cur.rowcount > 0: conn.commit() flash(f'IP-Adresse {ip_address} wurde entsperrt', 'success') log_audit('UNBLOCK_IP', 'security', additional_info=f"IP-Adresse entsperrt: {ip_address}") else: flash(f'IP-Adresse {ip_address} nicht gefunden', 'warning') except Exception as e: conn.rollback() flash(f'Fehler beim Entsperren: {str(e)}', 'error') finally: cur.close() conn.close() return redirect(url_for('admin.blocked_ips')) @admin_bp.route("/security/clear-attempts", methods=["POST"]) @login_required def clear_attempts(): """Löscht alle Login-Versuche""" conn = get_connection() cur = conn.cursor() try: cur.execute("DELETE FROM login_attempts") count = cur.rowcount conn.commit() flash(f'{count} Login-Versuche wurden gelöscht', 'success') log_audit('CLEAR_LOGIN_ATTEMPTS', 'security', additional_info=f"{count} Login-Versuche gelöscht") except Exception as e: conn.rollback() flash(f'Fehler beim Löschen: {str(e)}', 'error') finally: cur.close() conn.close() return redirect(url_for('admin.blocked_ips'))