275 Zeilen
10 KiB
Python
275 Zeilen
10 KiB
Python
# Temporary models file - will be expanded in Phase 3
|
|
from db import execute_query, get_db_connection, get_db_cursor
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_user_by_username(username):
|
|
"""Get user from database by username"""
|
|
result = execute_query(
|
|
"""
|
|
SELECT id, username, password_hash, email, totp_secret, totp_enabled,
|
|
backup_codes, last_password_change, failed_2fa_attempts
|
|
FROM users WHERE username = %s
|
|
""",
|
|
(username,),
|
|
fetch_one=True
|
|
)
|
|
|
|
if result:
|
|
return {
|
|
'id': result[0],
|
|
'username': result[1],
|
|
'password_hash': result[2],
|
|
'email': result[3],
|
|
'totp_secret': result[4],
|
|
'totp_enabled': result[5],
|
|
'backup_codes': result[6],
|
|
'last_password_change': result[7],
|
|
'failed_2fa_attempts': result[8]
|
|
}
|
|
return None
|
|
|
|
|
|
def get_licenses(show_fake=False):
|
|
"""Get all licenses from database"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
if show_fake:
|
|
cur.execute("""
|
|
SELECT l.*, c.name as customer_name,
|
|
(SELECT COUNT(*) FROM license_sessions ls WHERE ls.license_id = l.id) as active_sessions
|
|
FROM licenses l
|
|
LEFT JOIN customers c ON l.customer_id = c.id
|
|
ORDER BY l.created_at DESC
|
|
""")
|
|
else:
|
|
cur.execute("""
|
|
SELECT l.*, c.name as customer_name,
|
|
(SELECT COUNT(*) FROM license_sessions ls WHERE ls.license_id = l.id) as active_sessions
|
|
FROM licenses l
|
|
LEFT JOIN customers c ON l.customer_id = c.id
|
|
WHERE l.is_fake = false
|
|
ORDER BY l.created_at DESC
|
|
""")
|
|
|
|
columns = [desc[0] for desc in cur.description]
|
|
licenses = []
|
|
for row in cur.fetchall():
|
|
license_dict = dict(zip(columns, row))
|
|
licenses.append(license_dict)
|
|
return licenses
|
|
except Exception as e:
|
|
logger.error(f"Error fetching licenses: {str(e)}")
|
|
return []
|
|
|
|
|
|
def get_license_by_id(license_id):
|
|
"""Get a specific license by ID"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
cur.execute("""
|
|
SELECT l.*, c.name as customer_name,
|
|
(SELECT COUNT(*) FROM license_sessions ls WHERE ls.license_id = l.id) as active_sessions
|
|
FROM licenses l
|
|
LEFT JOIN customers c ON l.customer_id = c.id
|
|
WHERE l.id = %s
|
|
""", (license_id,))
|
|
|
|
row = cur.fetchone()
|
|
if row:
|
|
columns = [desc[0] for desc in cur.description]
|
|
return dict(zip(columns, row))
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error fetching license {license_id}: {str(e)}")
|
|
return None
|
|
|
|
|
|
def get_license_session_stats(license_id):
|
|
"""Get session statistics for a specific license"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
cur.execute("""
|
|
SELECT
|
|
l.device_limit,
|
|
l.concurrent_sessions_limit,
|
|
(SELECT COUNT(*) FROM device_registrations dr WHERE dr.license_id = l.id AND dr.is_active = true) as registered_devices,
|
|
(SELECT COUNT(*) FROM license_sessions ls WHERE ls.license_id = l.id) as active_sessions,
|
|
l.concurrent_sessions_limit - (SELECT COUNT(*) FROM license_sessions ls WHERE ls.license_id = l.id) as available_sessions
|
|
FROM licenses l
|
|
WHERE l.id = %s
|
|
""", (license_id,))
|
|
|
|
row = cur.fetchone()
|
|
if row:
|
|
return {
|
|
'device_limit': row[0],
|
|
'concurrent_sessions_limit': row[1],
|
|
'registered_devices': row[2],
|
|
'active_sessions': row[3],
|
|
'available_sessions': row[4]
|
|
}
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error fetching session stats for license {license_id}: {str(e)}")
|
|
return None
|
|
|
|
|
|
def get_customers(show_fake=False, search=None):
|
|
"""Get all customers from database"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
query = """
|
|
SELECT c.*,
|
|
COUNT(DISTINCT l.id) as license_count,
|
|
COUNT(DISTINCT CASE WHEN l.is_active THEN l.id END) as active_licenses
|
|
FROM customers c
|
|
LEFT JOIN licenses l ON c.id = l.customer_id
|
|
"""
|
|
|
|
where_clauses = []
|
|
params = []
|
|
|
|
if not show_fake:
|
|
where_clauses.append("c.is_fake = false")
|
|
|
|
if search:
|
|
where_clauses.append("(LOWER(c.name) LIKE LOWER(%s) OR LOWER(c.email) LIKE LOWER(%s))")
|
|
search_pattern = f'%{search}%'
|
|
params.extend([search_pattern, search_pattern])
|
|
|
|
if where_clauses:
|
|
query += " WHERE " + " AND ".join(where_clauses)
|
|
|
|
query += " GROUP BY c.id ORDER BY c.name"
|
|
|
|
cur.execute(query, params)
|
|
|
|
columns = [desc[0] for desc in cur.description]
|
|
customers = []
|
|
for row in cur.fetchall():
|
|
customer_dict = dict(zip(columns, row))
|
|
customers.append(customer_dict)
|
|
return customers
|
|
except Exception as e:
|
|
logger.error(f"Error fetching customers: {str(e)}")
|
|
return []
|
|
|
|
|
|
def get_customer_by_id(customer_id):
|
|
"""Get a specific customer by ID"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
cur.execute("""
|
|
SELECT c.*,
|
|
COUNT(DISTINCT l.id) as license_count,
|
|
COUNT(DISTINCT CASE WHEN l.is_active THEN l.id END) as active_licenses
|
|
FROM customers c
|
|
LEFT JOIN licenses l ON c.id = l.customer_id
|
|
WHERE c.id = %s
|
|
GROUP BY c.id
|
|
""", (customer_id,))
|
|
|
|
row = cur.fetchone()
|
|
if row:
|
|
columns = [desc[0] for desc in cur.description]
|
|
return dict(zip(columns, row))
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error fetching customer {customer_id}: {str(e)}")
|
|
return None
|
|
|
|
|
|
def get_active_sessions():
|
|
"""Get all is_active sessions"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
cur.execute("""
|
|
SELECT s.*, l.license_key, c.name as customer_name
|
|
FROM sessions s
|
|
JOIN licenses l ON s.license_id = l.id
|
|
LEFT JOIN customers c ON l.customer_id = c.id
|
|
WHERE s.is_active = true
|
|
ORDER BY s.started_at DESC
|
|
""")
|
|
|
|
columns = [desc[0] for desc in cur.description]
|
|
sessions = []
|
|
for row in cur.fetchall():
|
|
session_dict = dict(zip(columns, row))
|
|
sessions.append(session_dict)
|
|
return sessions
|
|
except Exception as e:
|
|
logger.error(f"Error fetching is_active sessions: {str(e)}")
|
|
return []
|
|
|
|
|
|
def get_devices_for_license(license_id):
|
|
"""Get all registered devices for a specific license"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
cur.execute("""
|
|
SELECT
|
|
id,
|
|
hardware_fingerprint,
|
|
device_name,
|
|
device_type,
|
|
operating_system,
|
|
app_version,
|
|
first_activated_at,
|
|
last_seen_at,
|
|
is_active,
|
|
ip_address,
|
|
(SELECT COUNT(*) FROM license_sessions ls
|
|
WHERE ls.device_registration_id = dr.id) as active_sessions
|
|
FROM device_registrations dr
|
|
WHERE dr.license_id = %s
|
|
ORDER BY dr.last_seen_at DESC
|
|
""", (license_id,))
|
|
|
|
columns = [desc[0] for desc in cur.description]
|
|
devices = []
|
|
for row in cur.fetchall():
|
|
device_dict = dict(zip(columns, row))
|
|
devices.append(device_dict)
|
|
return devices
|
|
except Exception as e:
|
|
logger.error(f"Error fetching devices for license {license_id}: {str(e)}")
|
|
return []
|
|
|
|
|
|
def check_device_limit(license_id):
|
|
"""Check if license has reached its device limit"""
|
|
try:
|
|
with get_db_connection() as conn:
|
|
with get_db_cursor(conn) as cur:
|
|
cur.execute("""
|
|
SELECT
|
|
l.device_limit,
|
|
COUNT(dr.id) as active_devices
|
|
FROM licenses l
|
|
LEFT JOIN device_registrations dr ON l.id = dr.license_id AND dr.is_active = true
|
|
WHERE l.id = %s
|
|
GROUP BY l.device_limit
|
|
""", (license_id,))
|
|
|
|
row = cur.fetchone()
|
|
if row:
|
|
return {
|
|
'device_limit': row[0],
|
|
'active_devices': row[1],
|
|
'limit_reached': row[1] >= row[0]
|
|
}
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error checking device limit for license {license_id}: {str(e)}")
|
|
return None |