Dieser Commit ist enthalten in:
2025-06-21 20:49:08 +02:00
Ursprung 08e4e939ab
Commit 0a994fa648
15 geänderte Dateien mit 2436 neuen und 34 gelöschten Zeilen

Datei anzeigen

@@ -0,0 +1,456 @@
# Error Handling Guide
## Overview
This guide describes the error handling system implemented in the v2_adminpanel application. The system provides:
- Centralized error handling with custom exception hierarchy
- Input validation framework
- Structured logging
- Monitoring and alerting
- Consistent error responses
## Architecture
### 1. Custom Exception Hierarchy
```
BaseApplicationException
├── ValidationException
│ ├── InputValidationError
│ ├── BusinessRuleViolation
│ └── DataIntegrityError
├── AuthenticationException
│ ├── InvalidCredentialsError
│ ├── SessionExpiredError
│ └── InsufficientPermissionsError
├── DatabaseException
│ ├── ConnectionError
│ ├── QueryError
│ └── TransactionError
├── ExternalServiceException
│ ├── APIError
│ └── TimeoutError
└── ResourceException
├── ResourceNotFoundError
├── ResourceConflictError
└── ResourceLimitExceeded
```
### 2. Core Components
- **core/exceptions.py**: Custom exception classes
- **core/error_handlers.py**: Global error handlers and decorators
- **core/validators.py**: Input validation framework
- **core/logging_config.py**: Structured logging setup
- **core/monitoring.py**: Error metrics and alerting
- **middleware/error_middleware.py**: Request-level error handling
## Usage Examples
### 1. Raising Custom Exceptions
```python
from core.exceptions import (
InputValidationError,
ResourceNotFoundError,
BusinessRuleViolation
)
# Validation error
if not email_is_valid:
raise InputValidationError(
field='email',
message='Invalid email format',
value=email_value,
expected_type='email'
)
# Resource not found
user = db.get_user(user_id)
if not user:
raise ResourceNotFoundError(
resource_type='User',
resource_id=user_id
)
# Business rule violation
if active_licenses >= license_limit:
raise BusinessRuleViolation(
rule='license_limit',
message='License limit exceeded',
context={
'current': active_licenses,
'limit': license_limit
}
)
```
### 2. Using Error Decorators
```python
from core.error_handlers import handle_errors, validate_request
from core.validators import validate
@handle_errors(
catch=(psycopg2.Error,),
message='Database operation failed',
user_message='Datenbankfehler aufgetreten',
redirect_to='admin.dashboard'
)
def update_customer(customer_id):
# Database operations
pass
@validate_request(
required_fields={
'email': str,
'age': int,
'active': bool
}
)
def create_user():
# Request data is validated
pass
@validate({
'email': {
'type': 'email',
'required': True
},
'password': {
'type': 'password',
'required': True
},
'age': {
'type': 'integer',
'required': True,
'min_value': 18,
'max_value': 120
}
})
def register_user():
# Access validated data
data = request.validated_data
# Use data safely
```
### 3. Input Validation
```python
from core.validators import Validators
# Email validation
email = Validators.email(user_input, field_name='email')
# Phone validation
phone = Validators.phone(user_input, field_name='phone')
# License key validation
license_key = Validators.license_key(user_input)
# Integer with constraints
age = Validators.integer(
user_input,
field_name='age',
min_value=0,
max_value=150
)
# String with constraints
username = Validators.string(
user_input,
field_name='username',
min_length=3,
max_length=50,
safe_only=True
)
# Password validation
password = Validators.password(user_input)
# Custom enum validation
status = Validators.enum(
user_input,
field_name='status',
allowed_values=['active', 'inactive', 'pending']
)
```
### 4. Error Context Manager
```python
from core.error_handlers import ErrorContext
with ErrorContext(
operation='create_license',
resource_type='License',
resource_id=license_key
):
# Operations that might fail
db.insert_license(license_data)
# Errors are automatically logged with context
```
### 5. Logging
```python
from core.logging_config import get_logger, log_error, log_security_event
logger = get_logger(__name__)
# Standard logging
logger.info('User created', extra={
'user_id': user.id,
'email': user.email
})
# Error logging
try:
risky_operation()
except Exception as e:
log_error(
logger,
'Risky operation failed',
error=e,
user_id=user.id,
operation='risky_operation'
)
# Security event logging
log_security_event(
'INVALID_LOGIN_ATTEMPT',
'Multiple failed login attempts',
username=username,
ip_address=request.remote_addr,
attempt_count=5
)
```
## Error Response Format
### JSON Responses (API)
```json
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid input provided",
"timestamp": "2024-01-15T10:30:00Z",
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"details": {
"field": "email",
"value": "invalid-email",
"expected_type": "email"
}
}
}
```
### HTML Responses
- User-friendly error messages
- Error code and request ID for support
- Helpful suggestions for resolution
- Navigation options (back, dashboard, retry)
## Monitoring and Alerts
### Metrics Exposed
- `app_errors_total`: Total error count by code, status, endpoint
- `app_error_rate`: Errors per minute by error code
- `app_validation_errors_total`: Validation errors by field
- `app_auth_failures_total`: Authentication failures
- `app_database_errors_total`: Database errors
- `app_request_duration_seconds`: Request duration histogram
### Alert Thresholds
- Error rate > 10/min: Critical alert
- Auth failure rate > 5/min: Security alert
- DB error rate > 3/min: Infrastructure alert
- Response time 95th percentile > 2s: Performance alert
### Accessing Metrics
- Prometheus metrics: `/metrics`
- Active alerts: `/api/alerts`
## Best Practices
### 1. Always Use Specific Exceptions
```python
# Bad
raise Exception("User not found")
# Good
raise ResourceNotFoundError('User', user_id)
```
### 2. Provide Context
```python
# Bad
raise ValidationException("Invalid data")
# Good
raise InputValidationError(
field='email',
message='Email domain not allowed',
value=email,
expected_type='corporate_email'
)
```
### 3. Handle Database Errors
```python
# Bad
result = db.execute(query)
# Good
try:
result = db.execute(query)
except psycopg2.IntegrityError as e:
if e.pgcode == '23505':
raise DataIntegrityError(
entity='User',
constraint='unique_email',
message='Email already exists'
)
raise
```
### 4. Validate Early
```python
# Bad
def process_order(data):
# Process without validation
total = data['quantity'] * data['price']
# Good
@validate({
'quantity': {'type': 'integer', 'min_value': 1},
'price': {'type': 'float', 'min_value': 0}
})
def process_order():
data = request.validated_data
total = data['quantity'] * data['price']
```
### 5. Log Security Events
```python
# Failed login attempts
log_security_event(
'LOGIN_FAILURE',
f'Failed login for user {username}',
username=username,
ip_address=request.remote_addr
)
# Suspicious activity
log_security_event(
'SUSPICIOUS_ACTIVITY',
'Rapid API requests detected',
ip_address=request.remote_addr,
request_count=count,
time_window=60
)
```
## Migration Guide
### Converting Existing Error Handling
1. **Replace generic exceptions**:
```python
# Old
except Exception as e:
flash(f"Error: {str(e)}", "error")
return redirect(url_for('admin.dashboard'))
# New
except DatabaseException as e:
# Already handled by global handler
raise
```
2. **Update validation**:
```python
# Old
email = request.form.get('email')
if not email or '@' not in email:
flash("Invalid email", "error")
return redirect(request.url)
# New
from core.validators import Validators
try:
email = Validators.email(request.form.get('email'))
except InputValidationError as e:
# Handled automatically
raise
```
3. **Use decorators**:
```python
# Old
@app.route('/api/user', methods=['POST'])
def create_user():
try:
# validation code
# database operations
except Exception as e:
return jsonify({'error': str(e)}), 500
# New
@app.route('/api/user', methods=['POST'])
@validate({
'email': {'type': 'email', 'required': True},
'name': {'type': 'string', 'required': True, 'min_length': 2}
})
def create_user():
data = request.validated_data
# Use validated data directly
```
## Testing
Run the test suite:
```bash
pytest v2_adminpanel/tests/test_error_handling.py -v
```
Test coverage includes:
- Exception creation and properties
- Error handler responses (JSON/HTML)
- Validation functions
- Decorators
- Monitoring and metrics
- Alert generation
## Troubleshooting
### Common Issues
1. **Import errors**: Ensure you import from `core` package
2. **Validation not working**: Check decorator order (validate must be closest to function)
3. **Logs not appearing**: Verify LOG_LEVEL environment variable
4. **Metrics missing**: Ensure prometheus_client is installed
### Debug Mode
In development, set:
```python
app.config['DEBUG'] = True
```
This will:
- Include detailed error information
- Show stack traces
- Log to console with readable format

Datei anzeigen

@@ -16,6 +16,12 @@ from prometheus_flask_exporter import PrometheusMetrics
import config
from utils.backup import create_backup
# Import error handling system
from core.error_handlers import init_error_handlers
from core.logging_config import setup_logging
from core.monitoring import init_monitoring
from middleware.error_middleware import ErrorHandlingMiddleware
app = Flask(__name__)
# Initialize Prometheus metrics
@@ -41,6 +47,12 @@ app.wsgi_app = ProxyFix(
# Configuration is now loaded from config module
# Initialize error handling system
setup_logging(app)
init_error_handlers(app)
init_monitoring(app)
ErrorHandlingMiddleware(app)
# Scheduler für automatische Backups
scheduler = BackgroundScheduler()
scheduler.start()
@@ -129,24 +141,7 @@ scheduler.add_job(
)
# Error handlers
@app.errorhandler(404)
def not_found(e):
try:
return render_template('404.html'), 404
except:
return "404 - Page not found", 404
@app.errorhandler(500)
def server_error(e):
import traceback
error_msg = f"Server error: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
try:
return render_template('500.html'), 500
except:
return f"500 - Internal Server Error\n\n{error_msg}", 500
# Error handlers are now managed by the error handling system in core/error_handlers.py
# Context processors

Datei anzeigen

Datei anzeigen

@@ -0,0 +1,273 @@
import logging
import traceback
from functools import wraps
from typing import Optional, Dict, Any, Callable, Union
from flask import (
Flask, request, jsonify, render_template, flash, redirect,
url_for, current_app, g
)
from werkzeug.exceptions import HTTPException
import psycopg2
from .exceptions import (
BaseApplicationException, DatabaseException, ValidationException,
AuthenticationException, ResourceException, QueryError,
ConnectionError, TransactionError
)
logger = logging.getLogger(__name__)
def init_error_handlers(app: Flask) -> None:
@app.before_request
def before_request():
g.request_id = request.headers.get('X-Request-ID',
BaseApplicationException('', '', 0).request_id)
@app.errorhandler(BaseApplicationException)
def handle_application_error(error: BaseApplicationException):
return _handle_error(error)
@app.errorhandler(HTTPException)
def handle_http_error(error: HTTPException):
return _handle_error(error)
@app.errorhandler(psycopg2.Error)
def handle_database_error(error: psycopg2.Error):
db_exception = _convert_psycopg2_error(error)
return _handle_error(db_exception)
@app.errorhandler(Exception)
def handle_unexpected_error(error: Exception):
logger.error(
f"Unexpected error: {str(error)}",
exc_info=True,
extra={'request_id': getattr(g, 'request_id', 'unknown')}
)
if current_app.debug:
raise
generic_error = BaseApplicationException(
message="An unexpected error occurred",
code="INTERNAL_ERROR",
status_code=500,
user_message="Ein unerwarteter Fehler ist aufgetreten"
)
return _handle_error(generic_error)
def _handle_error(error: Union[BaseApplicationException, HTTPException, Exception]) -> tuple:
if isinstance(error, HTTPException):
status_code = error.code
error_dict = {
'error': {
'code': error.name.upper().replace(' ', '_'),
'message': error.description or str(error),
'request_id': getattr(g, 'request_id', 'unknown')
}
}
user_message = error.description or str(error)
elif isinstance(error, BaseApplicationException):
status_code = error.status_code
error_dict = error.to_dict(include_details=current_app.debug)
error_dict['error']['request_id'] = getattr(g, 'request_id', error.request_id)
user_message = error.user_message
logger.error(
f"{error.__class__.__name__}: {error.message}",
extra={
'error_code': error.code,
'details': error.details,
'request_id': error_dict['error']['request_id']
}
)
else:
status_code = 500
error_dict = {
'error': {
'code': 'INTERNAL_ERROR',
'message': 'An internal error occurred',
'request_id': getattr(g, 'request_id', 'unknown')
}
}
user_message = "Ein interner Fehler ist aufgetreten"
if _is_json_request():
return jsonify(error_dict), status_code
else:
if status_code == 404:
return render_template('404.html'), 404
elif status_code >= 500:
return render_template('500.html', error=user_message), status_code
else:
flash(user_message, 'error')
return render_template('error.html',
error=user_message,
error_code=error_dict['error']['code'],
request_id=error_dict['error']['request_id']), status_code
def _convert_psycopg2_error(error: psycopg2.Error) -> DatabaseException:
error_code = getattr(error, 'pgcode', None)
error_message = str(error).split('\n')[0]
if isinstance(error, psycopg2.OperationalError):
return ConnectionError(
message=f"Database connection failed: {error_message}",
host=None
)
elif isinstance(error, psycopg2.IntegrityError):
if error_code == '23505':
return ValidationException(
message="Duplicate entry violation",
details={'constraint': error_message},
user_message="Dieser Eintrag existiert bereits"
)
elif error_code == '23503':
return ValidationException(
message="Foreign key violation",
details={'constraint': error_message},
user_message="Referenzierte Daten existieren nicht"
)
else:
return ValidationException(
message="Data integrity violation",
details={'error_code': error_code},
user_message="Datenintegritätsfehler"
)
elif isinstance(error, psycopg2.DataError):
return ValidationException(
message="Invalid data format",
details={'error': error_message},
user_message="Ungültiges Datenformat"
)
else:
return QueryError(
message=error_message,
query="[query hidden for security]",
error_code=error_code
)
def _is_json_request() -> bool:
return (request.is_json or
request.path.startswith('/api/') or
request.accept_mimetypes.best == 'application/json')
def handle_errors(
catch: tuple = (Exception,),
message: str = "Operation failed",
user_message: Optional[str] = None,
redirect_to: Optional[str] = None
) -> Callable:
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except catch as e:
if isinstance(e, BaseApplicationException):
raise
logger.error(
f"Error in {func.__name__}: {str(e)}",
exc_info=True,
extra={'request_id': getattr(g, 'request_id', 'unknown')}
)
if _is_json_request():
return jsonify({
'error': {
'code': 'OPERATION_FAILED',
'message': user_message or message,
'request_id': getattr(g, 'request_id', 'unknown')
}
}), 500
else:
flash(user_message or message, 'error')
if redirect_to:
return redirect(url_for(redirect_to))
return redirect(request.referrer or url_for('admin.dashboard'))
return wrapper
return decorator
def validate_request(
required_fields: Optional[Dict[str, type]] = None,
optional_fields: Optional[Dict[str, type]] = None
) -> Callable:
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
data = request.get_json() if request.is_json else request.form
if required_fields:
for field, expected_type in required_fields.items():
if field not in data:
raise ValidationException(
message=f"Missing required field: {field}",
field=field,
user_message=f"Pflichtfeld fehlt: {field}"
)
try:
if expected_type != str:
if expected_type == int:
int(data[field])
elif expected_type == float:
float(data[field])
elif expected_type == bool:
if isinstance(data[field], str):
if data[field].lower() not in ['true', 'false', '1', '0']:
raise ValueError
except (ValueError, TypeError):
raise ValidationException(
message=f"Invalid type for field {field}",
field=field,
value=data[field],
details={'expected_type': expected_type.__name__},
user_message=f"Ungültiger Typ für Feld {field}"
)
return func(*args, **kwargs)
return wrapper
return decorator
class ErrorContext:
def __init__(
self,
operation: str,
resource_type: Optional[str] = None,
resource_id: Optional[Any] = None
):
self.operation = operation
self.resource_type = resource_type
self.resource_id = resource_id
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is None:
return False
if isinstance(exc_val, BaseApplicationException):
return False
logger.error(
f"Error during {self.operation}",
exc_info=True,
extra={
'operation': self.operation,
'resource_type': self.resource_type,
'resource_id': self.resource_id,
'request_id': getattr(g, 'request_id', 'unknown')
}
)
return False

Datei anzeigen

@@ -0,0 +1,356 @@
import uuid
from typing import Optional, Dict, Any
from datetime import datetime
class BaseApplicationException(Exception):
def __init__(
self,
message: str,
code: str,
status_code: int = 500,
details: Optional[Dict[str, Any]] = None,
user_message: Optional[str] = None
):
super().__init__(message)
self.message = message
self.code = code
self.status_code = status_code
self.details = details or {}
self.user_message = user_message or message
self.timestamp = datetime.utcnow()
self.request_id = str(uuid.uuid4())
def to_dict(self, include_details: bool = False) -> Dict[str, Any]:
result = {
'error': {
'code': self.code,
'message': self.user_message,
'timestamp': self.timestamp.isoformat(),
'request_id': self.request_id
}
}
if include_details and self.details:
result['error']['details'] = self.details
return result
class ValidationException(BaseApplicationException):
def __init__(
self,
message: str,
field: Optional[str] = None,
value: Any = None,
details: Optional[Dict[str, Any]] = None,
user_message: Optional[str] = None
):
details = details or {}
if field:
details['field'] = field
if value is not None:
details['value'] = str(value)
super().__init__(
message=message,
code='VALIDATION_ERROR',
status_code=400,
details=details,
user_message=user_message or "Ungültige Eingabe"
)
class InputValidationError(ValidationException):
def __init__(
self,
field: str,
message: str,
value: Any = None,
expected_type: Optional[str] = None
):
details = {'expected_type': expected_type} if expected_type else None
super().__init__(
message=f"Invalid input for field '{field}': {message}",
field=field,
value=value,
details=details,
user_message=f"Ungültiger Wert für Feld '{field}'"
)
class BusinessRuleViolation(ValidationException):
def __init__(
self,
rule: str,
message: str,
context: Optional[Dict[str, Any]] = None
):
super().__init__(
message=message,
details={'rule': rule, 'context': context or {}},
user_message="Geschäftsregel verletzt"
)
class DataIntegrityError(ValidationException):
def __init__(
self,
entity: str,
constraint: str,
message: str,
details: Optional[Dict[str, Any]] = None
):
details = details or {}
details.update({'entity': entity, 'constraint': constraint})
super().__init__(
message=message,
details=details,
user_message="Datenintegritätsfehler"
)
class AuthenticationException(BaseApplicationException):
def __init__(
self,
message: str,
details: Optional[Dict[str, Any]] = None,
user_message: Optional[str] = None
):
super().__init__(
message=message,
code='AUTHENTICATION_ERROR',
status_code=401,
details=details,
user_message=user_message or "Authentifizierung fehlgeschlagen"
)
class InvalidCredentialsError(AuthenticationException):
def __init__(self, username: Optional[str] = None):
details = {'username': username} if username else None
super().__init__(
message="Invalid username or password",
details=details,
user_message="Ungültiger Benutzername oder Passwort"
)
class SessionExpiredError(AuthenticationException):
def __init__(self, session_id: Optional[str] = None):
details = {'session_id': session_id} if session_id else None
super().__init__(
message="Session has expired",
details=details,
user_message="Ihre Sitzung ist abgelaufen"
)
class InsufficientPermissionsError(AuthenticationException):
def __init__(
self,
required_permission: str,
user_permissions: Optional[list] = None
):
super().__init__(
message=f"User lacks required permission: {required_permission}",
details={
'required': required_permission,
'user_permissions': user_permissions or []
},
user_message="Unzureichende Berechtigungen für diese Aktion"
)
self.status_code = 403
class DatabaseException(BaseApplicationException):
def __init__(
self,
message: str,
query: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
user_message: Optional[str] = None
):
details = details or {}
if query:
details['query_hash'] = str(hash(query))
super().__init__(
message=message,
code='DATABASE_ERROR',
status_code=500,
details=details,
user_message=user_message or "Datenbankfehler aufgetreten"
)
class ConnectionError(DatabaseException):
def __init__(self, message: str, host: Optional[str] = None):
details = {'host': host} if host else None
super().__init__(
message=message,
details=details,
user_message="Datenbankverbindung fehlgeschlagen"
)
class QueryError(DatabaseException):
def __init__(
self,
message: str,
query: str,
error_code: Optional[str] = None
):
super().__init__(
message=message,
query=query,
details={'error_code': error_code} if error_code else None,
user_message="Datenbankabfrage fehlgeschlagen"
)
class TransactionError(DatabaseException):
def __init__(self, message: str, operation: str):
super().__init__(
message=message,
details={'operation': operation},
user_message="Transaktion fehlgeschlagen"
)
class ExternalServiceException(BaseApplicationException):
def __init__(
self,
service_name: str,
message: str,
details: Optional[Dict[str, Any]] = None,
user_message: Optional[str] = None
):
details = details or {}
details['service'] = service_name
super().__init__(
message=message,
code='EXTERNAL_SERVICE_ERROR',
status_code=502,
details=details,
user_message=user_message or f"Fehler beim Zugriff auf {service_name}"
)
class APIError(ExternalServiceException):
def __init__(
self,
service_name: str,
endpoint: str,
status_code: int,
message: str
):
super().__init__(
service_name=service_name,
message=message,
details={
'endpoint': endpoint,
'response_status': status_code
},
user_message=f"API-Fehler bei {service_name}"
)
class TimeoutError(ExternalServiceException):
def __init__(
self,
service_name: str,
timeout_seconds: int,
operation: str
):
super().__init__(
service_name=service_name,
message=f"Timeout after {timeout_seconds}s while {operation}",
details={
'timeout_seconds': timeout_seconds,
'operation': operation
},
user_message=f"Zeitüberschreitung bei {service_name}"
)
class ResourceException(BaseApplicationException):
def __init__(
self,
message: str,
resource_type: str,
resource_id: Any = None,
details: Optional[Dict[str, Any]] = None,
user_message: Optional[str] = None
):
details = details or {}
details.update({
'resource_type': resource_type,
'resource_id': str(resource_id) if resource_id else None
})
super().__init__(
message=message,
code='RESOURCE_ERROR',
status_code=404,
details=details,
user_message=user_message or "Ressourcenfehler"
)
class ResourceNotFoundError(ResourceException):
def __init__(
self,
resource_type: str,
resource_id: Any = None,
search_criteria: Optional[Dict[str, Any]] = None
):
details = {'search_criteria': search_criteria} if search_criteria else None
super().__init__(
message=f"{resource_type} not found",
resource_type=resource_type,
resource_id=resource_id,
details=details,
user_message=f"{resource_type} nicht gefunden"
)
self.status_code = 404
class ResourceConflictError(ResourceException):
def __init__(
self,
resource_type: str,
resource_id: Any,
conflict_reason: str
):
super().__init__(
message=f"Conflict with {resource_type}: {conflict_reason}",
resource_type=resource_type,
resource_id=resource_id,
details={'conflict_reason': conflict_reason},
user_message=f"Konflikt mit {resource_type}"
)
self.status_code = 409
class ResourceLimitExceeded(ResourceException):
def __init__(
self,
resource_type: str,
limit: int,
current: int,
requested: Optional[int] = None
):
details = {
'limit': limit,
'current': current,
'requested': requested
}
super().__init__(
message=f"{resource_type} limit exceeded: {current}/{limit}",
resource_type=resource_type,
details=details,
user_message=f"Limit für {resource_type} überschritten"
)
self.status_code = 429

Datei anzeigen

@@ -0,0 +1,190 @@
import logging
import logging.handlers
import json
import sys
import os
from datetime import datetime
from typing import Dict, Any
from flask import g, request, has_request_context
class StructuredFormatter(logging.Formatter):
def format(self, record):
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
if has_request_context():
log_data['request'] = {
'method': request.method,
'path': request.path,
'remote_addr': request.remote_addr,
'user_agent': request.user_agent.string,
'request_id': getattr(g, 'request_id', 'unknown')
}
if hasattr(record, 'request_id'):
log_data['request_id'] = record.request_id
if hasattr(record, 'error_code'):
log_data['error_code'] = record.error_code
if hasattr(record, 'details') and record.details:
log_data['details'] = self._sanitize_details(record.details)
if record.exc_info:
log_data['exception'] = {
'type': record.exc_info[0].__name__,
'message': str(record.exc_info[1]),
'traceback': self.formatException(record.exc_info)
}
return json.dumps(log_data, ensure_ascii=False)
def _sanitize_details(self, details: Dict[str, Any]) -> Dict[str, Any]:
sensitive_fields = {
'password', 'secret', 'token', 'api_key', 'authorization',
'credit_card', 'ssn', 'pin'
}
sanitized = {}
for key, value in details.items():
if any(field in key.lower() for field in sensitive_fields):
sanitized[key] = '[REDACTED]'
elif isinstance(value, dict):
sanitized[key] = self._sanitize_details(value)
else:
sanitized[key] = value
return sanitized
class ErrorLevelFilter(logging.Filter):
def __init__(self, min_level=logging.ERROR):
self.min_level = min_level
def filter(self, record):
return record.levelno >= self.min_level
def setup_logging(app):
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
log_dir = os.getenv('LOG_DIR', 'logs')
os.makedirs(log_dir, exist_ok=True)
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level))
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
if app.debug:
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
else:
console_formatter = StructuredFormatter()
console_handler.setFormatter(console_formatter)
root_logger.addHandler(console_handler)
app_log_handler = logging.handlers.RotatingFileHandler(
os.path.join(log_dir, 'app.log'),
maxBytes=10 * 1024 * 1024,
backupCount=10
)
app_log_handler.setLevel(logging.DEBUG)
app_log_handler.setFormatter(StructuredFormatter())
root_logger.addHandler(app_log_handler)
error_log_handler = logging.handlers.RotatingFileHandler(
os.path.join(log_dir, 'errors.log'),
maxBytes=10 * 1024 * 1024,
backupCount=10
)
error_log_handler.setLevel(logging.ERROR)
error_log_handler.setFormatter(StructuredFormatter())
error_log_handler.addFilter(ErrorLevelFilter())
root_logger.addHandler(error_log_handler)
security_logger = logging.getLogger('security')
security_handler = logging.handlers.RotatingFileHandler(
os.path.join(log_dir, 'security.log'),
maxBytes=10 * 1024 * 1024,
backupCount=20
)
security_handler.setFormatter(StructuredFormatter())
security_logger.addHandler(security_handler)
security_logger.setLevel(logging.INFO)
werkzeug_logger = logging.getLogger('werkzeug')
werkzeug_logger.setLevel(logging.WARNING)
@app.before_request
def log_request_info():
logger = logging.getLogger('request')
logger.info(
'Request started',
extra={
'request_id': getattr(g, 'request_id', 'unknown'),
'details': {
'method': request.method,
'path': request.path,
'query_params': dict(request.args),
'content_length': request.content_length
}
}
)
@app.after_request
def log_response_info(response):
logger = logging.getLogger('request')
logger.info(
'Request completed',
extra={
'request_id': getattr(g, 'request_id', 'unknown'),
'details': {
'status_code': response.status_code,
'content_length': response.content_length or 0
}
}
)
return response
def get_logger(name: str) -> logging.Logger:
return logging.getLogger(name)
def log_error(logger: logging.Logger, message: str, error: Exception = None, **kwargs):
extra = kwargs.copy()
if error:
extra['error_type'] = type(error).__name__
extra['error_message'] = str(error)
if hasattr(error, 'code'):
extra['error_code'] = error.code
if hasattr(error, 'details'):
extra['details'] = error.details
logger.error(message, exc_info=error, extra=extra)
def log_security_event(event_type: str, message: str, **details):
logger = logging.getLogger('security')
logger.warning(
f"Security Event: {event_type} - {message}",
extra={
'security_event': event_type,
'details': details,
'request_id': getattr(g, 'request_id', 'unknown') if has_request_context() else None
}
)

Datei anzeigen

@@ -0,0 +1,246 @@
import time
import functools
from typing import Dict, Any, Optional, List
from collections import defaultdict, deque
from datetime import datetime, timedelta
from threading import Lock
import logging
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import g, request, Response
from .exceptions import BaseApplicationException
from .logging_config import log_security_event
logger = logging.getLogger(__name__)
class ErrorMetrics:
def __init__(self):
self.error_counter = Counter(
'app_errors_total',
'Total number of errors',
['error_code', 'status_code', 'endpoint']
)
self.error_rate = Gauge(
'app_error_rate',
'Error rate per minute',
['error_code']
)
self.request_duration = Histogram(
'app_request_duration_seconds',
'Request duration in seconds',
['method', 'endpoint', 'status_code']
)
self.validation_errors = Counter(
'app_validation_errors_total',
'Total validation errors',
['field', 'endpoint']
)
self.auth_failures = Counter(
'app_auth_failures_total',
'Total authentication failures',
['reason', 'endpoint']
)
self.db_errors = Counter(
'app_database_errors_total',
'Total database errors',
['error_type', 'operation']
)
self._error_history = defaultdict(lambda: deque(maxlen=60))
self._lock = Lock()
def record_error(self, error: BaseApplicationException, endpoint: str = None):
endpoint = endpoint or request.endpoint or 'unknown'
self.error_counter.labels(
error_code=error.code,
status_code=error.status_code,
endpoint=endpoint
).inc()
with self._lock:
self._error_history[error.code].append(datetime.utcnow())
self._update_error_rates()
if error.code == 'VALIDATION_ERROR' and 'field' in error.details:
self.validation_errors.labels(
field=error.details['field'],
endpoint=endpoint
).inc()
elif error.code == 'AUTHENTICATION_ERROR':
reason = error.__class__.__name__
self.auth_failures.labels(
reason=reason,
endpoint=endpoint
).inc()
elif error.code == 'DATABASE_ERROR':
error_type = error.__class__.__name__
operation = error.details.get('operation', 'unknown')
self.db_errors.labels(
error_type=error_type,
operation=operation
).inc()
def _update_error_rates(self):
now = datetime.utcnow()
one_minute_ago = now - timedelta(minutes=1)
for error_code, timestamps in self._error_history.items():
recent_count = sum(1 for ts in timestamps if ts >= one_minute_ago)
self.error_rate.labels(error_code=error_code).set(recent_count)
class AlertManager:
def __init__(self):
self.alerts = []
self.alert_thresholds = {
'error_rate': 10,
'auth_failure_rate': 5,
'db_error_rate': 3,
'response_time_95th': 2.0
}
self._lock = Lock()
def check_alerts(self, metrics: ErrorMetrics):
new_alerts = []
for error_code, rate in self._get_current_error_rates(metrics).items():
if rate > self.alert_thresholds['error_rate']:
new_alerts.append({
'type': 'high_error_rate',
'severity': 'critical',
'error_code': error_code,
'rate': rate,
'threshold': self.alert_thresholds['error_rate'],
'message': f'High error rate for {error_code}: {rate}/min',
'timestamp': datetime.utcnow()
})
auth_failure_rate = self._get_auth_failure_rate(metrics)
if auth_failure_rate > self.alert_thresholds['auth_failure_rate']:
new_alerts.append({
'type': 'auth_failures',
'severity': 'warning',
'rate': auth_failure_rate,
'threshold': self.alert_thresholds['auth_failure_rate'],
'message': f'High authentication failure rate: {auth_failure_rate}/min',
'timestamp': datetime.utcnow()
})
log_security_event(
'HIGH_AUTH_FAILURE_RATE',
f'Authentication failure rate exceeded threshold',
rate=auth_failure_rate,
threshold=self.alert_thresholds['auth_failure_rate']
)
with self._lock:
self.alerts.extend(new_alerts)
self.alerts = [a for a in self.alerts
if a['timestamp'] > datetime.utcnow() - timedelta(hours=24)]
return new_alerts
def _get_current_error_rates(self, metrics: ErrorMetrics) -> Dict[str, float]:
rates = {}
with metrics._lock:
now = datetime.utcnow()
one_minute_ago = now - timedelta(minutes=1)
for error_code, timestamps in metrics._error_history.items():
rates[error_code] = sum(1 for ts in timestamps if ts >= one_minute_ago)
return rates
def _get_auth_failure_rate(self, metrics: ErrorMetrics) -> float:
return sum(
sample.value
for sample in metrics.auth_failures._child_samples()
) / 60.0
def get_active_alerts(self) -> List[Dict[str, Any]]:
with self._lock:
return list(self.alerts)
error_metrics = ErrorMetrics()
alert_manager = AlertManager()
def init_monitoring(app):
@app.before_request
def before_request():
g.start_time = time.time()
@app.after_request
def after_request(response):
if hasattr(g, 'start_time'):
duration = time.time() - g.start_time
error_metrics.request_duration.labels(
method=request.method,
endpoint=request.endpoint or 'unknown',
status_code=response.status_code
).observe(duration)
return response
@app.route('/metrics')
def metrics():
alert_manager.check_alerts(error_metrics)
return Response(generate_latest(), mimetype='text/plain')
@app.route('/api/alerts')
def get_alerts():
alerts = alert_manager.get_active_alerts()
return {
'alerts': alerts,
'total': len(alerts),
'critical': len([a for a in alerts if a['severity'] == 'critical']),
'warning': len([a for a in alerts if a['severity'] == 'warning'])
}
def monitor_performance(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.time() - start_time
if duration > 1.0:
logger.warning(
f"Slow function execution: {func.__name__}",
extra={
'function': func.__name__,
'duration': duration,
'request_id': getattr(g, 'request_id', 'unknown')
}
)
return wrapper
def track_error(error: BaseApplicationException):
error_metrics.record_error(error)
if error.status_code >= 500:
logger.error(
f"Critical error occurred: {error.code}",
extra={
'error_code': error.code,
'message': error.message,
'details': error.details,
'request_id': error.request_id
}
)

Datei anzeigen

@@ -0,0 +1,435 @@
import re
from typing import Any, Optional, List, Dict, Callable, Union
from datetime import datetime, date
from functools import wraps
import ipaddress
from flask import request
from .exceptions import InputValidationError, ValidationException
class ValidationRules:
EMAIL_PATTERN = re.compile(
r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
)
PHONE_PATTERN = re.compile(
r'^[\+]?[(]?[0-9]{1,4}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,10}$'
)
LICENSE_KEY_PATTERN = re.compile(
r'^[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}$'
)
SAFE_STRING_PATTERN = re.compile(
r'^[a-zA-Z0-9\s\-\_\.\,\!\?\@\#\$\%\&\*\(\)\[\]\{\}\:\;\'\"\+\=\/\\]+$'
)
USERNAME_PATTERN = re.compile(
r'^[a-zA-Z0-9_\-\.]{3,50}$'
)
PASSWORD_MIN_LENGTH = 8
PASSWORD_REQUIRE_UPPER = True
PASSWORD_REQUIRE_LOWER = True
PASSWORD_REQUIRE_DIGIT = True
PASSWORD_REQUIRE_SPECIAL = True
class Validators:
@staticmethod
def required(value: Any, field_name: str = "field") -> Any:
if value is None or (isinstance(value, str) and not value.strip()):
raise InputValidationError(
field=field_name,
message="This field is required",
value=value
)
return value
@staticmethod
def email(value: str, field_name: str = "email") -> str:
value = Validators.required(value, field_name).strip()
if not ValidationRules.EMAIL_PATTERN.match(value):
raise InputValidationError(
field=field_name,
message="Invalid email format",
value=value,
expected_type="email"
)
return value.lower()
@staticmethod
def phone(value: str, field_name: str = "phone") -> str:
value = Validators.required(value, field_name).strip()
cleaned = re.sub(r'[\s\-\(\)]', '', value)
if not ValidationRules.PHONE_PATTERN.match(value):
raise InputValidationError(
field=field_name,
message="Invalid phone number format",
value=value,
expected_type="phone"
)
return cleaned
@staticmethod
def license_key(value: str, field_name: str = "license_key") -> str:
value = Validators.required(value, field_name).strip().upper()
if not ValidationRules.LICENSE_KEY_PATTERN.match(value):
raise InputValidationError(
field=field_name,
message="Invalid license key format (expected: XXXX-XXXX-XXXX-XXXX)",
value=value,
expected_type="license_key"
)
return value
@staticmethod
def integer(
value: Union[str, int],
field_name: str = "field",
min_value: Optional[int] = None,
max_value: Optional[int] = None
) -> int:
try:
int_value = int(value)
except (ValueError, TypeError):
raise InputValidationError(
field=field_name,
message="Must be a valid integer",
value=value,
expected_type="integer"
)
if min_value is not None and int_value < min_value:
raise InputValidationError(
field=field_name,
message=f"Must be at least {min_value}",
value=int_value
)
if max_value is not None and int_value > max_value:
raise InputValidationError(
field=field_name,
message=f"Must be at most {max_value}",
value=int_value
)
return int_value
@staticmethod
def float_number(
value: Union[str, float],
field_name: str = "field",
min_value: Optional[float] = None,
max_value: Optional[float] = None
) -> float:
try:
float_value = float(value)
except (ValueError, TypeError):
raise InputValidationError(
field=field_name,
message="Must be a valid number",
value=value,
expected_type="float"
)
if min_value is not None and float_value < min_value:
raise InputValidationError(
field=field_name,
message=f"Must be at least {min_value}",
value=float_value
)
if max_value is not None and float_value > max_value:
raise InputValidationError(
field=field_name,
message=f"Must be at most {max_value}",
value=float_value
)
return float_value
@staticmethod
def boolean(value: Union[str, bool], field_name: str = "field") -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
value_lower = value.lower()
if value_lower in ['true', '1', 'yes', 'on']:
return True
elif value_lower in ['false', '0', 'no', 'off']:
return False
raise InputValidationError(
field=field_name,
message="Must be a valid boolean",
value=value,
expected_type="boolean"
)
@staticmethod
def string(
value: str,
field_name: str = "field",
min_length: Optional[int] = None,
max_length: Optional[int] = None,
pattern: Optional[re.Pattern] = None,
safe_only: bool = False
) -> str:
value = Validators.required(value, field_name).strip()
if min_length is not None and len(value) < min_length:
raise InputValidationError(
field=field_name,
message=f"Must be at least {min_length} characters",
value=value
)
if max_length is not None and len(value) > max_length:
raise InputValidationError(
field=field_name,
message=f"Must be at most {max_length} characters",
value=value
)
if safe_only and not ValidationRules.SAFE_STRING_PATTERN.match(value):
raise InputValidationError(
field=field_name,
message="Contains invalid characters",
value=value
)
if pattern and not pattern.match(value):
raise InputValidationError(
field=field_name,
message="Does not match required format",
value=value
)
return value
@staticmethod
def username(value: str, field_name: str = "username") -> str:
value = Validators.required(value, field_name).strip()
if not ValidationRules.USERNAME_PATTERN.match(value):
raise InputValidationError(
field=field_name,
message="Username must be 3-50 characters and contain only letters, numbers, _, -, or .",
value=value,
expected_type="username"
)
return value
@staticmethod
def password(value: str, field_name: str = "password") -> str:
value = Validators.required(value, field_name)
errors = []
if len(value) < ValidationRules.PASSWORD_MIN_LENGTH:
errors.append(f"at least {ValidationRules.PASSWORD_MIN_LENGTH} characters")
if ValidationRules.PASSWORD_REQUIRE_UPPER and not re.search(r'[A-Z]', value):
errors.append("at least one uppercase letter")
if ValidationRules.PASSWORD_REQUIRE_LOWER and not re.search(r'[a-z]', value):
errors.append("at least one lowercase letter")
if ValidationRules.PASSWORD_REQUIRE_DIGIT and not re.search(r'\d', value):
errors.append("at least one digit")
if ValidationRules.PASSWORD_REQUIRE_SPECIAL and not re.search(r'[!@#$%^&*(),.?":{}|<>]', value):
errors.append("at least one special character")
if errors:
raise InputValidationError(
field=field_name,
message=f"Password must contain {', '.join(errors)}",
value="[hidden]"
)
return value
@staticmethod
def date_string(
value: str,
field_name: str = "date",
format: str = "%Y-%m-%d",
min_date: Optional[date] = None,
max_date: Optional[date] = None
) -> date:
value = Validators.required(value, field_name).strip()
try:
date_value = datetime.strptime(value, format).date()
except ValueError:
raise InputValidationError(
field=field_name,
message=f"Invalid date format (expected: {format})",
value=value,
expected_type="date"
)
if min_date and date_value < min_date:
raise InputValidationError(
field=field_name,
message=f"Date must be after {min_date}",
value=value
)
if max_date and date_value > max_date:
raise InputValidationError(
field=field_name,
message=f"Date must be before {max_date}",
value=value
)
return date_value
@staticmethod
def ip_address(
value: str,
field_name: str = "ip_address",
version: Optional[int] = None
) -> str:
value = Validators.required(value, field_name).strip()
try:
ip = ipaddress.ip_address(value)
if version and ip.version != version:
raise ValueError
except ValueError:
version_str = f"IPv{version}" if version else "IP"
raise InputValidationError(
field=field_name,
message=f"Invalid {version_str} address",
value=value,
expected_type="ip_address"
)
return str(ip)
@staticmethod
def url(
value: str,
field_name: str = "url",
require_https: bool = False
) -> str:
value = Validators.required(value, field_name).strip()
url_pattern = re.compile(
r'^https?://'
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
r'localhost|'
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
r'(?::\d+)?'
r'(?:/?|[/?]\S+)$', re.IGNORECASE
)
if not url_pattern.match(value):
raise InputValidationError(
field=field_name,
message="Invalid URL format",
value=value,
expected_type="url"
)
if require_https and not value.startswith('https://'):
raise InputValidationError(
field=field_name,
message="URL must use HTTPS",
value=value
)
return value
@staticmethod
def enum(
value: Any,
field_name: str,
allowed_values: List[Any]
) -> Any:
if value not in allowed_values:
raise InputValidationError(
field=field_name,
message=f"Must be one of: {', '.join(map(str, allowed_values))}",
value=value
)
return value
def validate(rules: Dict[str, Dict[str, Any]]) -> Callable:
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
data = request.get_json() if request.is_json else request.form
validated_data = {}
for field_name, field_rules in rules.items():
value = data.get(field_name)
if 'required' in field_rules and field_rules['required']:
value = Validators.required(value, field_name)
elif value is None or value == '':
if 'default' in field_rules:
validated_data[field_name] = field_rules['default']
continue
validator_name = field_rules.get('type', 'string')
validator_func = getattr(Validators, validator_name, None)
if not validator_func:
raise ValueError(f"Unknown validator type: {validator_name}")
validator_params = {
k: v for k, v in field_rules.items()
if k not in ['type', 'required', 'default']
}
validator_params['field_name'] = field_name
validated_data[field_name] = validator_func(value, **validator_params)
request.validated_data = validated_data
return func(*args, **kwargs)
return wrapper
return decorator
def sanitize_html(value: str) -> str:
dangerous_tags = re.compile(
r'<(script|iframe|object|embed|form|input|button|textarea|select|link|meta|style).*?>.*?</\1>',
re.IGNORECASE | re.DOTALL
)
dangerous_attrs = re.compile(
r'\s*(on\w+|style|javascript:)[\s]*=[\s]*["\']?[^"\'>\s]+',
re.IGNORECASE
)
value = dangerous_tags.sub('', value)
value = dangerous_attrs.sub('', value)
return value
def sanitize_sql_identifier(value: str) -> str:
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', value):
raise ValidationException(
message="Invalid SQL identifier",
details={'value': value},
user_message="Ungültiger Bezeichner"
)
return value

Datei anzeigen

@@ -0,0 +1 @@
from .error_middleware import ErrorHandlingMiddleware

Datei anzeigen

@@ -0,0 +1,54 @@
import time
import uuid
from typing import Optional
from flask import request, g
from werkzeug.exceptions import HTTPException
from core.exceptions import BaseApplicationException
from core.monitoring import track_error
from core.logging_config import get_logger
logger = get_logger(__name__)
class ErrorHandlingMiddleware:
def __init__(self, app=None):
self.app = app
if app:
self.init_app(app)
def init_app(self, app):
app.before_request(self._before_request)
app.teardown_appcontext(self._teardown_request)
def _before_request(self):
g.request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))
g.start_time = time.time()
g.errors = []
def _teardown_request(self, exception=None):
if exception:
self._handle_exception(exception)
if hasattr(g, 'errors') and g.errors:
for error in g.errors:
if isinstance(error, BaseApplicationException):
track_error(error)
def _handle_exception(self, exception):
if isinstance(exception, BaseApplicationException):
track_error(exception)
elif isinstance(exception, HTTPException):
pass
else:
logger.error(
f"Unhandled exception: {type(exception).__name__}",
exc_info=True,
extra={
'request_id': getattr(g, 'request_id', 'unknown'),
'endpoint': request.endpoint,
'method': request.method,
'path': request.path
}
)

Datei anzeigen

@@ -14,3 +14,4 @@ pyotp
qrcode[pil]
PyJWT
prometheus-flask-exporter
prometheus-client

Datei anzeigen

@@ -5,13 +5,40 @@
{% block content %}
<div class="container mt-5">
<div class="row justify-content-center">
<div class="col-md-6">
<div class="card">
<div class="card-body text-center">
<h1 class="display-1">500</h1>
<h2>Interner Serverfehler</h2>
<p>Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.</p>
<a href="{{ url_for('admin.dashboard') }}" class="btn btn-primary">Zur Startseite</a>
<div class="col-md-8">
<div class="card shadow-lg">
<div class="card-body text-center py-5">
<div class="mb-4">
<i class="bi bi-exclamation-octagon text-danger" style="font-size: 4rem;"></i>
</div>
<h1 class="display-1 text-danger">500</h1>
<h2 class="mb-4">Interner Serverfehler</h2>
<p class="lead mb-4">
Es tut uns leid, aber es ist ein unerwarteter Fehler aufgetreten.
Unser Team wurde benachrichtigt und arbeitet an einer Lösung.
</p>
{% if error %}
<div class="alert alert-warning text-start mx-auto" style="max-width: 500px;">
<p class="mb-0"><strong>Fehlermeldung:</strong> {{ error }}</p>
</div>
{% endif %}
<div class="mt-4">
<a href="{{ url_for('admin.dashboard') }}" class="btn btn-primary btn-lg">
<i class="bi bi-house"></i> Zum Dashboard
</a>
<button onclick="location.reload();" class="btn btn-outline-secondary btn-lg ms-2">
<i class="bi bi-arrow-clockwise"></i> Seite neu laden
</button>
</div>
<div class="mt-5 text-muted">
<small>
Fehler-ID: <code>{{ request_id or 'Nicht verfügbar' }}</code><br>
Zeitstempel: {{ timestamp or 'Nicht verfügbar' }}
</small>
</div>
</div>
</div>
</div>

Datei anzeigen

@@ -6,28 +6,46 @@
<div class="container">
<div class="row justify-content-center mt-5">
<div class="col-md-8">
<div class="card">
<div class="card shadow">
<div class="card-header bg-danger text-white">
<h4 class="mb-0">
<i class="bi bi-exclamation-triangle-fill"></i>
{{ error_message|default('Ein Fehler ist aufgetreten') }}
Fehler
</h4>
</div>
<div class="card-body">
{% if details %}
<div class="alert alert-danger">
<h6>Details:</h6>
<pre class="mb-0">{{ details }}</pre>
</div>
<div class="alert alert-danger mb-4" role="alert">
<h5 class="alert-heading">{{ error|default('Ein Fehler ist aufgetreten') }}</h5>
{% if error_code %}
<hr>
<p class="mb-0">
<strong>Fehlercode:</strong> {{ error_code }}<br>
{% if request_id %}
<strong>Anfrage-ID:</strong> <code>{{ request_id }}</code>
{% endif %}
</p>
{% endif %}
</div>
<div class="mt-3">
<div class="bg-light p-3 rounded mb-4">
<h6 class="text-muted">Was können Sie tun?</h6>
<ul class="mb-0">
<li>Versuchen Sie die Aktion erneut</li>
<li>Überprüfen Sie Ihre Eingaben</li>
<li>Kontaktieren Sie den Support mit der Anfrage-ID</li>
</ul>
</div>
<div class="d-flex gap-2">
<a href="{{ url_for('admin.dashboard') }}" class="btn btn-primary">
<i class="bi bi-house"></i> Zurück zum Dashboard
<i class="bi bi-house"></i> Zum Dashboard
</a>
<button onclick="window.history.back();" class="btn btn-secondary">
<i class="bi bi-arrow-left"></i> Zurück
</button>
<button onclick="location.reload();" class="btn btn-outline-secondary">
<i class="bi bi-arrow-clockwise"></i> Erneut versuchen
</button>
</div>
</div>
</div>

Datei anzeigen

Datei anzeigen

@@ -0,0 +1,350 @@
import pytest
import json
from datetime import datetime
from flask import Flask, request
from werkzeug.exceptions import NotFound
from ..core.exceptions import (
ValidationException, InputValidationError, AuthenticationException,
InvalidCredentialsError, DatabaseException, QueryError,
ResourceNotFoundError, BusinessRuleViolation
)
from ..core.error_handlers import init_error_handlers, handle_errors, validate_request
from ..core.validators import Validators, validate
from ..core.monitoring import error_metrics, alert_manager
from ..middleware.error_middleware import ErrorHandlingMiddleware
@pytest.fixture
def app():
app = Flask(__name__)
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'test-secret-key'
init_error_handlers(app)
ErrorHandlingMiddleware(app)
@app.route('/test-validation-error')
def test_validation_error():
raise InputValidationError(
field='email',
message='Invalid email format',
value='not-an-email'
)
@app.route('/test-auth-error')
def test_auth_error():
raise InvalidCredentialsError('testuser')
@app.route('/test-db-error')
def test_db_error():
raise QueryError(
message='Column not found',
query='SELECT * FROM users',
error_code='42703'
)
@app.route('/test-not-found')
def test_not_found():
raise ResourceNotFoundError('User', 123)
@app.route('/test-generic-error')
def test_generic_error():
raise Exception('Something went wrong')
@app.route('/test-validate', methods=['POST'])
@validate({
'email': {'type': 'email', 'required': True},
'age': {'type': 'integer', 'required': True, 'min_value': 18},
'username': {'type': 'username', 'required': False}
})
def test_validate():
return {'success': True, 'data': request.validated_data}
@app.route('/test-handle-errors')
@handle_errors(
catch=(ValueError,),
message='Value error occurred',
user_message='Ungültiger Wert'
)
def test_handle_errors():
raise ValueError('Invalid value')
return app
@pytest.fixture
def client(app):
return app.test_client()
class TestExceptions:
def test_validation_exception(self):
exc = ValidationException(
message='Test validation error',
field='test_field',
value='test_value',
user_message='Test user message'
)
assert exc.code == 'VALIDATION_ERROR'
assert exc.status_code == 400
assert exc.details['field'] == 'test_field'
assert exc.details['value'] == 'test_value'
assert exc.user_message == 'Test user message'
def test_input_validation_error(self):
exc = InputValidationError(
field='email',
message='Invalid format',
value='not-email',
expected_type='email'
)
assert exc.code == 'VALIDATION_ERROR'
assert exc.status_code == 400
assert exc.details['field'] == 'email'
assert exc.details['expected_type'] == 'email'
def test_business_rule_violation(self):
exc = BusinessRuleViolation(
rule='max_licenses',
message='License limit exceeded',
context={'current': 10, 'max': 5}
)
assert exc.details['rule'] == 'max_licenses'
assert exc.details['context']['current'] == 10
def test_authentication_exceptions(self):
exc = InvalidCredentialsError('testuser')
assert exc.status_code == 401
assert exc.details['username'] == 'testuser'
def test_database_exceptions(self):
exc = QueryError(
message='Syntax error',
query='SELECT * FORM users',
error_code='42601'
)
assert exc.code == 'DATABASE_ERROR'
assert exc.status_code == 500
assert 'query_hash' in exc.details
assert exc.details['error_code'] == '42601'
def test_resource_not_found(self):
exc = ResourceNotFoundError(
resource_type='License',
resource_id='ABC123',
search_criteria={'customer_id': 1}
)
assert exc.status_code == 404
assert exc.details['resource_type'] == 'License'
assert exc.details['resource_id'] == 'ABC123'
assert exc.details['search_criteria']['customer_id'] == 1
class TestErrorHandlers:
def test_validation_error_json_response(self, client):
response = client.get(
'/test-validation-error',
headers={'Accept': 'application/json'}
)
assert response.status_code == 400
data = json.loads(response.data)
assert 'error' in data
assert data['error']['code'] == 'VALIDATION_ERROR'
assert 'request_id' in data['error']
def test_validation_error_html_response(self, client):
response = client.get('/test-validation-error')
assert response.status_code == 400
assert b'Ungültiger Wert für Feld' in response.data
def test_auth_error_response(self, client):
response = client.get(
'/test-auth-error',
headers={'Accept': 'application/json'}
)
assert response.status_code == 401
data = json.loads(response.data)
assert data['error']['code'] == 'AUTHENTICATION_ERROR'
def test_db_error_response(self, client):
response = client.get(
'/test-db-error',
headers={'Accept': 'application/json'}
)
assert response.status_code == 500
data = json.loads(response.data)
assert data['error']['code'] == 'DATABASE_ERROR'
def test_not_found_response(self, client):
response = client.get('/test-not-found')
assert response.status_code == 404
assert b'User nicht gefunden' in response.data
def test_generic_error_response(self, client):
response = client.get(
'/test-generic-error',
headers={'Accept': 'application/json'}
)
assert response.status_code == 500
data = json.loads(response.data)
assert data['error']['code'] == 'INTERNAL_ERROR'
def test_validate_decorator_success(self, client):
response = client.post(
'/test-validate',
json={'email': 'test@example.com', 'age': 25, 'username': 'testuser'}
)
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
assert data['data']['email'] == 'test@example.com'
assert data['data']['age'] == 25
def test_validate_decorator_failure(self, client):
response = client.post(
'/test-validate',
json={'email': 'invalid-email', 'age': 15}
)
assert response.status_code == 400
data = json.loads(response.data)
assert data['error']['code'] == 'VALIDATION_ERROR'
def test_handle_errors_decorator(self, client):
response = client.get('/test-handle-errors')
assert response.status_code == 500
assert b'Ungültiger Wert' in response.data
class TestValidators:
def test_email_validator(self):
assert Validators.email('test@example.com') == 'test@example.com'
assert Validators.email('TEST@EXAMPLE.COM') == 'test@example.com'
with pytest.raises(InputValidationError):
Validators.email('invalid-email')
with pytest.raises(InputValidationError):
Validators.email('')
def test_phone_validator(self):
assert Validators.phone('+1-234-567-8900') == '+12345678900'
assert Validators.phone('(123) 456-7890') == '1234567890'
with pytest.raises(InputValidationError):
Validators.phone('123')
def test_license_key_validator(self):
assert Validators.license_key('abcd-1234-efgh-5678') == 'ABCD-1234-EFGH-5678'
with pytest.raises(InputValidationError):
Validators.license_key('invalid-key')
def test_integer_validator(self):
assert Validators.integer('123') == 123
assert Validators.integer(456) == 456
assert Validators.integer('10', min_value=5, max_value=15) == 10
with pytest.raises(InputValidationError):
Validators.integer('abc')
with pytest.raises(InputValidationError):
Validators.integer('3', min_value=5)
with pytest.raises(InputValidationError):
Validators.integer('20', max_value=15)
def test_string_validator(self):
assert Validators.string('test', min_length=2, max_length=10) == 'test'
with pytest.raises(InputValidationError):
Validators.string('a', min_length=2)
with pytest.raises(InputValidationError):
Validators.string('a' * 20, max_length=10)
with pytest.raises(InputValidationError):
Validators.string('test<script>', safe_only=True)
def test_password_validator(self):
assert Validators.password('Test123!@#') == 'Test123!@#'
with pytest.raises(InputValidationError) as exc:
Validators.password('weak')
assert 'at least 8 characters' in str(exc.value)
with pytest.raises(InputValidationError) as exc:
Validators.password('alllowercase123!')
assert 'uppercase letter' in str(exc.value)
def test_ip_address_validator(self):
assert Validators.ip_address('192.168.1.1') == '192.168.1.1'
assert Validators.ip_address('::1') == '::1'
with pytest.raises(InputValidationError):
Validators.ip_address('999.999.999.999')
with pytest.raises(InputValidationError):
Validators.ip_address('::1', version=4)
def test_url_validator(self):
assert Validators.url('http://example.com') == 'http://example.com'
assert Validators.url('https://example.com/path') == 'https://example.com/path'
with pytest.raises(InputValidationError):
Validators.url('not-a-url')
with pytest.raises(InputValidationError):
Validators.url('http://example.com', require_https=True)
def test_enum_validator(self):
assert Validators.enum('active', 'status', ['active', 'inactive']) == 'active'
with pytest.raises(InputValidationError):
Validators.enum('pending', 'status', ['active', 'inactive'])
class TestMonitoring:
def test_error_metrics_recording(self, client):
initial_count = error_metrics.error_counter._value._value
client.get('/test-validation-error')
new_count = error_metrics.error_counter._value._value
assert new_count > initial_count
def test_alert_generation(self):
for i in range(15):
error = ValidationException(
message=f'Test error {i}',
code='TEST_ERROR'
)
error_metrics.record_error(error, 'test_endpoint')
alerts = alert_manager.check_alerts(error_metrics)
assert any(a['type'] == 'high_error_rate' for a in alerts)
def test_metrics_endpoint(self, client):
response = client.get('/metrics')
assert response.status_code == 200
assert b'app_errors_total' in response.data
assert b'app_request_duration_seconds' in response.data
if __name__ == '__main__':
pytest.main([__file__, '-v'])