Files
v2-Docker/v2_testing/test_audit_log.py
Claude Project Manager 0d7d888502 Initial commit
2025-07-05 17:51:16 +02:00

238 Zeilen
7.4 KiB
Python

#!/usr/bin/env python3
import requests
import urllib3
import subprocess
import time
import json
# Disable SSL warnings: the requests below are sent with verify=False, which
# would otherwise emit an InsecureRequestWarning on every call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Base URL that all HTTP requests in this script are built from.
base_url = "https://localhost:443"
# Credentials posted to /login by login().
# NOTE(review): credentials are hard-coded in source — presumably test-only;
# confirm, and consider reading them from the environment instead.
admin_user = {"username": "rac00n", "password": "1248163264"}
def login(session):
    """Authenticate *session* against the admin panel.

    Posts the configured admin username and password to ``/login`` without
    following redirects and without verifying the TLS certificate.

    Returns:
        True when the server answers with a 302 redirect, False otherwise.
    """
    credentials = {field: admin_user[field] for field in ("username", "password")}
    reply = session.post(
        f"{base_url}/login",
        data=credentials,
        verify=False,
        allow_redirects=False,
    )
    return reply.status_code == 302
def test_audit_table():
    """Report whether the ``audit_log`` table exists in the database.

    Describes the table through psql inside the ``db`` container. When the
    expected table header is missing from the output, the init SQL script is
    executed inside the container in an attempt to create the table.

    Returns:
        True when the table was found, False when it was missing (even if
        the init script was then run).
    """
    print("1. Checking Audit Log Table:")
    print("-" * 40)

    # Shared command prefix for both psql invocations
    psql = ["docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank"]

    described = subprocess.run(
        psql + ["-c", "\\d audit_log"],
        capture_output=True,
        text=True,
    )

    if 'Table "public.audit_log"' not in described.stdout:
        print("✗ Audit log table not found - creating it")
        # Run the init script to create the table
        subprocess.run(
            psql + ["-f", "/docker-entrypoint-initdb.d/init.sql"],
            capture_output=True,
        )
        return False

    print("✓ Audit log table exists")
    print("\nTable structure:")
    print(described.stdout)
    return True
def test_audit_logging():
    """Perform a series of actions that should each generate an audit log entry.

    Using a single session, the function logs in, creates a license, exports
    the licenses as CSV and logs out. Every step prints a ✓ line when the
    server answers with the expected status and a ✗ line otherwise; a failed
    step does not stop the remaining steps. Finally it sleeps for one second
    to wait for logs to be written.
    """
    session = requests.Session()
    print("\n2. Testing Audit Log Generation:")
    print("-" * 40)

    # Test 1: Login
    print("Testing LOGIN audit...")
    # Report the real outcome instead of unconditionally claiming success.
    if login(session):
        print("✓ Login performed")
    else:
        print("✗ Login failed")

    # Test 2: Create license
    print("\nTesting CREATE audit...")
    license_data = {
        "customer_name": "Audit Test GmbH",
        "email": "audit@test.de",
        "license_key": "AUDIT-TEST-001",
        "license_type": "test",
        "valid_from": "2025-01-01",
        "valid_until": "2025-12-31",
    }
    response = session.post(
        f"{base_url}/create",
        data=license_data,
        verify=False,
        allow_redirects=False,
    )
    if response.status_code == 302:
        print("✓ License created")
    else:
        print(f"✗ License creation failed: Status {response.status_code}")

    # Test 3: Export
    print("\nTesting EXPORT audit...")
    response = session.get(f"{base_url}/export/licenses?format=csv", verify=False)
    if response.status_code == 200:
        print("✓ Export performed")
    else:
        print(f"✗ Export failed: Status {response.status_code}")

    # Test 4: Logout
    print("\nTesting LOGOUT audit...")
    response = session.get(f"{base_url}/logout", verify=False, allow_redirects=False)
    if response.status_code == 302:
        print("✓ Logout performed")
    else:
        print(f"✗ Logout failed: Status {response.status_code}")

    # Wait for logs to be written
    time.sleep(1)
def test_audit_page():
    """Fetch the audit log page and look for the expected content.

    Logs in with a fresh session, requests ``/audit`` and, on a 200 response,
    counts how many of the expected headings and action names occur in the
    returned HTML and whether filter controls appear to be present. Any other
    status is reported as a failure.
    """
    session = requests.Session()
    login(session)
    print("\n3. Testing Audit Log Page:")
    print("-" * 40)

    response = session.get(f"{base_url}/audit", verify=False)
    if response.status_code != 200:
        print(f"✗ Failed to access audit log page: Status {response.status_code}")
        return

    print("✓ Audit log page accessible")
    html = response.text

    # (text searched for in the page, what that text represents)
    expected = [
        ("Audit-Log", "Page title"),
        ("Zeitstempel", "Timestamp column"),
        ("Benutzer", "User column"),
        ("Aktion", "Action column"),
        ("Entität", "Entity column"),
        ("IP-Adresse", "IP address column"),
        ("LOGIN", "Login action"),
        ("LOGOUT", "Logout action"),
        ("CREATE", "Create action"),
        ("EXPORT", "Export action"),
    ]
    hits = sum(1 for needle, _label in expected if needle in html)
    print(f"✓ Found {hits}/{len(expected)} expected elements")

    # Check filters
    has_select = '<select' in html
    if has_select and 'filter' in html.lower():
        print("✓ Filter options available")
def check_audit_logs_db():
    """Print the latest audit log rows and the total row count from the database.

    Both queries are run through psql inside the ``db`` container. The first
    prints the ten most recent entries (by ``timestamp``), showing only
    whether ``old_values`` / ``new_values`` are NULL or set. The second prints
    the total number of rows; if its output cannot be parsed as an integer
    (for example because the command failed), a ✗ line is printed instead of
    raising ``ValueError``.
    """
    print("\n4. Database Audit Log Check:")
    print("-" * 40)

    # Shared command prefix for both psql invocations
    psql = ["docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank"]

    recent_sql = (
        "SELECT action, entity_type, username,\n"
        "CASE WHEN old_values IS NULL THEN 'NULL' ELSE 'JSON' END as old_v,\n"
        "CASE WHEN new_values IS NULL THEN 'NULL' ELSE 'JSON' END as new_v,\n"
        "additional_info\n"
        "FROM audit_log\n"
        "ORDER BY timestamp DESC\n"
        "LIMIT 10;"
    )
    result = subprocess.run(psql + ["-c", recent_sql], capture_output=True, text=True)
    print(result.stdout)

    # Count total logs; -t makes psql print tuples only so the output is just the number
    result = subprocess.run(
        psql + ["-t", "-c", "SELECT COUNT(*) FROM audit_log;"],
        capture_output=True,
        text=True,
    )
    raw_count = result.stdout.strip()
    try:
        count = int(raw_count)
    except ValueError:
        # Empty or non-numeric output means the query did not succeed.
        detail = result.stderr.strip() or repr(raw_count)
        print(f"\n✗ Could not determine audit log count: {detail}")
        return
    print(f"\nTotal audit log entries: {count}")
def test_audit_filters():
    """Request the audit log page with several query strings.

    Logs in with a fresh session and, for each filter/pagination query,
    prints whether the page answered with status 200 ("Working") or not
    ("Failed").
    """
    session = requests.Session()
    login(session)
    print("\n5. Testing Audit Log Filters:")
    print("-" * 40)

    # (query string appended to /audit, label used in the report line)
    cases = (
        ("?action=LOGIN", "Filter by LOGIN action"),
        ("?user=rac00n", "Filter by username"),
        ("?entity=license", "Filter by entity type"),
        ("?page=2", "Pagination test"),
    )
    for query, label in cases:
        reply = session.get(f"{base_url}/audit{query}", verify=False)
        outcome = "Working" if reply.status_code == 200 else "Failed"
        print(f"{label}: {outcome}")
def test_json_values():
    """Check that ``new_values`` of a CREATE audit entry contains valid JSON.

    Fetches one ``new_values`` value (cast to text) of a CREATE entry through
    psql inside the ``db`` container and tries to parse it. Prints whether a
    value was found, whether it parsed, and — when it is a JSON object — its
    top-level keys.
    """
    print("\n6. Testing JSON Value Storage:")
    print("-" * 40)

    # Get one CREATE log entry that has JSON values
    query = (
        "SELECT new_values::text\n"
        "FROM audit_log\n"
        "WHERE action = 'CREATE'\n"
        "AND new_values IS NOT NULL\n"
        "LIMIT 1;"
    )
    result = subprocess.run(
        [
            "docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
            "-c", query,
        ],
        capture_output=True,
        text=True,
    )

    raw = result.stdout.strip()
    if not raw:
        print("✗ No JSON values found")
        return

    try:
        json_data = json.loads(raw)
    except json.JSONDecodeError:
        # Only a parse failure means invalid JSON; other errors should surface.
        print("✗ Invalid JSON format")
        return

    print("✓ JSON values stored correctly")
    if isinstance(json_data, dict):
        print(f"Sample JSON keys: {list(json_data.keys())}")
    else:
        # Valid JSON that is not an object has no keys to list.
        print(f"Sample JSON value is of type {type(json_data).__name__}, not an object")
# Main execution: runs every check in order when the script is executed.
# NOTE(review): this code runs at import time as well, since it is not guarded
# by `if __name__ == "__main__":` — confirm whether that is intended.
print("Testing Audit Log Functionality")
print("=" * 50)

# Ensure table exists
if not test_audit_table():
    print("\nRestarting containers to create audit_log table...")
    subprocess.run(["docker-compose", "down"], capture_output=True)
    subprocess.run(["docker-compose", "up", "-d"], capture_output=True)
    # Give the containers time to come up; time.sleep avoids depending on an
    # external `sleep` binary being available on the host.
    time.sleep(7)
    test_audit_table()

# Generate test data
test_audit_logging()

# Check results
test_audit_page()
check_audit_logs_db()
test_audit_filters()
test_json_values()

print("\n" + "=" * 50)
print("Audit Log Summary:")
print("-" * 40)

# Summary statistics: number of audit entries per action, most frequent first
summary_sql = (
    "SELECT action, COUNT(*) as count\n"
    "FROM audit_log\n"
    "GROUP BY action\n"
    "ORDER BY count DESC;"
)
result = subprocess.run(
    [
        "docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
        "-c", summary_sql,
    ],
    capture_output=True,
    text=True,
)
print(result.stdout)