FIX 3 für Codex

Dieser Commit ist enthalten in:
2025-06-18 01:35:54 +02:00
Ursprung a9cfecc699
Commit 231aa4caed
30 geänderte Dateien mit 573 neuen und 213 gelöschten Zeilen

Datei anzeigen

@@ -21,11 +21,73 @@ license_bp = Blueprint('licenses', __name__)
@login_required
def licenses():
from datetime import datetime, timedelta
show_test = request.args.get('show_test', 'false') == 'true'
# Get filter parameters
search = request.args.get('search', '').strip()
filter_type = request.args.get('type', '')
filter_status = request.args.get('status', '')
sort = request.args.get('sort', 'created_at')
order = request.args.get('order', 'desc')
page = request.args.get('page', 1, type=int)
per_page = 50
# Process type filter to determine show_test
show_test = filter_type in ['test_data', 'test']
# Get licenses based on filters
licenses_list = get_licenses(show_test=show_test)
# Additional filtering based on type and status
if filter_type:
if filter_type == 'full':
licenses_list = [l for l in licenses_list if l.get('license_type') == 'full' and not l.get('is_test')]
elif filter_type == 'test':
licenses_list = [l for l in licenses_list if l.get('license_type') == 'test' and not l.get('is_test')]
elif filter_type == 'test_data':
licenses_list = [l for l in licenses_list if l.get('is_test')]
elif filter_type == 'live_data':
licenses_list = [l for l in licenses_list if not l.get('is_test')]
# Status filtering
if filter_status:
now = datetime.now()
if filter_status == 'active':
licenses_list = [l for l in licenses_list if l.get('is_active') and l.get('valid_until') and l.get('valid_until') > now]
elif filter_status == 'expiring':
expiry_threshold = now + timedelta(days=30)
licenses_list = [l for l in licenses_list if l.get('valid_until') and now < l.get('valid_until') <= expiry_threshold]
elif filter_status == 'expired':
licenses_list = [l for l in licenses_list if l.get('valid_until') and l.get('valid_until') <= now]
elif filter_status == 'inactive':
licenses_list = [l for l in licenses_list if not l.get('is_active')]
# Search filtering
if search:
search_lower = search.lower()
licenses_list = [l for l in licenses_list if
search_lower in str(l.get('license_key', '')).lower() or
search_lower in str(l.get('customer_name', '')).lower() or
search_lower in str(l.get('customer_email', '')).lower()]
# Calculate pagination
total = len(licenses_list)
total_pages = (total + per_page - 1) // per_page
start = (page - 1) * per_page
end = start + per_page
licenses_list = licenses_list[start:end]
return render_template("licenses.html",
licenses=licenses_list,
show_test=show_test,
search=search,
filter_type=filter_type,
filter_status=filter_status,
sort=sort,
order=order,
page=page,
total=total,
total_pages=total_pages,
per_page=per_page,
now=datetime.now,
timedelta=timedelta)

Datei anzeigen

@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from flask import Blueprint, render_template, request, redirect, session, url_for, flash, jsonify
from flask import Blueprint, render_template, request, redirect, session, url_for, flash, jsonify, send_file
import config
from auth.decorators import login_required
@@ -39,10 +39,11 @@ def resources():
rp.created_at,
rp.status_changed_at,
rp.status_changed_by,
l.customer_name,
c.name as customer_name,
l.license_type
FROM resource_pools rp
LEFT JOIN licenses l ON rp.allocated_to_license = l.id
LEFT JOIN customers c ON l.customer_id = c.id
WHERE 1=1
"""
@@ -58,7 +59,7 @@ def resources():
params.append(status_filter)
if search_query:
query += " AND (rp.resource_value ILIKE %s OR l.customer_name ILIKE %s)"
query += " AND (rp.resource_value ILIKE %s OR c.name ILIKE %s)"
params.extend([f'%{search_query}%', f'%{search_query}%'])
if not show_test:
@@ -103,8 +104,9 @@ def resources():
count = row[3]
if res_type not in stats:
stats[res_type] = {'available': 0, 'allocated': 0, 'quarantined': 0, 'test': 0, 'prod': 0}
stats[res_type] = {'total': 0, 'available': 0, 'allocated': 0, 'quarantined': 0, 'test': 0, 'prod': 0}
stats[res_type]['total'] += count
stats[res_type][status] = stats[res_type].get(status, 0) + count
if is_test:
stats[res_type]['test'] += count
@@ -128,60 +130,7 @@ def resources():
conn.close()
@resource_bp.route('/resources/add', methods=['GET', 'POST'])
@login_required
def add_resource():
"""Neue Ressource hinzufügen"""
if request.method == 'POST':
try:
with get_db_connection() as conn:
cur = conn.cursor()
try:
resource_type = request.form['resource_type']
resource_value = request.form['resource_value'].strip()
is_test = 'is_test' in request.form
# Prüfe ob Ressource bereits existiert
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = %s AND resource_value = %s
""", (resource_type, resource_value))
if cur.fetchone():
flash(f'Ressource {resource_value} existiert bereits!', 'error')
return redirect(url_for('resources.add_resource'))
# Füge neue Ressource hinzu (ohne created_by)
cur.execute("""
INSERT INTO resource_pools (resource_type, resource_value, status, is_test, status_changed_by)
VALUES (%s, %s, 'available', %s, %s)
RETURNING id
""", (resource_type, resource_value, is_test, session.get('username', 'system')))
resource_id = cur.fetchone()[0]
conn.commit()
# Audit-Log
log_audit('CREATE', 'resource', resource_id,
new_values={
'resource_type': resource_type,
'resource_value': resource_value,
'is_test': is_test
})
flash(f'Ressource {resource_value} erfolgreich hinzugefügt!', 'success')
return redirect(url_for('resources.resources'))
finally:
cur.close()
except Exception as e:
import traceback
logging.error(f"Fehler beim Hinzufügen der Ressource: {str(e)}")
logging.error(f"Traceback: {traceback.format_exc()}")
flash(f'Fehler: {str(e)}', 'error')
return redirect(url_for('resources.resources'))
return render_template('add_resources.html')
# Old add_resource function removed - using add_resources instead
@resource_bp.route('/resources/quarantine/<int:resource_id>', methods=['POST'])
@@ -615,4 +564,108 @@ def resource_report():
return redirect(url_for('resources.resources'))
finally:
cur.close()
conn.close()
conn.close()
@resource_bp.route('/resources/add', methods=['GET', 'POST'])
@login_required
def add_resources():
"""Fügt neue Ressourcen zum Pool hinzu"""
if request.method == 'POST':
conn = get_connection()
cur = conn.cursor()
try:
resource_type = request.form.get('resource_type')
resources_text = request.form.get('resources_text', '')
is_test = request.form.get('is_test', 'false') == 'true'
if not resource_type or not resources_text.strip():
flash('Bitte Ressourcentyp und Ressourcen angeben!', 'error')
return redirect(url_for('resources.add_resources'))
# Parse resources (one per line)
resources = [r.strip() for r in resources_text.strip().split('\n') if r.strip()]
# Validate resources based on type
valid_resources = []
invalid_resources = []
for resource in resources:
if resource_type == 'domain':
# Basic domain validation
import re
if re.match(r'^[a-zA-Z0-9][a-zA-Z0-9-]{0,61}[a-zA-Z0-9]?\.[a-zA-Z]{2,}$', resource):
valid_resources.append(resource)
else:
invalid_resources.append(resource)
elif resource_type == 'ipv4':
# IPv4 validation
parts = resource.split('.')
if len(parts) == 4 and all(p.isdigit() and 0 <= int(p) <= 255 for p in parts):
valid_resources.append(resource)
else:
invalid_resources.append(resource)
elif resource_type == 'phone':
# Phone number validation (basic)
import re
if re.match(r'^\+?[0-9]{7,15}$', resource.replace(' ', '').replace('-', '')):
valid_resources.append(resource)
else:
invalid_resources.append(resource)
else:
invalid_resources.append(resource)
# Check for duplicates
existing_resources = []
if valid_resources:
placeholders = ','.join(['%s'] * len(valid_resources))
cur.execute(f"""
SELECT resource_value
FROM resource_pools
WHERE resource_type = %s
AND resource_value IN ({placeholders})
""", [resource_type] + valid_resources)
existing_resources = [row[0] for row in cur.fetchall()]
# Filter out existing resources
new_resources = [r for r in valid_resources if r not in existing_resources]
# Insert new resources
added_count = 0
for resource in new_resources:
cur.execute("""
INSERT INTO resource_pools
(resource_type, resource_value, status, is_test, created_by)
VALUES (%s, %s, 'available', %s, %s)
""", (resource_type, resource, is_test, session['username']))
added_count += 1
conn.commit()
# Log audit
if added_count > 0:
log_audit('BULK_CREATE', 'resource',
additional_info=f"Added {added_count} {resource_type} resources")
# Flash messages
if added_count > 0:
flash(f'{added_count} neue Ressourcen erfolgreich hinzugefügt!', 'success')
if existing_resources:
flash(f'⚠️ {len(existing_resources)} Ressourcen existierten bereits und wurden übersprungen.', 'warning')
if invalid_resources:
flash(f'{len(invalid_resources)} ungültige Ressourcen wurden ignoriert.', 'error')
return redirect(url_for('resources.resources', show_test=request.form.get('show_test', 'false')))
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Hinzufügen von Ressourcen: {str(e)}")
flash('Fehler beim Hinzufügen der Ressourcen!', 'error')
finally:
cur.close()
conn.close()
# GET request - show form
show_test = request.args.get('show_test', 'false') == 'true'
return render_template('add_resources.html', show_test=show_test)