238 Zeilen
7.4 KiB
Python
238 Zeilen
7.4 KiB
Python
#!/usr/bin/env python3
|
|
import requests
|
|
import urllib3
|
|
import subprocess
|
|
import time
|
|
import json
|
|
|
|
# Disable SSL warnings
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
base_url = "https://localhost:443"
|
|
admin_user = {"username": "rac00n", "password": "1248163264"}
|
|
|
|
def login(session):
|
|
"""Login to admin panel"""
|
|
login_data = {
|
|
"username": admin_user["username"],
|
|
"password": admin_user["password"]
|
|
}
|
|
response = session.post(f"{base_url}/login", data=login_data, verify=False, allow_redirects=False)
|
|
return response.status_code == 302
|
|
|
|
def test_audit_table():
|
|
"""Test if audit_log table exists"""
|
|
print("1. Checking Audit Log Table:")
|
|
print("-" * 40)
|
|
|
|
result = subprocess.run([
|
|
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
|
|
"-c", "\\d audit_log"
|
|
], capture_output=True, text=True)
|
|
|
|
if "Table \"public.audit_log\"" in result.stdout:
|
|
print("✓ Audit log table exists")
|
|
print("\nTable structure:")
|
|
print(result.stdout)
|
|
return True
|
|
else:
|
|
print("✗ Audit log table not found - creating it")
|
|
# Create table
|
|
subprocess.run([
|
|
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
|
|
"-f", "/docker-entrypoint-initdb.d/init.sql"
|
|
], capture_output=True)
|
|
return False
|
|
|
|
def test_audit_logging():
|
|
"""Test various actions to generate audit logs"""
|
|
session = requests.Session()
|
|
|
|
print("\n2. Testing Audit Log Generation:")
|
|
print("-" * 40)
|
|
|
|
# Test 1: Login
|
|
print("Testing LOGIN audit...")
|
|
login(session)
|
|
print("✓ Login performed")
|
|
|
|
# Test 2: Create license
|
|
print("\nTesting CREATE audit...")
|
|
license_data = {
|
|
"customer_name": "Audit Test GmbH",
|
|
"email": "audit@test.de",
|
|
"license_key": "AUDIT-TEST-001",
|
|
"license_type": "test",
|
|
"valid_from": "2025-01-01",
|
|
"valid_until": "2025-12-31"
|
|
}
|
|
response = session.post(f"{base_url}/create", data=license_data, verify=False, allow_redirects=False)
|
|
if response.status_code == 302:
|
|
print("✓ License created")
|
|
|
|
# Test 3: Export
|
|
print("\nTesting EXPORT audit...")
|
|
response = session.get(f"{base_url}/export/licenses?format=csv", verify=False)
|
|
if response.status_code == 200:
|
|
print("✓ Export performed")
|
|
|
|
# Test 4: Logout
|
|
print("\nTesting LOGOUT audit...")
|
|
response = session.get(f"{base_url}/logout", verify=False, allow_redirects=False)
|
|
if response.status_code == 302:
|
|
print("✓ Logout performed")
|
|
|
|
# Wait for logs to be written
|
|
time.sleep(1)
|
|
|
|
def test_audit_page():
|
|
"""Test the audit log page"""
|
|
session = requests.Session()
|
|
login(session)
|
|
|
|
print("\n3. Testing Audit Log Page:")
|
|
print("-" * 40)
|
|
|
|
response = session.get(f"{base_url}/audit", verify=False)
|
|
|
|
if response.status_code == 200:
|
|
print("✓ Audit log page accessible")
|
|
|
|
content = response.text
|
|
|
|
# Check for expected elements
|
|
checks = [
|
|
("Audit-Log", "Page title"),
|
|
("Zeitstempel", "Timestamp column"),
|
|
("Benutzer", "User column"),
|
|
("Aktion", "Action column"),
|
|
("Entität", "Entity column"),
|
|
("IP-Adresse", "IP address column"),
|
|
("LOGIN", "Login action"),
|
|
("LOGOUT", "Logout action"),
|
|
("CREATE", "Create action"),
|
|
("EXPORT", "Export action")
|
|
]
|
|
|
|
found = 0
|
|
for check_text, description in checks:
|
|
if check_text in content:
|
|
found += 1
|
|
|
|
print(f"✓ Found {found}/{len(checks)} expected elements")
|
|
|
|
# Check filters
|
|
if '<select' in content and 'filter' in content.lower():
|
|
print("✓ Filter options available")
|
|
else:
|
|
print(f"✗ Failed to access audit log page: Status {response.status_code}")
|
|
|
|
def check_audit_logs_db():
|
|
"""Check audit logs directly in database"""
|
|
print("\n4. Database Audit Log Check:")
|
|
print("-" * 40)
|
|
|
|
result = subprocess.run([
|
|
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
|
|
"-c", """SELECT action, entity_type, username,
|
|
CASE WHEN old_values IS NULL THEN 'NULL' ELSE 'JSON' END as old_v,
|
|
CASE WHEN new_values IS NULL THEN 'NULL' ELSE 'JSON' END as new_v,
|
|
additional_info
|
|
FROM audit_log
|
|
ORDER BY timestamp DESC
|
|
LIMIT 10;"""
|
|
], capture_output=True, text=True)
|
|
|
|
print(result.stdout)
|
|
|
|
# Count total logs
|
|
result = subprocess.run([
|
|
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
|
|
"-c", "SELECT COUNT(*) FROM audit_log;"
|
|
], capture_output=True, text=True)
|
|
|
|
count = int(result.stdout.strip())
|
|
print(f"\nTotal audit log entries: {count}")
|
|
|
|
def test_audit_filters():
|
|
"""Test audit log filters"""
|
|
session = requests.Session()
|
|
login(session)
|
|
|
|
print("\n5. Testing Audit Log Filters:")
|
|
print("-" * 40)
|
|
|
|
# Test different filters
|
|
filters = [
|
|
("?action=LOGIN", "Filter by LOGIN action"),
|
|
("?user=rac00n", "Filter by username"),
|
|
("?entity=license", "Filter by entity type"),
|
|
("?page=2", "Pagination test")
|
|
]
|
|
|
|
for filter_param, description in filters:
|
|
response = session.get(f"{base_url}/audit{filter_param}", verify=False)
|
|
if response.status_code == 200:
|
|
print(f"✓ {description}: Working")
|
|
else:
|
|
print(f"✗ {description}: Failed")
|
|
|
|
def test_json_values():
|
|
"""Test JSON storage of old/new values"""
|
|
print("\n6. Testing JSON Value Storage:")
|
|
print("-" * 40)
|
|
|
|
# Get a recent update log with JSON values
|
|
result = subprocess.run([
|
|
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank", "-t",
|
|
"-c", """SELECT new_values::text
|
|
FROM audit_log
|
|
WHERE action = 'CREATE'
|
|
AND new_values IS NOT NULL
|
|
LIMIT 1;"""
|
|
], capture_output=True, text=True)
|
|
|
|
if result.stdout.strip():
|
|
try:
|
|
json_data = json.loads(result.stdout.strip())
|
|
print("✓ JSON values stored correctly")
|
|
print(f"Sample JSON keys: {list(json_data.keys())}")
|
|
except:
|
|
print("✗ Invalid JSON format")
|
|
else:
|
|
print("✗ No JSON values found")
|
|
|
|
# Main execution
|
|
print("Testing Audit Log Functionality")
|
|
print("=" * 50)
|
|
|
|
# Ensure table exists
|
|
if not test_audit_table():
|
|
print("\nRestarting containers to create audit_log table...")
|
|
subprocess.run(["docker-compose", "down"], capture_output=True)
|
|
subprocess.run(["docker-compose", "up", "-d"], capture_output=True)
|
|
subprocess.run(["sleep", "7"], capture_output=True)
|
|
test_audit_table()
|
|
|
|
# Generate test data
|
|
test_audit_logging()
|
|
|
|
# Check results
|
|
test_audit_page()
|
|
check_audit_logs_db()
|
|
test_audit_filters()
|
|
test_json_values()
|
|
|
|
print("\n" + "=" * 50)
|
|
print("Audit Log Summary:")
|
|
print("-" * 40)
|
|
|
|
# Summary statistics
|
|
result = subprocess.run([
|
|
"docker", "exec", "db", "psql", "-U", "adminuser", "-d", "meinedatenbank",
|
|
"-c", """SELECT action, COUNT(*) as count
|
|
FROM audit_log
|
|
GROUP BY action
|
|
ORDER BY count DESC;"""
|
|
], capture_output=True, text=True)
|
|
print(result.stdout) |