Dieser Commit ist enthalten in:
2025-06-16 23:20:23 +02:00
Ursprung 491551309c
Commit dcb5205e81
108 geänderte Dateien mit 95048 neuen und 50 gelöschten Zeilen

Datei anzeigen

@@ -1,5 +1,8 @@
# Temporary models file - will be expanded in Phase 3
from db import execute_query
from db import execute_query, get_db_connection, get_db_cursor
import logging
logger = logging.getLogger(__name__)
def get_user_by_username(username):
@@ -26,4 +29,134 @@ def get_user_by_username(username):
'last_password_change': result[7],
'failed_2fa_attempts': result[8]
}
return None
return None
def get_licenses(show_test=False):
"""Get all licenses from database"""
try:
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
if show_test:
cur.execute("""
SELECT l.*, c.name as customer_name
FROM licenses l
LEFT JOIN customers c ON l.customer_id = c.id
ORDER BY l.created_at DESC
""")
else:
cur.execute("""
SELECT l.*, c.name as customer_name
FROM licenses l
LEFT JOIN customers c ON l.customer_id = c.id
WHERE l.is_test = false
ORDER BY l.created_at DESC
""")
columns = [desc[0] for desc in cur.description]
licenses = []
for row in cur.fetchall():
license_dict = dict(zip(columns, row))
licenses.append(license_dict)
return licenses
except Exception as e:
logger.error(f"Error fetching licenses: {str(e)}")
return []
def get_license_by_id(license_id):
"""Get a specific license by ID"""
try:
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
cur.execute("""
SELECT l.*, c.name as customer_name
FROM licenses l
LEFT JOIN customers c ON l.customer_id = c.id
WHERE l.id = %s
""", (license_id,))
row = cur.fetchone()
if row:
columns = [desc[0] for desc in cur.description]
return dict(zip(columns, row))
return None
except Exception as e:
logger.error(f"Error fetching license {license_id}: {str(e)}")
return None
def get_customers():
"""Get all customers from database"""
try:
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
cur.execute("""
SELECT c.*,
COUNT(DISTINCT l.id) as license_count,
COUNT(DISTINCT CASE WHEN l.is_active THEN l.id END) as active_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
GROUP BY c.id
ORDER BY c.name
""")
columns = [desc[0] for desc in cur.description]
customers = []
for row in cur.fetchall():
customer_dict = dict(zip(columns, row))
customers.append(customer_dict)
return customers
except Exception as e:
logger.error(f"Error fetching customers: {str(e)}")
return []
def get_customer_by_id(customer_id):
"""Get a specific customer by ID"""
try:
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
cur.execute("""
SELECT c.*,
COUNT(DISTINCT l.id) as license_count,
COUNT(DISTINCT CASE WHEN l.is_active THEN l.id END) as active_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
WHERE c.id = %s
GROUP BY c.id
""", (customer_id,))
row = cur.fetchone()
if row:
columns = [desc[0] for desc in cur.description]
return dict(zip(columns, row))
return None
except Exception as e:
logger.error(f"Error fetching customer {customer_id}: {str(e)}")
return None
def get_active_sessions():
"""Get all active sessions"""
try:
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
cur.execute("""
SELECT s.*, l.license_key, c.name as customer_name
FROM sessions s
JOIN licenses l ON s.license_id = l.id
LEFT JOIN customers c ON l.customer_id = c.id
WHERE s.is_active = true
ORDER BY s.start_time DESC
""")
columns = [desc[0] for desc in cur.description]
sessions = []
for row in cur.fetchall():
session_dict = dict(zip(columns, row))
sessions.append(session_dict)
return sessions
except Exception as e:
logger.error(f"Error fetching active sessions: {str(e)}")
return []