Dieser Commit ist enthalten in:
2025-06-07 16:33:05 +02:00
Ursprung 2743f3ff9b
Commit b3dc80a38a
15 geänderte Dateien mit 877 neuen und 4 gelöschten Zeilen

125
v2_testing/test_audit_json.py Normale Datei
Datei anzeigen

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
import requests
import urllib3
import subprocess
import json
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_url = "https://localhost:443"
admin_user = {"username": "rac00n", "password": "1248163264"}
def login(session):
"""Login to admin panel"""
login_data = {
"username": admin_user["username"],
"password": admin_user["password"]
}
response = session.post(f"{base_url}/login", data=login_data, verify=False, allow_redirects=False)
return response.status_code == 302
def test_json_logging():
"""Test JSON value logging in audit log"""
session = requests.Session()
login(session)
print("Testing JSON Value Storage in Audit Log")
print("=" * 50)
# 1. Create a license (should log new_values as JSON)
print("\n1. Creating license to test JSON logging...")
license_data = {
"customer_name": "JSON Test GmbH",
"email": "json@test.de",
"license_key": "JSON-TEST-KEY",
"license_type": "full",
"valid_from": "2025-01-01",
"valid_until": "2025-12-31"
}
response = session.post(f"{base_url}/create", data=license_data, verify=False, allow_redirects=False)
print("✓ License created")
# 2. Get the license ID
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
"-c", "SELECT id FROM licenses WHERE license_key = 'JSON-TEST-KEY';"
], capture_output=True, text=True)
license_id = result.stdout.strip()
if license_id:
# 3. Edit the license (should log both old_values and new_values)
print("\n2. Editing license to test old/new JSON values...")
# First get the edit page to ensure we have the right form
response = session.get(f"{base_url}/license/edit/{license_id}", verify=False)
# Now update
updated_data = {
"license_key": "JSON-TEST-UPDATED",
"license_type": "test",
"valid_from": "2025-01-01",
"valid_until": "2025-06-30",
"is_active": "on"
}
response = session.post(f"{base_url}/license/edit/{license_id}",
data=updated_data,
verify=False,
allow_redirects=False)
print("✓ License updated")
# 4. Check the audit log for JSON values
print("\n3. Checking audit log for JSON values...")
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
"-c", """SELECT action, entity_type,
CASE WHEN old_values IS NULL THEN 'NULL'
ELSE jsonb_pretty(old_values) END as old_vals,
CASE WHEN new_values IS NULL THEN 'NULL'
ELSE jsonb_pretty(new_values) END as new_vals
FROM audit_log
WHERE entity_type IN ('license', 'customer')
AND (old_values IS NOT NULL OR new_values IS NOT NULL)
ORDER BY timestamp DESC
LIMIT 5;"""
], capture_output=True, text=True)
print(result.stdout)
# 5. Test specific JSON queries
print("\n4. Testing JSON queries...")
# Query for specific license key in new_values
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
"-c", """SELECT COUNT(*)
FROM audit_log
WHERE new_values->>'license_key' LIKE 'JSON%';"""
], capture_output=True, text=True)
count = int(result.stdout.strip())
if count > 0:
print(f"✓ Found {count} entries with JSON license keys")
# Query for updates (where both old and new values exist)
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
"-c", """SELECT COUNT(*)
FROM audit_log
WHERE old_values IS NOT NULL
AND new_values IS NOT NULL;"""
], capture_output=True, text=True)
update_count = int(result.stdout.strip())
print(f"✓ Found {update_count} UPDATE entries with both old and new values")
# 6. Clean up test data
print("\n5. Cleaning up test data...")
subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
"-c", "DELETE FROM licenses WHERE license_key LIKE 'JSON%';"
], capture_output=True)
print("✓ Test data cleaned up")
# Run the test
test_json_logging()

238
v2_testing/test_audit_log.py Normale Datei
Datei anzeigen

@@ -0,0 +1,238 @@
#!/usr/bin/env python3
import requests
import urllib3
import subprocess
import time
import json
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_url = "https://localhost:443"
admin_user = {"username": "rac00n", "password": "1248163264"}
def login(session):
"""Login to admin panel"""
login_data = {
"username": admin_user["username"],
"password": admin_user["password"]
}
response = session.post(f"{base_url}/login", data=login_data, verify=False, allow_redirects=False)
return response.status_code == 302
def test_audit_table():
"""Test if audit_log table exists"""
print("1. Checking Audit Log Table:")
print("-" * 40)
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
"-c", "\\d audit_log"
], capture_output=True, text=True)
if "Table \"public.audit_log\"" in result.stdout:
print("✓ Audit log table exists")
print("\nTable structure:")
print(result.stdout)
return True
else:
print("✗ Audit log table not found - creating it")
# Create table
subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
"-f", "/docker-entrypoint-initdb.d/init.sql"
], capture_output=True)
return False
def test_audit_logging():
"""Test various actions to generate audit logs"""
session = requests.Session()
print("\n2. Testing Audit Log Generation:")
print("-" * 40)
# Test 1: Login
print("Testing LOGIN audit...")
login(session)
print("✓ Login performed")
# Test 2: Create license
print("\nTesting CREATE audit...")
license_data = {
"customer_name": "Audit Test GmbH",
"email": "audit@test.de",
"license_key": "AUDIT-TEST-001",
"license_type": "test",
"valid_from": "2025-01-01",
"valid_until": "2025-12-31"
}
response = session.post(f"{base_url}/create", data=license_data, verify=False, allow_redirects=False)
if response.status_code == 302:
print("✓ License created")
# Test 3: Export
print("\nTesting EXPORT audit...")
response = session.get(f"{base_url}/export/licenses?format=csv", verify=False)
if response.status_code == 200:
print("✓ Export performed")
# Test 4: Logout
print("\nTesting LOGOUT audit...")
response = session.get(f"{base_url}/logout", verify=False, allow_redirects=False)
if response.status_code == 302:
print("✓ Logout performed")
# Wait for logs to be written
time.sleep(1)
def test_audit_page():
"""Test the audit log page"""
session = requests.Session()
login(session)
print("\n3. Testing Audit Log Page:")
print("-" * 40)
response = session.get(f"{base_url}/audit", verify=False)
if response.status_code == 200:
print("✓ Audit log page accessible")
content = response.text
# Check for expected elements
checks = [
("Audit-Log", "Page title"),
("Zeitstempel", "Timestamp column"),
("Benutzer", "User column"),
("Aktion", "Action column"),
("Entität", "Entity column"),
("IP-Adresse", "IP address column"),
("LOGIN", "Login action"),
("LOGOUT", "Logout action"),
("CREATE", "Create action"),
("EXPORT", "Export action")
]
found = 0
for check_text, description in checks:
if check_text in content:
found += 1
print(f"✓ Found {found}/{len(checks)} expected elements")
# Check filters
if '<select' in content and 'filter' in content.lower():
print("✓ Filter options available")
else:
print(f"✗ Failed to access audit log page: Status {response.status_code}")
def check_audit_logs_db():
"""Check audit logs directly in database"""
print("\n4. Database Audit Log Check:")
print("-" * 40)
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
"-c", """SELECT action, entity_type, username,
CASE WHEN old_values IS NULL THEN 'NULL' ELSE 'JSON' END as old_v,
CASE WHEN new_values IS NULL THEN 'NULL' ELSE 'JSON' END as new_v,
additional_info
FROM audit_log
ORDER BY timestamp DESC
LIMIT 10;"""
], capture_output=True, text=True)
print(result.stdout)
# Count total logs
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
"-c", "SELECT COUNT(*) FROM audit_log;"
], capture_output=True, text=True)
count = int(result.stdout.strip())
print(f"\nTotal audit log entries: {count}")
def test_audit_filters():
"""Test audit log filters"""
session = requests.Session()
login(session)
print("\n5. Testing Audit Log Filters:")
print("-" * 40)
# Test different filters
filters = [
("?action=LOGIN", "Filter by LOGIN action"),
("?user=rac00n", "Filter by username"),
("?entity=license", "Filter by entity type"),
("?page=2", "Pagination test")
]
for filter_param, description in filters:
response = session.get(f"{base_url}/audit{filter_param}", verify=False)
if response.status_code == 200:
print(f"{description}: Working")
else:
print(f"{description}: Failed")
def test_json_values():
"""Test JSON storage of old/new values"""
print("\n6. Testing JSON Value Storage:")
print("-" * 40)
# Get a recent update log with JSON values
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
"-c", """SELECT new_values::text
FROM audit_log
WHERE action = 'CREATE'
AND new_values IS NOT NULL
LIMIT 1;"""
], capture_output=True, text=True)
if result.stdout.strip():
try:
json_data = json.loads(result.stdout.strip())
print("✓ JSON values stored correctly")
print(f"Sample JSON keys: {list(json_data.keys())}")
except:
print("✗ Invalid JSON format")
else:
print("✗ No JSON values found")
# Main execution
print("Testing Audit Log Functionality")
print("=" * 50)
# Ensure table exists
if not test_audit_table():
print("\nRestarting containers to create audit_log table...")
subprocess.run(["docker-compose", "down"], capture_output=True)
subprocess.run(["docker-compose", "up", "-d"], capture_output=True)
subprocess.run(["sleep", "7"], capture_output=True)
test_audit_table()
# Generate test data
test_audit_logging()
# Check results
test_audit_page()
check_audit_logs_db()
test_audit_filters()
test_json_values()
print("\n" + "=" * 50)
print("Audit Log Summary:")
print("-" * 40)
# Summary statistics
result = subprocess.run([
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
"-c", """SELECT action, COUNT(*) as count
FROM audit_log
GROUP BY action
ORDER BY count DESC;"""
], capture_output=True, text=True)
print(result.stdout)