import hmac
import os
from contextlib import contextmanager
from functools import wraps

import psycopg2
from dotenv import load_dotenv
from flask import Flask, render_template, request, redirect, session, url_for
from flask_session import Session

load_dotenv()

app = Flask(__name__)
# NOTE: a fresh random key is generated every time this module is loaded, so
# the key differs between process starts (and between separate processes).
app.config['SECRET_KEY'] = os.urandom(24)
app.config['SESSION_TYPE'] = 'filesystem'
app.config['JSON_AS_ASCII'] = False  # JSON output as UTF-8
app.config['JSONIFY_MIMETYPE'] = 'application/json; charset=utf-8'
Session(app)

# Names of the environment variables holding the two admin accounts,
# as (username variable, password variable) pairs.
_ADMIN_ENV_VARS = (
    ("ADMIN1_USERNAME", "ADMIN1_PASSWORD"),
    ("ADMIN2_USERNAME", "ADMIN2_PASSWORD"),
)


def login_required(f):
    """Decorator: redirect to the login page unless the session is logged in.

    The wrapped view is only called when the key ``'logged_in'`` is present
    in the session; otherwise a redirect to the ``login`` endpoint is
    returned.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


def get_connection():
    """Open and return a new psycopg2 connection with UTF-8 client encoding.

    Connection parameters are read from the ``POSTGRES_*`` environment
    variables; host and port default to ``"postgres"`` and ``"5432"``.
    The caller is responsible for closing the connection.
    """
    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "postgres"),
        port=os.getenv("POSTGRES_PORT", "5432"),
        dbname=os.getenv("POSTGRES_DB"),
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD"),
        options='-c client_encoding=UTF8'
    )
    conn.set_client_encoding('UTF8')
    return conn


@contextmanager
def _db_cursor(commit=False):
    """Yield a cursor on a fresh connection and always clean up afterwards.

    Args:
        commit: when true, commit the transaction after the ``with`` body
            finishes without raising. When the body raises, nothing is
            committed.

    The cursor and the connection are closed in every case, so a failing
    query can no longer leak a connection.
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            yield cur
        if commit:
            conn.commit()
    finally:
        conn.close()


def _is_valid_admin(username, password):
    """Return True if the credentials match one of the configured admins.

    An admin pair whose username or password variable is unset or empty is
    ignored, and an empty or missing submitted value never matches. Without
    these guards a request that omits both form fields would compare
    ``None == None`` against an unconfigured account and be logged in.
    """
    if not username or not password:
        return False

    given_user = username.encode("utf-8")
    given_pass = password.encode("utf-8")
    for user_var, pass_var in _ADMIN_ENV_VARS:
        expected_user = os.getenv(user_var)
        expected_pass = os.getenv(pass_var)
        if not expected_user or not expected_pass:
            continue
        # Compare as bytes: compare_digest only accepts ASCII-only str.
        # Both comparisons are evaluated before combining the results.
        user_ok = hmac.compare_digest(given_user, expected_user.encode("utf-8"))
        pass_ok = hmac.compare_digest(given_pass, expected_pass.encode("utf-8"))
        if user_ok and pass_ok:
            return True
    return False


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On success the session is marked as logged in and the user is
    redirected to the dashboard; on failure the form is rendered again
    with an error message.
    """
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")

        # Check against both admin accounts from .env
        if _is_valid_admin(username, password):
            session['logged_in'] = True
            session['username'] = username
            return redirect(url_for('dashboard'))
        return render_template("login.html", error="Ungültige Anmeldedaten")

    return render_template("login.html")


@app.route("/logout")
def logout():
    """Remove the login markers from the session and go to the login page."""
    session.pop('logged_in', None)
    session.pop('username', None)
    return redirect(url_for('login'))


@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with customer and license statistics."""
    with _db_cursor() as cur:
        # Total number of customers
        cur.execute("SELECT COUNT(*) FROM customers")
        total_customers = cur.fetchone()[0]

        # Total number of licenses
        cur.execute("SELECT COUNT(*) FROM licenses")
        total_licenses = cur.fetchone()[0]

        # Active licenses (not expired and is_active = true)
        cur.execute("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
        """)
        active_licenses = cur.fetchone()[0]

        # Expired licenses
        cur.execute("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until < CURRENT_DATE
        """)
        expired_licenses = cur.fetchone()[0]

        # Licenses expiring within the next 30 days
        cur.execute("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE
            AND valid_until < CURRENT_DATE + INTERVAL '30 days'
            AND is_active = TRUE
        """)
        expiring_soon = cur.fetchone()[0]

        # Test licenses vs. full versions: maps license_type -> count
        cur.execute("""
            SELECT license_type, COUNT(*)
            FROM licenses
            GROUP BY license_type
        """)
        license_types = dict(cur.fetchall())

        # The 5 most recently created licenses (highest ids)
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   CASE
                       WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                       ELSE 'aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.id DESC
            LIMIT 5
        """)
        recent_licenses = cur.fetchall()

        # Licenses expiring soon (next 30 days), nearest expiry first
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   l.valid_until - CURRENT_DATE as days_left
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.valid_until >= CURRENT_DATE
            AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
            AND l.is_active = TRUE
            ORDER BY l.valid_until
            LIMIT 10
        """)
        expiring_licenses = cur.fetchall()

    stats = {
        'total_customers': total_customers,
        'total_licenses': total_licenses,
        'active_licenses': active_licenses,
        'expired_licenses': expired_licenses,
        'expiring_soon': expiring_soon,
        'full_licenses': license_types.get('full', 0),
        'test_licenses': license_types.get('test', 0),
        'recent_licenses': recent_licenses,
        'expiring_licenses': expiring_licenses
    }

    return render_template("dashboard.html", stats=stats, username=session.get('username'))


@app.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the creation form (GET) or store a new customer + license (POST).

    Both inserts run in one transaction, which is committed only if both
    succeed.
    """
    if request.method == "POST":
        name = request.form["customer_name"]
        email = request.form["email"]
        license_key = request.form["license_key"]
        license_type = request.form["license_type"]
        valid_from = request.form["valid_from"]
        valid_until = request.form["valid_until"]

        with _db_cursor(commit=True) as cur:
            # Insert the customer. NOTE(review): this always inserts a new
            # row; there is no lookup for an already existing customer here.
            cur.execute("""
                INSERT INTO customers (name, email, created_at)
                VALUES (%s, %s, NOW())
                RETURNING id
            """, (name, email))
            customer_id = cur.fetchone()[0]

            # Add the license for that customer
            cur.execute("""
                INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active)
                VALUES (%s, %s, %s, %s, %s, TRUE)
            """, (license_key, customer_id, license_type, valid_from, valid_until))

        return redirect("/create")

    return render_template("index.html", username=session.get('username'))


@app.route("/licenses")
@login_required
def licenses():
    """List all licenses together with their customer and a status label."""
    with _db_cursor() as cur:
        # Fetch all licenses with customer information
        cur.execute("""
            SELECT l.id, l.license_key, c.name, c.email, l.license_type,
                   l.valid_from, l.valid_until, l.is_active,
                   CASE
                       WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                       ELSE 'aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.valid_until DESC
        """)
        license_rows = cur.fetchall()

    return render_template("licenses.html", licenses=license_rows, username=session.get('username'))


@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for one license (GET) or save its changes (POST).

    Args:
        license_id: id of the license, taken from the URL.

    A GET for an id with no matching row redirects to the license list.
    """
    if request.method == "POST":
        # Update license
        license_key = request.form["license_key"]
        license_type = request.form["license_type"]
        valid_from = request.form["valid_from"]
        valid_until = request.form["valid_until"]
        # The checkbox is compared against "on"; a missing field means False.
        is_active = request.form.get("is_active") == "on"

        with _db_cursor(commit=True) as cur:
            cur.execute("""
                UPDATE licenses
                SET license_key = %s, license_type = %s, valid_from = %s,
                    valid_until = %s, is_active = %s
                WHERE id = %s
            """, (license_key, license_type, valid_from, valid_until, is_active, license_id))

        return redirect("/licenses")

    # Get license data
    with _db_cursor() as cur:
        cur.execute("""
            SELECT l.id, l.license_key, c.name, c.email, l.license_type,
                   l.valid_from, l.valid_until, l.is_active, c.id
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.id = %s
        """, (license_id,))
        license_row = cur.fetchone()

    if not license_row:
        return redirect("/licenses")

    return render_template("edit_license.html", license=license_row, username=session.get('username'))


@app.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete the license with the given id and return to the license list.

    Args:
        license_id: id of the license, taken from the URL.
    """
    with _db_cursor(commit=True) as cur:
        cur.execute("DELETE FROM licenses WHERE id = %s", (license_id,))

    return redirect("/licenses")


@app.route("/customers")
@login_required
def customers():
    """List all customers with their total and currently active license counts."""
    with _db_cursor() as cur:
        # Fetch all customers with their number of licenses
        cur.execute("""
            SELECT c.id, c.name, c.email, c.created_at,
                   COUNT(l.id) as license_count,
                   COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
            FROM customers c
            LEFT JOIN licenses l ON c.id = l.customer_id
            GROUP BY c.id, c.name, c.email, c.created_at
            ORDER BY c.created_at DESC
        """)
        customer_rows = cur.fetchall()

    return render_template("customers.html", customers=customer_rows, username=session.get('username'))


@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show the edit form for one customer (GET) or save its changes (POST).

    Args:
        customer_id: id of the customer, taken from the URL.

    A GET for an id with no matching row redirects to the customer list.
    """
    if request.method == "POST":
        # Update customer
        name = request.form["name"]
        email = request.form["email"]

        with _db_cursor(commit=True) as cur:
            cur.execute("""
                UPDATE customers
                SET name = %s, email = %s
                WHERE id = %s
            """, (name, email, customer_id))

        return redirect("/customers")

    with _db_cursor() as cur:
        # Get customer data
        cur.execute("""
            SELECT id, name, email, created_at
            FROM customers
            WHERE id = %s
        """, (customer_id,))
        customer = cur.fetchone()

        # Get customer's licenses
        cur.execute("""
            SELECT id, license_key, license_type, valid_from, valid_until, is_active
            FROM licenses
            WHERE customer_id = %s
            ORDER BY valid_until DESC
        """, (customer_id,))
        customer_licenses = cur.fetchall()

    if not customer:
        return redirect("/customers")

    return render_template("edit_customer.html", customer=customer, licenses=customer_licenses, username=session.get('username'))


@app.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer, but only if no licenses reference it.

    Args:
        customer_id: id of the customer, taken from the URL.

    In both cases the response is a redirect to the customer list.
    """
    with _db_cursor(commit=True) as cur:
        # Check whether the customer has licenses
        cur.execute("SELECT COUNT(*) FROM licenses WHERE customer_id = %s", (customer_id,))
        license_count = cur.fetchone()[0]

        # Delete the customer only when no licenses exist
        if license_count == 0:
            cur.execute("DELETE FROM customers WHERE id = %s", (customer_id,))

    return redirect("/customers")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=443, ssl_context='adhoc')