"""Structured (JSON) logging configuration for a Flask application."""

import json
import logging
import logging.handlers
import os
import sys
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from flask import g, has_request_context, request

# Size at which each rotating log file rolls over (10 MiB).
_MAX_LOG_BYTES = 10 * 1024 * 1024


class StructuredFormatter(logging.Formatter):
    """Format log records as single-line JSON documents.

    Besides the standard record fields, the output includes request
    metadata when a Flask request context is active, the optional
    ``request_id`` / ``error_code`` / ``details`` attributes supplied via
    ``extra=``, and exception information when present.
    """

    # Key fragments whose values must never reach the logs.
    # NOTE: matching is by substring on the lower-cased key, so a key such
    # as 'shipping' (contains 'pin') is redacted as well.
    SENSITIVE_FIELDS = frozenset({
        'password', 'secret', 'token', 'api_key',
        'authorization', 'credit_card', 'ssn', 'pin',
    })
    REDACTED = '[REDACTED]'

    def format(self, record):
        """Return *record* serialised as a JSON string."""
        # Derive the timestamp from the record itself so every handler emits
        # the same value for the same event (and to avoid the deprecated
        # datetime.utcnow()). tzinfo is stripped to keep the original naive
        # UTC ISO-8601 output format.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_data = {
            'timestamp': created.replace(tzinfo=None).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        if has_request_context():
            log_data['request'] = {
                'method': request.method,
                'path': request.path,
                'remote_addr': request.remote_addr,
                'user_agent': request.user_agent.string,
                'request_id': getattr(g, 'request_id', 'unknown'),
            }

        # Optional attributes attached by callers through ``extra=``.
        if hasattr(record, 'request_id'):
            log_data['request_id'] = record.request_id
        if hasattr(record, 'error_code'):
            log_data['error_code'] = record.error_code

        details = getattr(record, 'details', None)
        if details:
            log_data['details'] = self._sanitize_details(details)

        # ``exc_info=True`` outside an ``except`` block yields the truthy
        # tuple (None, None, None); only report a real exception.
        exc_info = record.exc_info
        if exc_info and exc_info[0] is not None:
            log_data['exception'] = {
                'type': exc_info[0].__name__,
                'message': str(exc_info[1]),
                'traceback': self.formatException(exc_info),
            }

        # default=str is deliberate: a value json cannot encode natively
        # (datetime, Decimal, exception objects, ...) is stringified rather
        # than raising TypeError and losing the whole log record.
        return json.dumps(log_data, ensure_ascii=False, default=str)

    def _sanitize_details(self, details: Any) -> Any:
        """Return a copy of *details* with sensitive values redacted.

        For a dict, every value whose key contains one of
        ``SENSITIVE_FIELDS`` (case-insensitive substring match) is replaced
        by ``'[REDACTED]'``; other values are sanitised recursively.
        Non-dict input is passed through ``_sanitize_value`` instead of
        raising, since ``details`` may come from arbitrary exception
        attributes.
        """
        if not isinstance(details, dict):
            return self._sanitize_value(details)

        sanitized = {}
        for key, value in details.items():
            # str() tolerates non-string keys (e.g. ints).
            lowered = str(key).lower()
            if any(field in lowered for field in self.SENSITIVE_FIELDS):
                sanitized[key] = self.REDACTED
            else:
                sanitized[key] = self._sanitize_value(value)
        return sanitized

    def _sanitize_value(self, value: Any) -> Any:
        """Sanitise dicts nested inside *value*, descending into lists/tuples."""
        if isinstance(value, dict):
            return self._sanitize_details(value)
        if isinstance(value, (list, tuple)):
            # Tuples become lists; JSON encodes both the same way.
            return [self._sanitize_value(item) for item in value]
        return value


class ErrorLevelFilter(logging.Filter):
    """Filter that only passes records at or above ``min_level``."""

    def __init__(self, min_level=logging.ERROR):
        # Initialise the base class so its own state (e.g. ``name``) exists.
        super().__init__()
        self.min_level = min_level

    def filter(self, record):
        """Return True when *record* is at least ``min_level``."""
        return record.levelno >= self.min_level


def _rotating_json_handler(log_dir, filename, backup_count,
                           level=logging.NOTSET):
    """Build a size-rotated file handler that writes StructuredFormatter JSON."""
    handler = logging.handlers.RotatingFileHandler(
        os.path.join(log_dir, filename),
        maxBytes=_MAX_LOG_BYTES,
        backupCount=backup_count,
    )
    handler.setLevel(level)
    handler.setFormatter(StructuredFormatter())
    return handler


def setup_logging(app):
    """Configure root, security and werkzeug loggers and request hooks.

    Reads ``LOG_LEVEL`` (default ``INFO``) and ``LOG_DIR`` (default
    ``logs``) from the environment, creates the log directory, and
    attaches:

    * a stdout handler (plain text when ``app.debug``, JSON otherwise),
    * ``app.log`` (DEBUG and above) and ``errors.log`` (ERROR and above)
      rotating handlers on the root logger,
    * a ``security.log`` rotating handler on the ``security`` logger.

    It also registers ``before_request`` / ``after_request`` hooks on
    *app* that log the start and completion of every request.

    Note: handlers are added unconditionally, so calling this function
    more than once adds duplicate handlers. The ``security`` logger keeps
    the default propagation, so its records also reach the root handlers.

    Args:
        app: The Flask application to instrument.
    """
    requested_level = os.getenv('LOG_LEVEL', 'INFO').upper()
    log_dir = os.getenv('LOG_DIR', 'logs')
    os.makedirs(log_dir, exist_ok=True)

    # An unrecognised LOG_LEVEL must not crash application start-up; fall
    # back to INFO and report the problem once the handlers are in place.
    level = getattr(logging, requested_level, None)
    level_is_valid = isinstance(level, int)

    root_logger = logging.getLogger()
    root_logger.setLevel(level if level_is_valid else logging.INFO)

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    if app.debug:
        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
    else:
        console_formatter = StructuredFormatter()
    console_handler.setFormatter(console_formatter)
    root_logger.addHandler(console_handler)

    root_logger.addHandler(
        _rotating_json_handler(log_dir, 'app.log', 10, logging.DEBUG)
    )

    error_log_handler = _rotating_json_handler(
        log_dir, 'errors.log', 10, logging.ERROR
    )
    error_log_handler.addFilter(ErrorLevelFilter())
    root_logger.addHandler(error_log_handler)

    security_logger = logging.getLogger('security')
    security_logger.addHandler(
        _rotating_json_handler(log_dir, 'security.log', 20)
    )
    security_logger.setLevel(logging.INFO)

    # Silence werkzeug's per-request access lines below WARNING.
    logging.getLogger('werkzeug').setLevel(logging.WARNING)

    if not level_is_valid:
        root_logger.warning(
            'Unknown LOG_LEVEL %r; falling back to INFO', requested_level
        )

    @app.before_request
    def log_request_info():
        """Log method, path, query parameters and size of each request."""
        logger = logging.getLogger('request')
        logger.info(
            'Request started',
            extra={
                'request_id': getattr(g, 'request_id', 'unknown'),
                'details': {
                    'method': request.method,
                    'path': request.path,
                    'query_params': dict(request.args),
                    'content_length': request.content_length,
                },
            },
        )

    @app.after_request
    def log_response_info(response):
        """Log status code and size of each response, then pass it on."""
        logger = logging.getLogger('request')
        logger.info(
            'Request completed',
            extra={
                'request_id': getattr(g, 'request_id', 'unknown'),
                'details': {
                    'status_code': response.status_code,
                    'content_length': response.content_length or 0,
                },
            },
        )
        return response


def get_logger(name: str) -> logging.Logger:
    """Return the logger registered under *name*."""
    return logging.getLogger(name)


def log_error(logger: logging.Logger, message: str,
              error: Optional[Exception] = None, **kwargs):
    """Log *message* at ERROR level, enriched with data from *error*.

    Args:
        logger: Logger to emit the record on.
        message: Human-readable log message.
        error: Optional exception; its type name, string form and, when
            present, its ``code`` and ``details`` attributes are attached
            to the record, and its traceback is logged.
        **kwargs: Additional attributes passed to the record via
            ``extra=``. ``error_code`` / ``details`` taken from *error*
            override same-named keyword arguments.
    """
    extra = kwargs.copy()
    if error is not None:
        extra['error_type'] = type(error).__name__
        extra['error_message'] = str(error)
        if hasattr(error, 'code'):
            extra['error_code'] = error.code
        if hasattr(error, 'details'):
            extra['details'] = error.details
    logger.error(message, exc_info=error, extra=extra)


def log_security_event(event_type: str, message: str, **details):
    """Log a security event at WARNING level on the ``security`` logger.

    Args:
        event_type: Short identifier of the event category.
        message: Human-readable description of the event.
        **details: Arbitrary context stored under the record's ``details``
            attribute.
    """
    logger = logging.getLogger('security')
    # request_id is only available inside a Flask request context.
    request_id = (
        getattr(g, 'request_id', 'unknown') if has_request_context() else None
    )
    logger.warning(
        f"Security Event: {event_type} - {message}",
        extra={
            'security_event': event_type,
            'details': details,
            'request_id': request_id,
        },
    )