# Source file: Hetzner-Backup/v2_adminpanel/app.py
# Last modified: 2025-06-07 16:01:35 +02:00
#
# 713 lines, 22 KiB, Python

import os
import psycopg2
from flask import Flask, render_template, request, redirect, session, url_for, send_file
from flask_session import Session
from functools import wraps
from dotenv import load_dotenv
import pandas as pd
from datetime import datetime
import io
# Read variables from a .env file into the process environment before any
# configuration value is looked up.
load_dotenv()

# Flask application object and its session/JSON configuration.
app = Flask(__name__)
app.config.update(
    # NOTE(review): the key is generated at start-up, so it differs on every
    # process start — confirm this is intended.
    SECRET_KEY=os.urandom(24),
    SESSION_TYPE='filesystem',
    JSON_AS_ASCII=False,  # JSON output as UTF-8
    JSONIFY_MIMETYPE='application/json; charset=utf-8',
)
Session(app)
# Login decorator
def login_required(f):
    """Wrap a view so it only runs when the session is marked as logged in.

    When the session has no ``'logged_in'`` key the caller is redirected to
    the ``login`` endpoint; otherwise the wrapped view is invoked with the
    arguments it was given and its result is returned unchanged.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded_view
# Database connection with UTF-8 encoding
def get_connection():
    """Open and return a new psycopg2 connection built from environment variables.

    ``POSTGRES_HOST`` and ``POSTGRES_PORT`` fall back to ``"postgres"`` and
    ``"5432"``; ``POSTGRES_DB``, ``POSTGRES_USER`` and ``POSTGRES_PASSWORD``
    are passed through as found (``None`` when unset). The client encoding is
    requested as UTF-8 both via the connection options and explicitly on the
    opened connection. The caller is responsible for closing the connection.
    """
    connect_kwargs = {
        "host": os.getenv("POSTGRES_HOST", "postgres"),
        "port": os.getenv("POSTGRES_PORT", "5432"),
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
        "options": '-c client_encoding=UTF8',
    }
    connection = psycopg2.connect(**connect_kwargs)
    connection.set_client_encoding('UTF8')
    return connection
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate an admin user (POST).

    The submitted ``username``/``password`` form fields are compared against
    the two admin accounts configured through the ``ADMIN1_USERNAME`` /
    ``ADMIN1_PASSWORD`` and ``ADMIN2_USERNAME`` / ``ADMIN2_PASSWORD``
    environment variables. On success the session receives ``logged_in`` and
    ``username`` and the client is redirected to the dashboard; on failure the
    form is rendered again with an error message.
    """
    import hmac  # local import so this block does not depend on file-level imports

    def _matches(submitted, expected):
        """Return True only for equal, non-empty values (constant-time compare)."""
        # A missing form field and an unset environment variable are both
        # None; a plain ``==`` would treat them as equal and let anyone log in
        # whenever an admin account is not configured.
        if not submitted or not expected:
            return False
        # compare_digest only accepts ASCII-only str, so compare UTF-8 bytes.
        return hmac.compare_digest(submitted.encode("utf-8"),
                                   expected.encode("utf-8"))

    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")

        # Check against both admin accounts from .env
        admin_accounts = (
            (os.getenv("ADMIN1_USERNAME"), os.getenv("ADMIN1_PASSWORD")),
            (os.getenv("ADMIN2_USERNAME"), os.getenv("ADMIN2_PASSWORD")),
        )
        authenticated = any(
            _matches(username, admin_user) and _matches(password, admin_pass)
            for admin_user, admin_pass in admin_accounts
        )

        if authenticated:
            session['logged_in'] = True
            session['username'] = username
            return redirect(url_for('dashboard'))
        return render_template("login.html", error="Ungültige Anmeldedaten")

    return render_template("login.html")
@app.route("/logout")
def logout():
    """Remove the login markers from the session and redirect to the login page."""
    for session_key in ('logged_in', 'username'):
        # pop with a default: logging out twice must not raise.
        session.pop(session_key, None)
    return redirect(url_for('login'))
@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with aggregate customer, license and session figures.

    Gathers the counts of customers, licenses (total, active, expired,
    expiring within 30 days, per license type) and active sessions, the five
    licenses with the highest ids and up to ten active licenses that expire
    within the next 30 days. Everything is handed to ``dashboard.html`` in the
    ``stats`` dictionary together with the current username.
    """
    conn = get_connection()
    # try/finally releases the connection even when one of the queries fails.
    try:
        with conn.cursor() as cur:

            def fetch_count(sql):
                """Execute a query returning a single value and return it."""
                cur.execute(sql)
                return cur.fetchone()[0]

            # Total number of customers
            total_customers = fetch_count("SELECT COUNT(*) FROM customers")

            # Total number of licenses
            total_licenses = fetch_count("SELECT COUNT(*) FROM licenses")

            # Active licenses (not expired and is_active = true)
            active_licenses = fetch_count("""
                SELECT COUNT(*) FROM licenses
                WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
            """)

            # Active sessions
            active_sessions_count = fetch_count(
                "SELECT COUNT(*) FROM sessions WHERE is_active = TRUE"
            )

            # Expired licenses
            expired_licenses = fetch_count("""
                SELECT COUNT(*) FROM licenses
                WHERE valid_until < CURRENT_DATE
            """)

            # Licenses expiring within the next 30 days
            expiring_soon = fetch_count("""
                SELECT COUNT(*) FROM licenses
                WHERE valid_until >= CURRENT_DATE
                AND valid_until < CURRENT_DATE + INTERVAL '30 days'
                AND is_active = TRUE
            """)

            # Test licenses vs. full versions
            cur.execute("""
                SELECT license_type, COUNT(*)
                FROM licenses
                GROUP BY license_type
            """)
            license_types = dict(cur.fetchall())

            # The five most recently created licenses (highest ids)
            cur.execute("""
                SELECT l.id, l.license_key, c.name, l.valid_until,
                CASE
                WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                ELSE 'aktiv'
                END as status
                FROM licenses l
                JOIN customers c ON l.customer_id = c.id
                ORDER BY l.id DESC
                LIMIT 5
            """)
            recent_licenses = cur.fetchall()

            # Licenses expiring soon (next 30 days)
            cur.execute("""
                SELECT l.id, l.license_key, c.name, l.valid_until,
                l.valid_until - CURRENT_DATE as days_left
                FROM licenses l
                JOIN customers c ON l.customer_id = c.id
                WHERE l.valid_until >= CURRENT_DATE
                AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
                AND l.is_active = TRUE
                ORDER BY l.valid_until
                LIMIT 10
            """)
            expiring_licenses = cur.fetchall()
    finally:
        conn.close()

    stats = {
        'total_customers': total_customers,
        'total_licenses': total_licenses,
        'active_licenses': active_licenses,
        'expired_licenses': expired_licenses,
        'expiring_soon': expiring_soon,
        'full_licenses': license_types.get('full', 0),
        'test_licenses': license_types.get('test', 0),
        'recent_licenses': recent_licenses,
        'expiring_licenses': expiring_licenses,
        'active_sessions': active_sessions_count,
    }
    return render_template("dashboard.html", stats=stats, username=session.get('username'))
@app.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the license creation form (GET) or store a new license (POST).

    On POST the form fields ``customer_name``, ``email``, ``license_key``,
    ``license_type``, ``valid_from`` and ``valid_until`` are read. A customer
    with exactly the same name and e-mail is reused if one exists, otherwise a
    new customer row is inserted. The license is then stored as active for
    that customer and the client is redirected back to ``/create``.
    """
    if request.method == "POST":
        name = request.form["customer_name"]
        email = request.form["email"]
        license_key = request.form["license_key"]
        license_type = request.form["license_type"]
        valid_from = request.form["valid_from"]
        valid_until = request.form["valid_until"]

        conn = get_connection()
        # try/finally releases the connection even if an INSERT fails; closing
        # without commit discards the partial transaction.
        try:
            with conn.cursor() as cur:
                # Insert the customer only if not already present, so repeated
                # license creation does not pile up duplicate customer rows.
                cur.execute("""
                    SELECT id FROM customers
                    WHERE name = %s AND email = %s
                    ORDER BY id
                    LIMIT 1
                """, (name, email))
                existing = cur.fetchone()
                if existing is not None:
                    customer_id = existing[0]
                else:
                    cur.execute("""
                        INSERT INTO customers (name, email, created_at)
                        VALUES (%s, %s, NOW())
                        RETURNING id
                    """, (name, email))
                    customer_id = cur.fetchone()[0]

                # Add the license
                cur.execute("""
                    INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active)
                    VALUES (%s, %s, %s, %s, %s, TRUE)
                """, (license_key, customer_id, license_type, valid_from, valid_until))
            conn.commit()
        finally:
            conn.close()
        return redirect("/create")

    return render_template("index.html", username=session.get('username'))
@app.route("/licenses")
@login_required
def licenses():
    """List licenses with optional search, type/status filters and pagination.

    Query parameters:
        search: substring matched case-insensitively against license key,
            customer name and customer e-mail.
        type: exact license type to filter on.
        status: one of ``active``, ``expiring``, ``expired``, ``inactive``;
            any other value applies no status filter.
        page: 1-based page number (20 rows per page); values below 1 or
            non-integers are treated as page 1.
    """
    # Parameters
    search = request.args.get('search', '').strip()
    filter_type = request.args.get('type', '')
    filter_status = request.args.get('status', '')
    # Clamp to 1: page <= 0 would produce a negative OFFSET, which the
    # database rejects.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    # SQL query with optional search and filters
    query = """
        SELECT l.id, l.license_key, c.name, c.email, l.license_type,
        l.valid_from, l.valid_until, l.is_active,
        CASE
        WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
        WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
        ELSE 'aktiv'
        END as status
        FROM licenses l
        JOIN customers c ON l.customer_id = c.id
        WHERE 1=1
    """
    params = []

    # Search filter
    if search:
        query += """
            AND (LOWER(l.license_key) LIKE LOWER(%s)
            OR LOWER(c.name) LIKE LOWER(%s)
            OR LOWER(c.email) LIKE LOWER(%s))
        """
        search_param = f'%{search}%'
        params.extend([search_param] * 3)

    # Type filter
    if filter_type:
        query += " AND l.license_type = %s"
        params.append(filter_type)

    # Status filter: fixed SQL fragments only, user input is never interpolated.
    status_clauses = {
        'active': " AND l.valid_until >= CURRENT_DATE AND l.is_active = TRUE",
        'expiring': " AND l.valid_until >= CURRENT_DATE AND l.valid_until < CURRENT_DATE + INTERVAL '30 days' AND l.is_active = TRUE",
        'expired': " AND l.valid_until < CURRENT_DATE",
        'inactive': " AND l.is_active = FALSE",
    }
    query += status_clauses.get(filter_status, "")

    conn = get_connection()
    # try/finally releases the connection even when a query fails.
    try:
        with conn.cursor() as cur:
            # Total row count for pagination
            count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"
            cur.execute(count_query, params)
            total = cur.fetchone()[0]

            # Pagination
            offset = (page - 1) * per_page
            query += " ORDER BY l.valid_until DESC LIMIT %s OFFSET %s"
            cur.execute(query, params + [per_page, offset])
            rows = cur.fetchall()
    finally:
        conn.close()

    # Pagination info (ceiling division)
    total_pages = (total + per_page - 1) // per_page

    return render_template("licenses.html",
                           licenses=rows,
                           search=search,
                           filter_type=filter_type,
                           filter_status=filter_status,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for one license (GET) or save its changes (POST).

    On POST the form fields ``license_key``, ``license_type``, ``valid_from``,
    ``valid_until`` and the ``is_active`` checkbox are written to the license
    row and the client is redirected to ``/licenses``. On GET the license is
    loaded together with its customer; if it does not exist the client is
    redirected to ``/licenses``.
    """
    conn = get_connection()
    # try/finally releases the connection on every path, including errors and
    # the early return after a successful update.
    try:
        with conn.cursor() as cur:
            if request.method == "POST":
                # Update license
                license_key = request.form["license_key"]
                license_type = request.form["license_type"]
                valid_from = request.form["valid_from"]
                valid_until = request.form["valid_until"]
                # The checkbox value "on" marks the license as active.
                is_active = request.form.get("is_active") == "on"
                cur.execute("""
                    UPDATE licenses
                    SET license_key = %s, license_type = %s, valid_from = %s,
                    valid_until = %s, is_active = %s
                    WHERE id = %s
                """, (license_key, license_type, valid_from, valid_until, is_active, license_id))
                conn.commit()
                return redirect("/licenses")

            # Get license data
            cur.execute("""
                SELECT l.id, l.license_key, c.name, c.email, l.license_type,
                l.valid_from, l.valid_until, l.is_active, c.id
                FROM licenses l
                JOIN customers c ON l.customer_id = c.id
                WHERE l.id = %s
            """, (license_id,))
            # Named license_row to avoid shadowing the ``license`` builtin.
            license_row = cur.fetchone()
    finally:
        conn.close()

    if not license_row:
        return redirect("/licenses")
    return render_template("edit_license.html", license=license_row, username=session.get('username'))
@app.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete the license with the given id and redirect to the license list."""
    conn = get_connection()
    # try/finally releases the connection even when the DELETE fails.
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
        conn.commit()
    finally:
        conn.close()
    return redirect("/licenses")
@app.route("/customers")
@login_required
def customers():
    """List customers with their license counts, optional search and pagination.

    Query parameters:
        search: substring matched case-insensitively against customer name
            and e-mail.
        page: 1-based page number (20 rows per page); values below 1 or
            non-integers are treated as page 1.
    """
    # Parameters
    search = request.args.get('search', '').strip()
    # Clamp to 1: page <= 0 would produce a negative OFFSET, which the
    # database rejects.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    # The same optional WHERE clause is shared by the count and the list query.
    where_clause = ""
    search_params = []
    if search:
        where_clause = """
            WHERE LOWER(c.name) LIKE LOWER(%s)
            OR LOWER(c.email) LIKE LOWER(%s)
        """
        search_param = f'%{search}%'
        search_params = [search_param, search_param]

    # Total row count for pagination
    count_query = f"""
        SELECT COUNT(DISTINCT c.id)
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        {where_clause}
    """

    # List query with per-customer license statistics
    query = f"""
        SELECT c.id, c.name, c.email, c.created_at,
        COUNT(l.id) as license_count,
        COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        {where_clause}
        GROUP BY c.id, c.name, c.email, c.created_at
        ORDER BY c.created_at DESC
        LIMIT %s OFFSET %s
    """
    offset = (page - 1) * per_page

    conn = get_connection()
    # try/finally releases the connection even when a query fails.
    try:
        with conn.cursor() as cur:
            # Pass None instead of an empty list when there is nothing to bind.
            cur.execute(count_query, search_params or None)
            total = cur.fetchone()[0]

            cur.execute(query, search_params + [per_page, offset])
            rows = cur.fetchall()
    finally:
        conn.close()

    # Pagination info (ceiling division)
    total_pages = (total + per_page - 1) // per_page

    return render_template("customers.html",
                           customers=rows,
                           search=search,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show the edit form for one customer (GET) or save name/e-mail (POST).

    On POST the form fields ``name`` and ``email`` are written to the customer
    row and the client is redirected to ``/customers``. On GET the customer
    and all of its licenses (newest ``valid_until`` first) are loaded; if the
    customer does not exist the client is redirected to ``/customers``.
    """
    conn = get_connection()
    # try/finally releases the connection on every path, including errors and
    # the early returns below.
    try:
        with conn.cursor() as cur:
            if request.method == "POST":
                # Update customer
                name = request.form["name"]
                email = request.form["email"]
                cur.execute("""
                    UPDATE customers
                    SET name = %s, email = %s
                    WHERE id = %s
                """, (name, email, customer_id))
                conn.commit()
                return redirect("/customers")

            # Get customer data
            cur.execute("""
                SELECT id, name, email, created_at
                FROM customers
                WHERE id = %s
            """, (customer_id,))
            customer = cur.fetchone()
            if not customer:
                # Unknown customer: no need to query its licenses.
                return redirect("/customers")

            # Get customer's licenses
            cur.execute("""
                SELECT id, license_key, license_type, valid_from, valid_until, is_active
                FROM licenses
                WHERE customer_id = %s
                ORDER BY valid_until DESC
            """, (customer_id,))
            customer_licenses = cur.fetchall()
    finally:
        conn.close()

    return render_template("edit_customer.html", customer=customer, licenses=customer_licenses, username=session.get('username'))
@app.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer unless licenses still reference it.

    In both cases (deleted or kept because licenses exist) the client is
    redirected to ``/customers``.
    """
    conn = get_connection()
    # try/finally releases the connection even when a statement fails.
    try:
        with conn.cursor() as cur:
            # Check whether the customer has licenses
            cur.execute("SELECT COUNT(*) FROM licenses WHERE customer_id = %s", (customer_id,))
            license_count = cur.fetchone()[0]

            # Delete the customer only when no licenses exist
            if license_count == 0:
                cur.execute("DELETE FROM customers WHERE id = %s", (customer_id,))
                conn.commit()
    finally:
        conn.close()
    return redirect("/customers")
@app.route("/sessions")
@login_required
def sessions():
    """Show active sessions and sessions that ended within the last 24 hours.

    Active sessions are ordered by most recent heartbeat and include the
    minutes since that heartbeat. Ended sessions are limited to 50 rows,
    ordered by end time, and include their duration in minutes.
    """
    conn = get_connection()
    # try/finally releases the connection even when a query fails.
    try:
        with conn.cursor() as cur:
            # Fetch active sessions
            cur.execute("""
                SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
                s.user_agent, s.started_at, s.last_heartbeat,
                EXTRACT(EPOCH FROM (NOW() - s.last_heartbeat))/60 as minutes_inactive
                FROM sessions s
                JOIN licenses l ON s.license_id = l.id
                JOIN customers c ON l.customer_id = c.id
                WHERE s.is_active = TRUE
                ORDER BY s.last_heartbeat DESC
            """)
            active_sessions = cur.fetchall()

            # Inactive sessions of the last 24 hours
            cur.execute("""
                SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
                s.started_at, s.ended_at,
                EXTRACT(EPOCH FROM (s.ended_at - s.started_at))/60 as duration_minutes
                FROM sessions s
                JOIN licenses l ON s.license_id = l.id
                JOIN customers c ON l.customer_id = c.id
                WHERE s.is_active = FALSE
                AND s.ended_at > NOW() - INTERVAL '24 hours'
                ORDER BY s.ended_at DESC
                LIMIT 50
            """)
            recent_sessions = cur.fetchall()
    finally:
        conn.close()

    return render_template("sessions.html",
                           active_sessions=active_sessions,
                           recent_sessions=recent_sessions,
                           username=session.get('username'))
@app.route("/session/end/<int:session_id>", methods=["POST"])
@login_required
def end_session(session_id):
    """Mark an active session as ended and redirect to the session overview.

    Only rows that are still active are touched, so ending an already ended
    session leaves its ``ended_at`` timestamp unchanged.
    """
    conn = get_connection()
    # try/finally releases the connection even when the UPDATE fails.
    try:
        with conn.cursor() as cur:
            # End the session
            cur.execute("""
                UPDATE sessions
                SET is_active = FALSE, ended_at = NOW()
                WHERE id = %s AND is_active = TRUE
            """, (session_id,))
        conn.commit()
    finally:
        conn.close()
    return redirect("/sessions")
@app.route("/export/licenses")
@login_required
def export_licenses():
    """Download all licenses with customer data as CSV or Excel.

    Query parameters:
        format: ``csv`` produces a semicolon-separated CSV with a UTF-8 BOM;
            any other value (default ``excel``) produces an ``.xlsx`` file.

    Dates are formatted as ``dd.mm.YYYY``; license type and active flag are
    replaced by their German display labels.
    """
    conn = get_connection()
    # try/finally releases the connection even when the query fails.
    try:
        with conn.cursor() as cur:
            # Fetch all licenses with customer information
            cur.execute("""
                SELECT l.id, l.license_key, c.name as customer_name, c.email as customer_email,
                l.license_type, l.valid_from, l.valid_until, l.is_active,
                CASE
                WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
                WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'Läuft bald ab'
                ELSE 'Aktiv'
                END as status
                FROM licenses l
                JOIN customers c ON l.customer_id = c.id
                ORDER BY l.id
            """)
            data = cur.fetchall()
    finally:
        conn.close()

    # Column names
    columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ',
               'Gültig von', 'Gültig bis', 'Aktiv', 'Status']

    # Data into a DataFrame
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    for date_column in ('Gültig von', 'Gültig bis'):
        df[date_column] = pd.to_datetime(df[date_column]).dt.strftime('%d.%m.%Y')

    # Map type and active flag to display labels
    df['Typ'] = df['Typ'].replace({'full': 'Vollversion', 'test': 'Testversion'})
    df['Aktiv'] = df['Aktiv'].replace({True: 'Ja', False: 'Nein'})

    # Export format
    export_format = request.args.get('format', 'excel')
    filename = f'lizenzen_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        # CSV export: render to text first, then encode once; 'utf-8-sig'
        # prepends the BOM (an encoding passed to to_csv has no effect on an
        # in-memory text buffer).
        csv_text = df.to_csv(index=False, sep=';')
        return send_file(
            io.BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Lizenzen', index=False)

        # Formatting: size each column to its longest cell text, capped at 50.
        worksheet = writer.sheets['Lizenzen']
        for column_cells in worksheet.columns:
            longest = max((len(str(cell.value)) for cell in column_cells), default=0)
            column_letter = column_cells[0].column_letter
            worksheet.column_dimensions[column_letter].width = min(longest + 2, 50)

    output.seek(0)
    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )
@app.route("/export/customers")
@login_required
def export_customers():
    """Download all customers with license statistics as CSV or Excel.

    Query parameters:
        format: ``csv`` produces a semicolon-separated CSV with a UTF-8 BOM;
            any other value (default ``excel``) produces an ``.xlsx`` file.

    Each row carries the customer's total, active and expired license counts;
    the creation timestamp is formatted as ``dd.mm.YYYY HH:MM``.
    """
    conn = get_connection()
    # try/finally releases the connection even when the query fails.
    try:
        with conn.cursor() as cur:
            # All customers with license statistics
            cur.execute("""
                SELECT c.id, c.name, c.email, c.created_at,
                COUNT(l.id) as total_licenses,
                COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
                COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
                FROM customers c
                LEFT JOIN licenses l ON c.id = l.customer_id
                GROUP BY c.id, c.name, c.email, c.created_at
                ORDER BY c.id
            """)
            data = cur.fetchall()
    finally:
        conn.close()

    # Column names
    columns = ['ID', 'Name', 'E-Mail', 'Erstellt am',
               'Lizenzen gesamt', 'Aktive Lizenzen', 'Abgelaufene Lizenzen']

    # Data into a DataFrame
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    df['Erstellt am'] = pd.to_datetime(df['Erstellt am']).dt.strftime('%d.%m.%Y %H:%M')

    # Export format
    export_format = request.args.get('format', 'excel')
    filename = f'kunden_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        # CSV export: render to text first, then encode once; 'utf-8-sig'
        # prepends the BOM (an encoding passed to to_csv has no effect on an
        # in-memory text buffer).
        csv_text = df.to_csv(index=False, sep=';')
        return send_file(
            io.BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Kunden', index=False)

        # Formatting: size each column to its longest cell text, capped at 50.
        worksheet = writer.sheets['Kunden']
        for column_cells in worksheet.columns:
            longest = max((len(str(cell.value)) for cell in column_cells), default=0)
            column_letter = column_cells[0].column_letter
            worksheet.column_dimensions[column_letter].width = min(longest + 2, 50)

    output.seek(0)
    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )
# Script entry point: start Flask's built-in server when the file is run directly.
if __name__ == "__main__":
    # Listen on all interfaces on port 443 with an ad-hoc TLS context.
    run_options = {"host": "0.0.0.0", "port": 443, "ssl_context": 'adhoc'}
    app.run(**run_options)