# Listing metadata from the code viewer: 413 lines, 12 KiB, Python
import hmac
import os
from functools import wraps

import psycopg2
from dotenv import load_dotenv
from flask import Flask, render_template, request, redirect, session, url_for
from flask_session import Session
|
|
|
|
# Pull variables from a .env file into the process environment before any
# os.getenv() lookup below runs.
load_dotenv()

app = Flask(__name__)

# All application settings in one place.
# NOTE(review): SECRET_KEY is freshly random on every process start, so it
# differs between restarts and between worker processes - confirm that this
# is intended for the deployment.
app.config.update(
    SECRET_KEY=os.urandom(24),
    SESSION_TYPE='filesystem',
    JSON_AS_ASCII=False,  # JSON output as UTF-8
    JSONIFY_MIMETYPE='application/json; charset=utf-8',
)

# Attach the Flask-Session extension to the app.
Session(app)
|
|
|
|
# Login decorator
def login_required(f):
    """Restrict a view function to sessions that carry a 'logged_in' key.

    The returned wrapper calls ``f`` with the original arguments when the
    key is present in the session; otherwise it returns a redirect to the
    'login' endpoint. ``functools.wraps`` keeps the metadata of ``f``.
    """

    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded_view
|
|
|
|
# Database connection with UTF-8 encoding
def get_connection():
    """Open and return a new psycopg2 connection.

    Connection settings are read from the POSTGRES_HOST, POSTGRES_PORT,
    POSTGRES_DB, POSTGRES_USER and POSTGRES_PASSWORD environment variables;
    host falls back to "postgres" and port to "5432". The client encoding
    is requested as UTF8 both through the connection options and through
    ``set_client_encoding``. The caller is responsible for closing the
    returned connection.
    """
    settings = {
        "host": os.getenv("POSTGRES_HOST", "postgres"),
        "port": os.getenv("POSTGRES_PORT", "5432"),
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
        "options": '-c client_encoding=UTF8',
    }
    connection = psycopg2.connect(**settings)
    connection.set_client_encoding('UTF8')
    return connection
|
|
|
|
def _credentials_match(username, password, expected_user, expected_pass):
    """Return True only if the submitted pair equals a fully configured account.

    An account whose username or password is unset or empty never matches,
    and neither does a submission with a missing field. Without this guard
    a POST without form fields (both values None) would equal an
    unconfigured account (both environment variables None) and log in.
    """
    if username is None or password is None:
        return False
    if not expected_user or not expected_pass:
        return False
    # Compare as UTF-8 bytes: hmac.compare_digest only accepts ASCII-only
    # str, and credentials may contain non-ASCII characters.
    user_ok = hmac.compare_digest(username.encode("utf-8"),
                                  expected_user.encode("utf-8"))
    pass_ok = hmac.compare_digest(password.encode("utf-8"),
                                  expected_pass.encode("utf-8"))
    # Both comparisons always run; no short-circuit on the username.
    return user_ok and pass_ok


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate an admin (POST).

    The submitted "username" and "password" form fields are checked against
    the two admin accounts configured through ADMIN1_USERNAME/ADMIN1_PASSWORD
    and ADMIN2_USERNAME/ADMIN2_PASSWORD. On success the session gets
    'logged_in' and 'username' and the client is redirected to the
    'dashboard' endpoint; otherwise login.html is rendered again with an
    error message.
    """
    if request.method != "POST":
        return render_template("login.html")

    username = request.form.get("username")
    password = request.form.get("password")

    # Check against both admin accounts from .env
    accounts = (
        (os.getenv("ADMIN1_USERNAME"), os.getenv("ADMIN1_PASSWORD")),
        (os.getenv("ADMIN2_USERNAME"), os.getenv("ADMIN2_PASSWORD")),
    )
    # Build the full list first so every account is always checked.
    matches = [
        _credentials_match(username, password, admin_user, admin_pass)
        for admin_user, admin_pass in accounts
    ]

    if any(matches):
        session['logged_in'] = True
        session['username'] = username
        return redirect(url_for('dashboard'))

    return render_template("login.html", error="Ungültige Anmeldedaten")
|
|
|
|
@app.route("/logout")
def logout():
    """Drop the login markers from the session and redirect to the login page.

    Only the 'logged_in' and 'username' keys are removed; keys that are
    absent are ignored.
    """
    for key in ('logged_in', 'username'):
        session.pop(key, None)
    return redirect(url_for('login'))
|
|
|
|
@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with license and customer statistics.

    Runs a series of read-only queries and passes the results to
    dashboard.html as ``stats`` with these keys: total_customers,
    total_licenses, active_licenses, expired_licenses, expiring_soon,
    full_licenses, test_licenses, recent_licenses (up to 5 rows) and
    expiring_licenses (up to 10 rows).

    The cursor and the connection are released even if a query raises, so
    a failing request does not leak a database connection.
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            # Total number of customers
            cur.execute("SELECT COUNT(*) FROM customers")
            total_customers = cur.fetchone()[0]

            # Total number of licenses
            cur.execute("SELECT COUNT(*) FROM licenses")
            total_licenses = cur.fetchone()[0]

            # Active licenses (not expired and is_active = true)
            cur.execute("""
                SELECT COUNT(*) FROM licenses
                WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
            """)
            active_licenses = cur.fetchone()[0]

            # Expired licenses
            cur.execute("""
                SELECT COUNT(*) FROM licenses
                WHERE valid_until < CURRENT_DATE
            """)
            expired_licenses = cur.fetchone()[0]

            # Active licenses that expire within the next 30 days
            cur.execute("""
                SELECT COUNT(*) FROM licenses
                WHERE valid_until >= CURRENT_DATE
                AND valid_until < CURRENT_DATE + INTERVAL '30 days'
                AND is_active = TRUE
            """)
            expiring_soon = cur.fetchone()[0]

            # Number of licenses per license_type (test vs. full versions)
            cur.execute("""
                SELECT license_type, COUNT(*)
                FROM licenses
                GROUP BY license_type
            """)
            license_types = dict(cur.fetchall())

            # The 5 most recently created licenses (highest ids first)
            cur.execute("""
                SELECT l.id, l.license_key, c.name, l.valid_until,
                       CASE
                           WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                           WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                           ELSE 'aktiv'
                       END as status
                FROM licenses l
                JOIN customers c ON l.customer_id = c.id
                ORDER BY l.id DESC
                LIMIT 5
            """)
            recent_licenses = cur.fetchall()

            # Licenses expiring soon (next 30 days), soonest first
            cur.execute("""
                SELECT l.id, l.license_key, c.name, l.valid_until,
                       l.valid_until - CURRENT_DATE as days_left
                FROM licenses l
                JOIN customers c ON l.customer_id = c.id
                WHERE l.valid_until >= CURRENT_DATE
                AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
                AND l.is_active = TRUE
                ORDER BY l.valid_until
                LIMIT 10
            """)
            expiring_licenses = cur.fetchall()
    finally:
        conn.close()

    stats = {
        'total_customers': total_customers,
        'total_licenses': total_licenses,
        'active_licenses': active_licenses,
        'expired_licenses': expired_licenses,
        'expiring_soon': expiring_soon,
        # A license type with no rows is absent from the GROUP BY result.
        'full_licenses': license_types.get('full', 0),
        'test_licenses': license_types.get('test', 0),
        'recent_licenses': recent_licenses,
        'expiring_licenses': expiring_licenses,
    }

    return render_template("dashboard.html", stats=stats, username=session.get('username'))
|
|
|
|
@app.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the license creation form (GET) or store a new license (POST).

    On POST the form must contain customer_name, email, license_key,
    license_type, valid_from and valid_until. A customer row with exactly
    this name and email is reused if one exists; otherwise a new customer
    is inserted. The license is then inserted as active for that customer,
    and the client is redirected back to the creation form.

    Customer and license are written in one transaction. The connection is
    always closed, and closing without a commit discards the pending work,
    so a failure leaves no partial data behind.
    """
    if request.method != "POST":
        return render_template("index.html", username=session.get('username'))

    form = request.form
    name = form["customer_name"]
    email = form["email"]
    license_key = form["license_key"]
    license_type = form["license_type"]
    valid_from = form["valid_from"]
    valid_until = form["valid_until"]

    conn = get_connection()
    try:
        with conn.cursor() as cur:
            # Insert the customer only if it does not exist yet. Matching on
            # name AND email reuses only exact duplicates.
            cur.execute("""
                SELECT id FROM customers
                WHERE name = %s AND email = %s
                ORDER BY id
                LIMIT 1
            """, (name, email))
            existing = cur.fetchone()

            if existing is not None:
                customer_id = existing[0]
            else:
                cur.execute("""
                    INSERT INTO customers (name, email, created_at)
                    VALUES (%s, %s, NOW())
                    RETURNING id
                """, (name, email))
                customer_id = cur.fetchone()[0]

            # Add the license
            cur.execute("""
                INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active)
                VALUES (%s, %s, %s, %s, %s, TRUE)
            """, (license_key, customer_id, license_type, valid_from, valid_until))

        conn.commit()
    finally:
        conn.close()

    return redirect(url_for('create_license'))
|
|
|
|
@app.route("/licenses")
@login_required
def licenses():
    """List all licenses, optionally filtered by the ``search`` query parameter.

    The search term is matched case-insensitively as a substring against
    license key, customer name and customer email. Rows are ordered by
    valid_until, newest first, and carry a computed status column
    ('abgelaufen', 'läuft bald ab' or 'aktiv').

    The connection is closed even if the query raises.
    """
    # Search parameter
    search = request.args.get('search', '').strip()

    # SQL query with optional search filter
    query = """
        SELECT l.id, l.license_key, c.name, c.email, l.license_type,
               l.valid_from, l.valid_until, l.is_active,
               CASE
                   WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                   WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                   ELSE 'aktiv'
               END as status
        FROM licenses l
        JOIN customers c ON l.customer_id = c.id
    """
    params = None  # None: execute the query without parameter substitution

    if search:
        query += """
            WHERE LOWER(l.license_key) LIKE LOWER(%s)
            OR LOWER(c.name) LIKE LOWER(%s)
            OR LOWER(c.email) LIKE LOWER(%s)
        """
        search_param = f'%{search}%'
        params = (search_param, search_param, search_param)

    query += " ORDER BY l.valid_until DESC"

    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            rows = cur.fetchall()
    finally:
        conn.close()

    return render_template("licenses.html", licenses=rows, search=search, username=session.get('username'))
|
|
|
|
@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for one license (GET) or save its changes (POST).

    POST expects license_key, license_type, valid_from and valid_until;
    the is_active checkbox counts as checked when its value is "on". After
    the update the client is redirected to the license list.

    GET loads the license together with its customer and renders
    edit_license.html; an unknown id redirects to the license list.

    The connection is closed on every path, including when a query raises.
    """
    if request.method == "POST":
        # Read the form before opening a connection, so a missing field
        # aborts the request without touching the database.
        form = request.form
        license_key = form["license_key"]
        license_type = form["license_type"]
        valid_from = form["valid_from"]
        valid_until = form["valid_until"]
        is_active = form.get("is_active") == "on"

        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE licenses
                    SET license_key = %s, license_type = %s, valid_from = %s,
                        valid_until = %s, is_active = %s
                    WHERE id = %s
                """, (license_key, license_type, valid_from, valid_until, is_active, license_id))
            conn.commit()
        finally:
            conn.close()

        return redirect(url_for('licenses'))

    # Get license data
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT l.id, l.license_key, c.name, c.email, l.license_type,
                       l.valid_from, l.valid_until, l.is_active, c.id
                FROM licenses l
                JOIN customers c ON l.customer_id = c.id
                WHERE l.id = %s
            """, (license_id,))
            license_row = cur.fetchone()
    finally:
        conn.close()

    if not license_row:
        return redirect(url_for('licenses'))

    return render_template("edit_license.html", license=license_row, username=session.get('username'))
|
|
|
|
@app.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete the license with the given id and return to the license list.

    Deleting an id that does not exist is a no-op. The connection is closed
    even if the statement raises.
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
        conn.commit()
    finally:
        conn.close()

    return redirect(url_for('licenses'))
|
|
|
|
@app.route("/customers")
@login_required
def customers():
    """List all customers with their license counts.

    Each row holds id, name, email, created_at, the total number of
    licenses and the number of licenses that are active and not expired.
    The optional ``search`` query parameter filters case-insensitively by
    substring on name and email. Rows are ordered by created_at, newest
    first.

    The connection is closed even if the query raises.
    """
    # Search parameter
    search = request.args.get('search', '').strip()

    # SQL query with optional search filter
    query = """
        SELECT c.id, c.name, c.email, c.created_at,
               COUNT(l.id) as license_count,
               COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
    """
    params = None  # None: execute the query without parameter substitution

    if search:
        query += """
            WHERE LOWER(c.name) LIKE LOWER(%s)
            OR LOWER(c.email) LIKE LOWER(%s)
        """
        search_param = f'%{search}%'
        params = (search_param, search_param)

    # Grouping and ordering are identical with and without a filter.
    query += """
        GROUP BY c.id, c.name, c.email, c.created_at
        ORDER BY c.created_at DESC
    """

    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            rows = cur.fetchall()
    finally:
        conn.close()

    return render_template("customers.html", customers=rows, search=search, username=session.get('username'))
|
|
|
|
@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show the edit form for one customer (GET) or save its changes (POST).

    POST expects the form fields name and email, updates the customer and
    redirects to the customer list.

    GET loads the customer and its licenses (ordered by valid_until, newest
    first) and renders edit_customer.html; an unknown id redirects to the
    customer list.

    The connection is closed on every path, including when a query raises.
    """
    if request.method == "POST":
        # Read the form before opening a connection, so a missing field
        # aborts the request without touching the database.
        name = request.form["name"]
        email = request.form["email"]

        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE customers
                    SET name = %s, email = %s
                    WHERE id = %s
                """, (name, email, customer_id))
            conn.commit()
        finally:
            conn.close()

        return redirect(url_for('customers'))

    customer_licenses = []
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            # Get customer data
            cur.execute("""
                SELECT id, name, email, created_at
                FROM customers
                WHERE id = %s
            """, (customer_id,))
            customer = cur.fetchone()

            # Get the customer's licenses; skipped for an unknown customer,
            # because that case redirects without using them.
            if customer:
                cur.execute("""
                    SELECT id, license_key, license_type, valid_from, valid_until, is_active
                    FROM licenses
                    WHERE customer_id = %s
                    ORDER BY valid_until DESC
                """, (customer_id,))
                customer_licenses = cur.fetchall()
    finally:
        conn.close()

    if not customer:
        return redirect(url_for('customers'))

    return render_template("edit_customer.html", customer=customer, licenses=customer_licenses, username=session.get('username'))
|
|
|
|
@app.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer, but only if no license references it.

    A customer that still has licenses is left untouched. In both cases
    the client is redirected to the customer list.

    The "has no licenses" check is part of the DELETE statement itself
    instead of a separate COUNT query beforehand, so check and delete are
    not split across two statements. The connection is closed even if the
    statement raises.
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            # Delete the customer only when no licenses exist for it.
            cur.execute("""
                DELETE FROM customers
                WHERE id = %s
                AND NOT EXISTS (
                    SELECT 1 FROM licenses WHERE customer_id = %s
                )
            """, (customer_id, customer_id))
        conn.commit()
    finally:
        conn.close()

    return redirect(url_for('customers'))
|
|
|
|
if __name__ == "__main__":
    # Direct start via `python <file>`: listen on every interface on port
    # 443. ssl_context='adhoc' is the Werkzeug setting for an on-the-fly
    # self-signed certificate (see the Werkzeug serving documentation).
    app.run(
        host="0.0.0.0",
        port=443,
        ssl_context='adhoc',
    )
|