import os
import time
import gzip
import logging
import subprocess
from pathlib import Path
from datetime import datetime
from zoneinfo import ZoneInfo

from cryptography.fernet import Fernet, InvalidToken

from db import get_db_connection, get_db_cursor
from config import BACKUP_DIR, DATABASE_CONFIG, EMAIL_ENABLED, BACKUP_ENCRYPTION_KEY
from utils.audit import log_audit

logger = logging.getLogger(__name__)


def _pg_connection_args():
    """Return the connection flags shared by the pg_dump and psql invocations."""
    return [
        '-h', DATABASE_CONFIG['host'],
        # subprocess requires every argv item to be a string; the configured
        # port may be an integer, so it is converted explicitly.
        '-p', str(DATABASE_CONFIG['port']),
        '-U', DATABASE_CONFIG['user'],
        '-d', DATABASE_CONFIG['dbname'],
        '--no-password',
    ]


def _pg_environment():
    """Return a copy of the process environment with PGPASSWORD set."""
    env = os.environ.copy()
    env['PGPASSWORD'] = DATABASE_CONFIG['password']
    return env


def get_or_create_encryption_key():
    """Return the Fernet key used for backups, creating one if necessary.

    Lookup order:

    1. ``BACKUP_ENCRYPTION_KEY`` from the configuration, if it is a valid
       Fernet key.
    2. The contents of ``BACKUP_DIR / ".backup_key"``, if that file exists.
    3. A freshly generated key, which is written to that file.

    Returns:
        The key as bytes.
    """
    key_file = BACKUP_DIR / ".backup_key"

    # Prefer the configured key, but only if Fernet accepts it.
    if BACKUP_ENCRYPTION_KEY:
        try:
            candidate = BACKUP_ENCRYPTION_KEY.encode()
            Fernet(candidate)
            return candidate
        except (ValueError, TypeError):
            # Fernet rejects malformed keys with ValueError/TypeError.
            # Fall back to the key file instead of aborting, but do not hide
            # the misconfiguration.
            logger.warning(
                "Configured BACKUP_ENCRYPTION_KEY is not a valid Fernet key; "
                "falling back to key file"
            )

    # No usable configured key: reuse the stored key if there is one.
    if key_file.exists():
        return key_file.read_bytes()

    # First run: generate a key and persist it for later backups/restores.
    # NOTE(review): the key file is created with default permissions.
    key = Fernet.generate_key()
    key_file.write_bytes(key)
    logger.info("New backup encryption key created")
    return key


def create_backup(backup_type="manual", created_by=None):
    """Create a compressed, encrypted database backup.

    A ``backup_history`` row is inserted with status ``in_progress``; the
    database is dumped with ``pg_dump``, gzip-compressed, Fernet-encrypted and
    written to ``BACKUP_DIR``. The history row is then updated with the result.
    The complete dump is held in memory while it is processed.

    Args:
        backup_type: Value stored in ``backup_history.backup_type``.
        created_by: Value stored in ``backup_history.created_by``;
            ``'system'`` is used when it is falsy.

    Returns:
        ``(True, filename)`` on success, ``(False, error_message)`` on failure.
        Errors raised while inserting the initial history row are not caught.
    """
    start_time = time.time()
    timestamp = datetime.now(ZoneInfo("Europe/Berlin")).strftime("%Y%m%d_%H%M%S")
    filename = f"backup_v2docker_{timestamp}_encrypted.sql.gz.enc"
    filepath = BACKUP_DIR / filename

    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            # Create backup entry
            cur.execute("""
                INSERT INTO backup_history
                (filename, filepath, backup_type, status, created_by, is_encrypted)
                VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (filename, str(filepath), backup_type, 'in_progress',
                  created_by or 'system', True))
            backup_id = cur.fetchone()[0]
            conn.commit()

    try:
        # PostgreSQL dump command
        dump_command = ['pg_dump', *_pg_connection_args(), '--verbose']

        # Execute dump; the password is handed over via PGPASSWORD.
        result = subprocess.run(
            dump_command, capture_output=True, text=True, env=_pg_environment()
        )

        if result.returncode != 0:
            raise RuntimeError(f"pg_dump failed: {result.stderr}")

        dump_data = result.stdout.encode('utf-8')

        # Compress
        compressed_data = gzip.compress(dump_data)

        # Encrypt
        key = get_or_create_encryption_key()
        encrypted_data = Fernet(key).encrypt(compressed_data)

        # Save
        filepath.write_bytes(encrypted_data)

        # Collect statistics
        with get_db_connection() as conn:
            with get_db_cursor(conn) as cur:
                cur.execute(
                    "SELECT COUNT(*) FROM information_schema.tables "
                    "WHERE table_schema = 'public'"
                )
                tables_count = cur.fetchone()[0]

                cur.execute("SELECT SUM(n_live_tup) FROM pg_stat_user_tables")
                # SUM yields NULL when there are no rows.
                records_count = cur.fetchone()[0] or 0

        duration = time.time() - start_time
        filesize = filepath.stat().st_size

        # Update backup entry
        with get_db_connection() as conn:
            with get_db_cursor(conn) as cur:
                cur.execute("""
                    UPDATE backup_history
                    SET status = %s, filesize = %s, tables_count = %s,
                        records_count = %s, duration_seconds = %s
                    WHERE id = %s
                """, ('success', filesize, tables_count, records_count,
                      duration, backup_id))
                conn.commit()

        # Audit log
        log_audit('BACKUP', 'database', backup_id,
                  additional_info=f"Backup created: {filename} ({filesize} bytes)")

        # Email notification (if configured)
        send_backup_notification(True, filename, filesize, duration)

        logger.info("Backup successfully created: %s", filename)
        return True, filename

    except Exception as e:
        # Record the failure. This bookkeeping must not mask the original
        # error or prevent the (False, message) result from being returned.
        try:
            with get_db_connection() as conn:
                with get_db_cursor(conn) as cur:
                    cur.execute("""
                        UPDATE backup_history
                        SET status = %s, error_message = %s, duration_seconds = %s
                        WHERE id = %s
                    """, ('failed', str(e), time.time() - start_time, backup_id))
                    conn.commit()
        except Exception:
            logger.exception(
                "Could not record backup failure for backup id %s", backup_id
            )

        logger.error("Backup failed: %s", e)
        send_backup_notification(False, filename, error=str(e))

        return False, str(e)


def restore_backup(backup_id, encryption_key=None):
    """Restore the database from a stored backup.

    The backup file recorded in ``backup_history`` is read, decrypted (if it
    is marked as encrypted), decompressed and fed to ``psql``.

    Args:
        backup_id: ``backup_history.id`` of the backup to restore.
        encryption_key: Optional Fernet key (``str`` or ``bytes``). When
            omitted, the key from ``get_or_create_encryption_key()`` is used.

    Returns:
        ``(True, "Backup successfully restored")`` on success,
        ``(False, error_message)`` if reading, decrypting, decompressing or
        running ``psql`` fails.

    Raises:
        Exception: If no history row exists for ``backup_id`` or the backup
            file is missing on disk.
    """
    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            # Get backup info
            cur.execute("""
                SELECT filename, filepath, is_encrypted
                FROM backup_history
                WHERE id = %s
            """, (backup_id,))
            backup_info = cur.fetchone()

    if not backup_info:
        raise Exception("Backup not found")

    filename, filepath, is_encrypted = backup_info
    filepath = Path(filepath)

    if not filepath.exists():
        raise Exception("Backup file not found")

    try:
        # Read file
        raw_data = filepath.read_bytes()

        # Decrypt
        if is_encrypted:
            if encryption_key:
                # Accept the key as either str or bytes.
                key = (encryption_key.encode()
                       if isinstance(encryption_key, str) else encryption_key)
            else:
                key = get_or_create_encryption_key()
            try:
                compressed_data = Fernet(key).decrypt(raw_data)
            except (InvalidToken, ValueError, TypeError) as err:
                # InvalidToken: wrong key or corrupted data;
                # ValueError/TypeError: the key itself is malformed.
                raise Exception("Decryption failed. Wrong password?") from err
        else:
            compressed_data = raw_data

        # Decompress
        sql_commands = gzip.decompress(compressed_data).decode('utf-8')

        # Restore database
        restore_command = ['psql', *_pg_connection_args()]

        result = subprocess.run(
            restore_command, input=sql_commands, capture_output=True,
            text=True, env=_pg_environment()
        )

        if result.returncode != 0:
            raise RuntimeError(f"Restore failed: {result.stderr}")

        # Audit log
        log_audit('RESTORE', 'database', backup_id,
                  additional_info=f"Backup restored: {filename}")

        return True, "Backup successfully restored"

    except Exception as e:
        logger.error("Restore failed: %s", e)
        return False, str(e)


def send_backup_notification(success, filename, filesize=None, duration=None, error=None):
    """Send an email notification about a backup result (if configured).

    Currently a placeholder: when ``EMAIL_ENABLED`` is falsy it returns
    immediately; otherwise it only writes a log entry. ``filename``,
    ``filesize``, ``duration`` and ``error`` are accepted but not used yet.

    Args:
        success: Whether the backup succeeded.
        filename: Name of the backup file.
        filesize: Size of the backup file in bytes, if known.
        duration: Duration of the backup in seconds, if known.
        error: Error message for a failed backup, if any.
    """
    if not EMAIL_ENABLED:
        return

    # Email function prepared but disabled
    # TODO: Implement when email server is configured
    logger.info("Email notification prepared: Backup %s",
                'successful' if success else 'failed')