435 Zeilen
14 KiB
Python
435 Zeilen
14 KiB
Python
import re
|
|
from typing import Any, Optional, List, Dict, Callable, Union
|
|
from datetime import datetime, date
|
|
from functools import wraps
|
|
import ipaddress
|
|
from flask import request
|
|
|
|
from .exceptions import InputValidationError, ValidationException
|
|
|
|
|
|
class ValidationRules:
|
|
EMAIL_PATTERN = re.compile(
|
|
r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
|
)
|
|
|
|
PHONE_PATTERN = re.compile(
|
|
r'^[\+]?[(]?[0-9]{1,4}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,10}$'
|
|
)
|
|
|
|
LICENSE_KEY_PATTERN = re.compile(
|
|
r'^[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}$'
|
|
)
|
|
|
|
SAFE_STRING_PATTERN = re.compile(
|
|
r'^[a-zA-Z0-9\s\-\_\.\,\!\?\@\#\$\%\&\*\(\)\[\]\{\}\:\;\'\"\+\=\/\\]+$'
|
|
)
|
|
|
|
USERNAME_PATTERN = re.compile(
|
|
r'^[a-zA-Z0-9_\-\.]{3,50}$'
|
|
)
|
|
|
|
PASSWORD_MIN_LENGTH = 8
|
|
PASSWORD_REQUIRE_UPPER = True
|
|
PASSWORD_REQUIRE_LOWER = True
|
|
PASSWORD_REQUIRE_DIGIT = True
|
|
PASSWORD_REQUIRE_SPECIAL = True
|
|
|
|
|
|
class Validators:
|
|
@staticmethod
|
|
def required(value: Any, field_name: str = "field") -> Any:
|
|
if value is None or (isinstance(value, str) and not value.strip()):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="This field is required",
|
|
value=value
|
|
)
|
|
return value
|
|
|
|
@staticmethod
|
|
def email(value: str, field_name: str = "email") -> str:
|
|
value = Validators.required(value, field_name).strip()
|
|
|
|
if not ValidationRules.EMAIL_PATTERN.match(value):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Invalid email format",
|
|
value=value,
|
|
expected_type="email"
|
|
)
|
|
|
|
return value.lower()
|
|
|
|
@staticmethod
|
|
def phone(value: str, field_name: str = "phone") -> str:
|
|
value = Validators.required(value, field_name).strip()
|
|
|
|
cleaned = re.sub(r'[\s\-\(\)]', '', value)
|
|
|
|
if not ValidationRules.PHONE_PATTERN.match(value):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Invalid phone number format",
|
|
value=value,
|
|
expected_type="phone"
|
|
)
|
|
|
|
return cleaned
|
|
|
|
@staticmethod
|
|
def license_key(value: str, field_name: str = "license_key") -> str:
|
|
value = Validators.required(value, field_name).strip().upper()
|
|
|
|
if not ValidationRules.LICENSE_KEY_PATTERN.match(value):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Invalid license key format (expected: XXXX-XXXX-XXXX-XXXX)",
|
|
value=value,
|
|
expected_type="license_key"
|
|
)
|
|
|
|
return value
|
|
|
|
@staticmethod
|
|
def integer(
|
|
value: Union[str, int],
|
|
field_name: str = "field",
|
|
min_value: Optional[int] = None,
|
|
max_value: Optional[int] = None
|
|
) -> int:
|
|
try:
|
|
int_value = int(value)
|
|
except (ValueError, TypeError):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Must be a valid integer",
|
|
value=value,
|
|
expected_type="integer"
|
|
)
|
|
|
|
if min_value is not None and int_value < min_value:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Must be at least {min_value}",
|
|
value=int_value
|
|
)
|
|
|
|
if max_value is not None and int_value > max_value:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Must be at most {max_value}",
|
|
value=int_value
|
|
)
|
|
|
|
return int_value
|
|
|
|
@staticmethod
|
|
def float_number(
|
|
value: Union[str, float],
|
|
field_name: str = "field",
|
|
min_value: Optional[float] = None,
|
|
max_value: Optional[float] = None
|
|
) -> float:
|
|
try:
|
|
float_value = float(value)
|
|
except (ValueError, TypeError):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Must be a valid number",
|
|
value=value,
|
|
expected_type="float"
|
|
)
|
|
|
|
if min_value is not None and float_value < min_value:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Must be at least {min_value}",
|
|
value=float_value
|
|
)
|
|
|
|
if max_value is not None and float_value > max_value:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Must be at most {max_value}",
|
|
value=float_value
|
|
)
|
|
|
|
return float_value
|
|
|
|
@staticmethod
|
|
def boolean(value: Union[str, bool], field_name: str = "field") -> bool:
|
|
if isinstance(value, bool):
|
|
return value
|
|
|
|
if isinstance(value, str):
|
|
value_lower = value.lower()
|
|
if value_lower in ['true', '1', 'yes', 'on']:
|
|
return True
|
|
elif value_lower in ['false', '0', 'no', 'off']:
|
|
return False
|
|
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Must be a valid boolean",
|
|
value=value,
|
|
expected_type="boolean"
|
|
)
|
|
|
|
@staticmethod
|
|
def string(
|
|
value: str,
|
|
field_name: str = "field",
|
|
min_length: Optional[int] = None,
|
|
max_length: Optional[int] = None,
|
|
pattern: Optional[re.Pattern] = None,
|
|
safe_only: bool = False
|
|
) -> str:
|
|
value = Validators.required(value, field_name).strip()
|
|
|
|
if min_length is not None and len(value) < min_length:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Must be at least {min_length} characters",
|
|
value=value
|
|
)
|
|
|
|
if max_length is not None and len(value) > max_length:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Must be at most {max_length} characters",
|
|
value=value
|
|
)
|
|
|
|
if safe_only and not ValidationRules.SAFE_STRING_PATTERN.match(value):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Contains invalid characters",
|
|
value=value
|
|
)
|
|
|
|
if pattern and not pattern.match(value):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Does not match required format",
|
|
value=value
|
|
)
|
|
|
|
return value
|
|
|
|
@staticmethod
|
|
def username(value: str, field_name: str = "username") -> str:
|
|
value = Validators.required(value, field_name).strip()
|
|
|
|
if not ValidationRules.USERNAME_PATTERN.match(value):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Username must be 3-50 characters and contain only letters, numbers, _, -, or .",
|
|
value=value,
|
|
expected_type="username"
|
|
)
|
|
|
|
return value
|
|
|
|
@staticmethod
|
|
def password(value: str, field_name: str = "password") -> str:
|
|
value = Validators.required(value, field_name)
|
|
|
|
errors = []
|
|
|
|
if len(value) < ValidationRules.PASSWORD_MIN_LENGTH:
|
|
errors.append(f"at least {ValidationRules.PASSWORD_MIN_LENGTH} characters")
|
|
|
|
if ValidationRules.PASSWORD_REQUIRE_UPPER and not re.search(r'[A-Z]', value):
|
|
errors.append("at least one uppercase letter")
|
|
|
|
if ValidationRules.PASSWORD_REQUIRE_LOWER and not re.search(r'[a-z]', value):
|
|
errors.append("at least one lowercase letter")
|
|
|
|
if ValidationRules.PASSWORD_REQUIRE_DIGIT and not re.search(r'\d', value):
|
|
errors.append("at least one digit")
|
|
|
|
if ValidationRules.PASSWORD_REQUIRE_SPECIAL and not re.search(r'[!@#$%^&*(),.?":{}|<>]', value):
|
|
errors.append("at least one special character")
|
|
|
|
if errors:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Password must contain {', '.join(errors)}",
|
|
value="[hidden]"
|
|
)
|
|
|
|
return value
|
|
|
|
@staticmethod
|
|
def date_string(
|
|
value: str,
|
|
field_name: str = "date",
|
|
format: str = "%Y-%m-%d",
|
|
min_date: Optional[date] = None,
|
|
max_date: Optional[date] = None
|
|
) -> date:
|
|
value = Validators.required(value, field_name).strip()
|
|
|
|
try:
|
|
date_value = datetime.strptime(value, format).date()
|
|
except ValueError:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Invalid date format (expected: {format})",
|
|
value=value,
|
|
expected_type="date"
|
|
)
|
|
|
|
if min_date and date_value < min_date:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Date must be after {min_date}",
|
|
value=value
|
|
)
|
|
|
|
if max_date and date_value > max_date:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Date must be before {max_date}",
|
|
value=value
|
|
)
|
|
|
|
return date_value
|
|
|
|
@staticmethod
|
|
def ip_address(
|
|
value: str,
|
|
field_name: str = "ip_address",
|
|
version: Optional[int] = None
|
|
) -> str:
|
|
value = Validators.required(value, field_name).strip()
|
|
|
|
try:
|
|
ip = ipaddress.ip_address(value)
|
|
if version and ip.version != version:
|
|
raise ValueError
|
|
except ValueError:
|
|
version_str = f"IPv{version}" if version else "IP"
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Invalid {version_str} address",
|
|
value=value,
|
|
expected_type="ip_address"
|
|
)
|
|
|
|
return str(ip)
|
|
|
|
@staticmethod
|
|
def url(
|
|
value: str,
|
|
field_name: str = "url",
|
|
require_https: bool = False
|
|
) -> str:
|
|
value = Validators.required(value, field_name).strip()
|
|
|
|
url_pattern = re.compile(
|
|
r'^https?://'
|
|
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
|
|
r'localhost|'
|
|
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
|
|
r'(?::\d+)?'
|
|
r'(?:/?|[/?]\S+)$', re.IGNORECASE
|
|
)
|
|
|
|
if not url_pattern.match(value):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="Invalid URL format",
|
|
value=value,
|
|
expected_type="url"
|
|
)
|
|
|
|
if require_https and not value.startswith('https://'):
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message="URL must use HTTPS",
|
|
value=value
|
|
)
|
|
|
|
return value
|
|
|
|
@staticmethod
|
|
def enum(
|
|
value: Any,
|
|
field_name: str,
|
|
allowed_values: List[Any]
|
|
) -> Any:
|
|
if value not in allowed_values:
|
|
raise InputValidationError(
|
|
field=field_name,
|
|
message=f"Must be one of: {', '.join(map(str, allowed_values))}",
|
|
value=value
|
|
)
|
|
|
|
return value
|
|
|
|
|
|
def validate(rules: Dict[str, Dict[str, Any]]) -> Callable:
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
data = request.get_json() if request.is_json else request.form
|
|
validated_data = {}
|
|
|
|
for field_name, field_rules in rules.items():
|
|
value = data.get(field_name)
|
|
|
|
if 'required' in field_rules and field_rules['required']:
|
|
value = Validators.required(value, field_name)
|
|
elif value is None or value == '':
|
|
if 'default' in field_rules:
|
|
validated_data[field_name] = field_rules['default']
|
|
continue
|
|
|
|
validator_name = field_rules.get('type', 'string')
|
|
validator_func = getattr(Validators, validator_name, None)
|
|
|
|
if not validator_func:
|
|
raise ValueError(f"Unknown validator type: {validator_name}")
|
|
|
|
validator_params = {
|
|
k: v for k, v in field_rules.items()
|
|
if k not in ['type', 'required', 'default']
|
|
}
|
|
validator_params['field_name'] = field_name
|
|
|
|
validated_data[field_name] = validator_func(value, **validator_params)
|
|
|
|
request.validated_data = validated_data
|
|
return func(*args, **kwargs)
|
|
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def sanitize_html(value: str) -> str:
|
|
dangerous_tags = re.compile(
|
|
r'<(script|iframe|object|embed|form|input|button|textarea|select|link|meta|style).*?>.*?</\1>',
|
|
re.IGNORECASE | re.DOTALL
|
|
)
|
|
dangerous_attrs = re.compile(
|
|
r'\s*(on\w+|style|javascript:)[\s]*=[\s]*["\']?[^"\'>\s]+',
|
|
re.IGNORECASE
|
|
)
|
|
|
|
value = dangerous_tags.sub('', value)
|
|
value = dangerous_attrs.sub('', value)
|
|
|
|
return value
|
|
|
|
|
|
def sanitize_sql_identifier(value: str) -> str:
|
|
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', value):
|
|
raise ValidationException(
|
|
message="Invalid SQL identifier",
|
|
details={'value': value},
|
|
user_message="Ungültiger Bezeichner"
|
|
)
|
|
|
|
return value |