"""Input validation and sanitization helpers for request data.

Provides reusable field validators (``Validators``), the shared patterns and
password policy they use (``ValidationRules``), a ``validate`` decorator that
applies declarative rules to the current Flask request, and two small
sanitizers for HTML fragments and SQL identifiers.
"""

import ipaddress
import math
import re
from datetime import date, datetime
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Union

from flask import request

from .exceptions import InputValidationError, ValidationException


class ValidationRules:
    """Compiled patterns and password-policy constants used by ``Validators``."""

    EMAIL_PATTERN = re.compile(
        r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    )
    PHONE_PATTERN = re.compile(
        r'^[\+]?[(]?[0-9]{1,4}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,10}$'
    )
    LICENSE_KEY_PATTERN = re.compile(
        r'^[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}$'
    )
    SAFE_STRING_PATTERN = re.compile(
        r'^[a-zA-Z0-9\s\-\_\.\,\!\?\@\#\$\%\&\*\(\)\[\]\{\}\:\;\'\"\+\=\/\\]+$'
    )
    USERNAME_PATTERN = re.compile(
        r'^[a-zA-Z0-9_\-\.]{3,50}$'
    )
    # Compiled once here instead of on every Validators.url() call. The TLD
    # part allows up to 63 letters (the DNS label limit) so long TLDs such as
    # ".photography" are accepted.
    URL_PATTERN = re.compile(
        r'^https?://'
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'
        r'localhost|'
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
        r'(?::\d+)?'
        r'(?:/?|[/?]\S+)$',
        re.IGNORECASE
    )
    PASSWORD_MIN_LENGTH = 8
    PASSWORD_REQUIRE_UPPER = True
    PASSWORD_REQUIRE_LOWER = True
    PASSWORD_REQUIRE_DIGIT = True
    PASSWORD_REQUIRE_SPECIAL = True


class Validators:
    """Static field validators.

    Each validator returns the (possibly normalised) value on success and
    raises ``InputValidationError`` on failure.
    """

    @staticmethod
    def required(value: Any, field_name: str = "field") -> Any:
        """Return ``value`` unless it is ``None`` or a blank string.

        Raises:
            InputValidationError: if ``value`` is ``None`` or a string that is
                empty after stripping whitespace.
        """
        if value is None or (isinstance(value, str) and not value.strip()):
            raise InputValidationError(
                field=field_name,
                message="This field is required",
                value=value
            )
        return value

    @staticmethod
    def _required_text(value: Any, field_name: str, strip: bool = True) -> str:
        """Return a required string value, optionally stripped.

        JSON payloads can carry numbers, lists or objects where a string is
        expected; without this check the text validators would fail with
        ``AttributeError`` on ``.strip()`` instead of a validation error.

        Raises:
            InputValidationError: if the value is missing/blank or not a str.
        """
        value = Validators.required(value, field_name)
        if not isinstance(value, str):
            raise InputValidationError(
                field=field_name,
                message="Must be a string",
                value=value,
                expected_type="string"
            )
        return value.strip() if strip else value

    @staticmethod
    def email(value: str, field_name: str = "email") -> str:
        """Validate an e-mail address and return it stripped and lower-cased.

        Raises:
            InputValidationError: if the value is missing, not a string, or
                does not match ``ValidationRules.EMAIL_PATTERN``.
        """
        value = Validators._required_text(value, field_name)
        if not ValidationRules.EMAIL_PATTERN.match(value):
            raise InputValidationError(
                field=field_name,
                message="Invalid email format",
                value=value,
                expected_type="email"
            )
        return value.lower()

    @staticmethod
    def phone(value: str, field_name: str = "phone") -> str:
        """Validate a phone number and return it with separators removed.

        The format check runs on the stripped input (the pattern itself
        allows separators); the returned value has whitespace, dashes, dots
        and parentheses removed.

        Raises:
            InputValidationError: if the value is missing, not a string, or
                does not match ``ValidationRules.PHONE_PATTERN``.
        """
        value = Validators._required_text(value, field_name)
        if not ValidationRules.PHONE_PATTERN.match(value):
            raise InputValidationError(
                field=field_name,
                message="Invalid phone number format",
                value=value,
                expected_type="phone"
            )
        # The pattern accepts "." as a separator too, so strip it along with
        # the others to return a consistently normalised number.
        return re.sub(r'[\s\-\.\(\)]', '', value)

    @staticmethod
    def license_key(value: str, field_name: str = "license_key") -> str:
        """Validate a license key and return it stripped and upper-cased.

        Raises:
            InputValidationError: if the value is missing, not a string, or
                not of the form ``XXXX-XXXX-XXXX-XXXX``.
        """
        value = Validators._required_text(value, field_name).upper()
        if not ValidationRules.LICENSE_KEY_PATTERN.match(value):
            raise InputValidationError(
                field=field_name,
                message="Invalid license key format (expected: XXXX-XXXX-XXXX-XXXX)",
                value=value,
                expected_type="license_key"
            )
        return value

    @staticmethod
    def integer(
        value: Union[str, int],
        field_name: str = "field",
        min_value: Optional[int] = None,
        max_value: Optional[int] = None
    ) -> int:
        """Convert ``value`` to ``int`` and check the optional inclusive bounds.

        Booleans and non-integral floats are rejected rather than being
        silently coerced (``int(True) == 1``, ``int(3.7) == 3``).

        Raises:
            InputValidationError: if the value is not an integer or is
                outside ``[min_value, max_value]``.
        """
        is_bool = isinstance(value, bool)
        # float.is_integer() is False for nan/inf as well, so those are
        # rejected here instead of raising inside int().
        is_fractional = isinstance(value, float) and not value.is_integer()
        if is_bool or is_fractional:
            raise InputValidationError(
                field=field_name,
                message="Must be a valid integer",
                value=value,
                expected_type="integer"
            )
        try:
            int_value = int(value)
        except (ValueError, TypeError, OverflowError) as exc:
            raise InputValidationError(
                field=field_name,
                message="Must be a valid integer",
                value=value,
                expected_type="integer"
            ) from exc
        if min_value is not None and int_value < min_value:
            raise InputValidationError(
                field=field_name,
                message=f"Must be at least {min_value}",
                value=int_value
            )
        if max_value is not None and int_value > max_value:
            raise InputValidationError(
                field=field_name,
                message=f"Must be at most {max_value}",
                value=int_value
            )
        return int_value

    @staticmethod
    def float_number(
        value: Union[str, float],
        field_name: str = "field",
        min_value: Optional[float] = None,
        max_value: Optional[float] = None
    ) -> float:
        """Convert ``value`` to a finite ``float`` and check the optional bounds.

        ``nan`` and ``inf`` are rejected: ``nan`` compares false against any
        bound and would otherwise slip through the range checks.

        Raises:
            InputValidationError: if the value is not a finite number or is
                outside ``[min_value, max_value]``.
        """
        try:
            if isinstance(value, bool):
                raise TypeError("bool is not accepted as a number")
            float_value = float(value)
            if not math.isfinite(float_value):
                raise ValueError("non-finite number")
        except (ValueError, TypeError, OverflowError) as exc:
            raise InputValidationError(
                field=field_name,
                message="Must be a valid number",
                value=value,
                expected_type="float"
            ) from exc
        if min_value is not None and float_value < min_value:
            raise InputValidationError(
                field=field_name,
                message=f"Must be at least {min_value}",
                value=float_value
            )
        if max_value is not None and float_value > max_value:
            raise InputValidationError(
                field=field_name,
                message=f"Must be at most {max_value}",
                value=float_value
            )
        return float_value

    @staticmethod
    def boolean(value: Union[str, bool], field_name: str = "field") -> bool:
        """Interpret ``value`` as a boolean.

        Accepts real booleans, the integers ``0``/``1``, and the
        case-insensitive strings true/1/yes/on and false/0/no/off
        (surrounding whitespace is ignored).

        Raises:
            InputValidationError: for any other value.
        """
        if isinstance(value, bool):
            return value
        # JSON clients often send 0/1; the string forms '0'/'1' are already
        # accepted below, so accept the integer forms too.
        if isinstance(value, int) and value in (0, 1):
            return bool(value)
        if isinstance(value, str):
            normalized = value.strip().lower()
            if normalized in ('true', '1', 'yes', 'on'):
                return True
            if normalized in ('false', '0', 'no', 'off'):
                return False
        raise InputValidationError(
            field=field_name,
            message="Must be a valid boolean",
            value=value,
            expected_type="boolean"
        )

    @staticmethod
    def string(
        value: str,
        field_name: str = "field",
        min_length: Optional[int] = None,
        max_length: Optional[int] = None,
        pattern: Optional[re.Pattern] = None,
        safe_only: bool = False
    ) -> str:
        """Validate a required string and return it stripped.

        Args:
            value: The raw input.
            field_name: Name reported in validation errors.
            min_length: Minimum length of the stripped value, if given.
            max_length: Maximum length of the stripped value, if given.
            pattern: Optional compiled pattern; checked with ``pattern.match``.
            safe_only: If true, the value must match
                ``ValidationRules.SAFE_STRING_PATTERN``.

        Raises:
            InputValidationError: if any check fails.
        """
        value = Validators._required_text(value, field_name)
        if min_length is not None and len(value) < min_length:
            raise InputValidationError(
                field=field_name,
                message=f"Must be at least {min_length} characters",
                value=value
            )
        if max_length is not None and len(value) > max_length:
            raise InputValidationError(
                field=field_name,
                message=f"Must be at most {max_length} characters",
                value=value
            )
        if safe_only and not ValidationRules.SAFE_STRING_PATTERN.match(value):
            raise InputValidationError(
                field=field_name,
                message="Contains invalid characters",
                value=value
            )
        if pattern and not pattern.match(value):
            raise InputValidationError(
                field=field_name,
                message="Does not match required format",
                value=value
            )
        return value

    @staticmethod
    def username(value: str, field_name: str = "username") -> str:
        """Validate a username and return it stripped.

        Raises:
            InputValidationError: if the value is missing, not a string, or
                does not match ``ValidationRules.USERNAME_PATTERN``.
        """
        value = Validators._required_text(value, field_name)
        if not ValidationRules.USERNAME_PATTERN.match(value):
            raise InputValidationError(
                field=field_name,
                message="Username must be 3-50 characters and contain only letters, numbers, _, -, or .",
                value=value,
                expected_type="username"
            )
        return value

    @staticmethod
    def password(value: str, field_name: str = "password") -> str:
        """Check ``value`` against the password policy in ``ValidationRules``.

        The password is returned unmodified (it is not stripped). Policy and
        type errors report ``"[hidden]"`` instead of the submitted value.

        Raises:
            InputValidationError: if the value is missing, not a string, or
                violates one or more policy requirements.
        """
        value = Validators.required(value, field_name)
        if not isinstance(value, str):
            raise InputValidationError(
                field=field_name,
                message="Must be a string",
                value="[hidden]",
                expected_type="string"
            )
        errors = []
        if len(value) < ValidationRules.PASSWORD_MIN_LENGTH:
            errors.append(f"at least {ValidationRules.PASSWORD_MIN_LENGTH} characters")
        if ValidationRules.PASSWORD_REQUIRE_UPPER and not re.search(r'[A-Z]', value):
            errors.append("at least one uppercase letter")
        if ValidationRules.PASSWORD_REQUIRE_LOWER and not re.search(r'[a-z]', value):
            errors.append("at least one lowercase letter")
        if ValidationRules.PASSWORD_REQUIRE_DIGIT and not re.search(r'\d', value):
            errors.append("at least one digit")
        if ValidationRules.PASSWORD_REQUIRE_SPECIAL and not re.search(r'[!@#$%^&*(),.?":{}|<>]', value):
            errors.append("at least one special character")
        if errors:
            raise InputValidationError(
                field=field_name,
                message=f"Password must contain {', '.join(errors)}",
                value="[hidden]"
            )
        return value

    @staticmethod
    def date_string(
        value: str,
        field_name: str = "date",
        format: str = "%Y-%m-%d",
        min_date: Optional[date] = None,
        max_date: Optional[date] = None
    ) -> date:
        """Parse ``value`` with ``format`` and return the resulting ``date``.

        ``min_date`` and ``max_date`` are inclusive bounds: a date equal to
        either bound is accepted.

        Raises:
            InputValidationError: if the value is missing, not a string,
                cannot be parsed, or lies outside the bounds.
        """
        value = Validators._required_text(value, field_name)
        try:
            date_value = datetime.strptime(value, format).date()
        except ValueError as exc:
            raise InputValidationError(
                field=field_name,
                message=f"Invalid date format (expected: {format})",
                value=value,
                expected_type="date"
            ) from exc
        if min_date is not None and date_value < min_date:
            raise InputValidationError(
                field=field_name,
                message=f"Date must be after {min_date}",
                value=value
            )
        if max_date is not None and date_value > max_date:
            raise InputValidationError(
                field=field_name,
                message=f"Date must be before {max_date}",
                value=value
            )
        return date_value

    @staticmethod
    def ip_address(
        value: str,
        field_name: str = "ip_address",
        version: Optional[int] = None
    ) -> str:
        """Validate an IP address and return its canonical string form.

        Args:
            value: The raw input.
            field_name: Name reported in validation errors.
            version: If given (4 or 6), the address must be of that version.

        Raises:
            InputValidationError: if the value is missing, not a string, not
                a valid address, or of the wrong version.
        """
        value = Validators._required_text(value, field_name)
        try:
            ip = ipaddress.ip_address(value)
        except ValueError:
            ip = None
        if ip is None or (version and ip.version != version):
            version_str = f"IPv{version}" if version else "IP"
            raise InputValidationError(
                field=field_name,
                message=f"Invalid {version_str} address",
                value=value,
                expected_type="ip_address"
            )
        return str(ip)

    @staticmethod
    def url(
        value: str,
        field_name: str = "url",
        require_https: bool = False
    ) -> str:
        """Validate an http(s) URL and return it stripped.

        Raises:
            InputValidationError: if the value is missing, not a string, does
                not match ``ValidationRules.URL_PATTERN``, or does not use
                HTTPS while ``require_https`` is set.
        """
        value = Validators._required_text(value, field_name)
        if not ValidationRules.URL_PATTERN.match(value):
            raise InputValidationError(
                field=field_name,
                message="Invalid URL format",
                value=value,
                expected_type="url"
            )
        # The scheme is matched case-insensitively above, so compare it
        # case-insensitively here as well ("HTTPS://..." is valid HTTPS).
        if require_https and not value.lower().startswith('https://'):
            raise InputValidationError(
                field=field_name,
                message="URL must use HTTPS",
                value=value
            )
        return value

    @staticmethod
    def enum(
        value: Any,
        field_name: str,
        allowed_values: List[Any]
    ) -> Any:
        """Return ``value`` if it is one of ``allowed_values``.

        Raises:
            InputValidationError: if ``value`` is not in ``allowed_values``.
        """
        if value not in allowed_values:
            raise InputValidationError(
                field=field_name,
                message=f"Must be one of: {', '.join(map(str, allowed_values))}",
                value=value
            )
        return value


# Rule keys consumed by the decorator itself; every other key is forwarded to
# the selected validator as a keyword argument.
_RULE_META_KEYS = frozenset({'type', 'required', 'default'})


def _is_blank(value: Any) -> bool:
    """Return True for ``None`` and blank strings (same notion as ``required``)."""
    return value is None or (isinstance(value, str) and not value.strip())


def validate(rules: Dict[str, Dict[str, Any]]) -> Callable:
    """Decorate a Flask view so its request data is validated against ``rules``.

    ``rules`` maps field names to rule dicts. Recognised keys:

    * ``type``: name of a ``Validators`` method (default ``'string'``).
    * ``required``: if truthy, the field must be present and non-blank.
    * ``default``: value stored when an optional field is absent or blank.
    * any other key is passed to the validator as a keyword argument.

    Data is read from the JSON body when the request is JSON, otherwise from
    the form. Results are stored on ``request.validated_data`` before the
    view is called. Optional fields that are absent and have no default are
    omitted from the result.

    Raises (when the wrapped view is invoked):
        InputValidationError: if a field fails validation.
        ValueError: if a rule names an unknown validator type.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            if request.is_json:
                data = request.get_json()
                # A JSON body may legally be null, a list or a scalar; treat
                # anything that is not an object as "no fields supplied" so
                # required fields fail with a proper validation error.
                if not isinstance(data, dict):
                    data = {}
            else:
                data = request.form

            validated_data = {}
            for field_name, field_rules in rules.items():
                value = data.get(field_name)
                if field_rules.get('required'):
                    value = Validators.required(value, field_name)
                elif _is_blank(value):
                    # Optional and absent: apply the default if there is one
                    # and skip validation either way.
                    if 'default' in field_rules:
                        validated_data[field_name] = field_rules['default']
                    continue

                validator_name = field_rules.get('type', 'string')
                validator_func = getattr(Validators, validator_name, None)
                if validator_func is None:
                    raise ValueError(f"Unknown validator type: {validator_name}")

                validator_params = {
                    key: rule
                    for key, rule in field_rules.items()
                    if key not in _RULE_META_KEYS
                }
                validator_params['field_name'] = field_name
                validated_data[field_name] = validator_func(value, **validator_params)

            request.validated_data = validated_data
            return func(*args, **kwargs)
        return wrapper
    return decorator


_DANGEROUS_TAG_NAMES = (
    'script|iframe|object|embed|form|input|button|textarea|select|link|meta|style'
)
# <script>/<style> elements are removed together with their content, which
# is code rather than displayable text.
_SCRIPT_STYLE_ELEMENT_PATTERN = re.compile(
    r'<(script|style)\b[^>]*>.*?</\1\s*>',
    re.IGNORECASE | re.DOTALL
)
# Opening or closing dangerous tags (also when left unterminated at the end
# of the input). The word boundary keeps tags that merely start with a
# dangerous name from matching.
_DANGEROUS_TAG_PATTERN = re.compile(
    r'</?(?:' + _DANGEROUS_TAG_NAMES + r')\b[^>]*(?:>|\Z)',
    re.IGNORECASE
)
# Event-handler and style attributes including their complete value. The
# look-behind stops matches inside longer names such as "content=".
_DANGEROUS_ATTR_PATTERN = re.compile(
    r'''(?<![\w-])(?:on\w+|style)\s*=\s*(?:"[^"]*"|'[^']*'|[^\s>]*)''',
    re.IGNORECASE
)
# URL-carrying attributes whose value is a javascript: URI.
_JAVASCRIPT_URI_ATTR_PATTERN = re.compile(
    r'''(?<![\w-])(?:href|src|action|formaction|data)\s*=\s*'''
    r'''(?:"\s*javascript\s*:[^"]*"|'\s*javascript\s*:[^']*'|javascript\s*:[^\s>]*)''',
    re.IGNORECASE
)


def sanitize_html(value: str) -> str:
    """Strip dangerous tags and attributes from an HTML fragment.

    Removes ``<script>``/``<style>`` elements with their content, the opening
    and closing tags of the other dangerous elements (their inner content is
    kept), ``on*`` and ``style`` attributes, and URL attributes that use a
    ``javascript:`` URI.

    NOTE: this is a best-effort regex filter, not an HTML parser. It can be
    bypassed by sufficiently creative markup and may also remove attribute-like
    text outside of tags. Do not rely on it as the only defence against XSS;
    escape on output or use a dedicated sanitizer library as well.
    """
    previous = None
    # Repeat until nothing changes so that markup reassembled by a removal
    # (e.g. "<scr<script>ipt>") is caught on the next pass. Every substitution
    # only deletes characters, so the loop terminates.
    while previous != value:
        previous = value
        value = _SCRIPT_STYLE_ELEMENT_PATTERN.sub('', value)
        value = _DANGEROUS_TAG_PATTERN.sub('', value)
        value = _DANGEROUS_ATTR_PATTERN.sub('', value)
        value = _JAVASCRIPT_URI_ATTR_PATTERN.sub('', value)
    return value


_SQL_IDENTIFIER_PATTERN = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*')


def sanitize_sql_identifier(value: str) -> str:
    """Return ``value`` if it is a plain SQL identifier, otherwise raise.

    A valid identifier starts with an ASCII letter or underscore and
    continues with ASCII letters, digits or underscores.

    Raises:
        ValidationException: if ``value`` is not a string or contains any
            other character.
    """
    # fullmatch() is used instead of match() with "$": "$" also matches just
    # before a trailing newline, which would let "users\n" through.
    if not isinstance(value, str) or not _SQL_IDENTIFIER_PATTERN.fullmatch(value):
        raise ValidationException(
            message="Invalid SQL identifier",
            details={'value': value},
            user_message="Ungültiger Bezeichner"
        )
    return value