Files
Claude Project Manager 0d7d888502 Initial commit
2025-07-05 17:51:16 +02:00

223 Zeilen
7.4 KiB
Python

import os
import time
import gzip
import logging
import subprocess
from pathlib import Path
from datetime import datetime
from zoneinfo import ZoneInfo
from cryptography.fernet import Fernet
from db import get_db_connection, get_db_cursor
from config import BACKUP_DIR, DATABASE_CONFIG, EMAIL_ENABLED, BACKUP_ENCRYPTION_KEY
from utils.audit import log_audit
logger = logging.getLogger(__name__)
def get_or_create_encryption_key():
"""Get or create an encryption key"""
key_file = BACKUP_DIR / ".backup_key"
# Try to read key from environment variable
if BACKUP_ENCRYPTION_KEY:
try:
# Validate the key
Fernet(BACKUP_ENCRYPTION_KEY.encode())
return BACKUP_ENCRYPTION_KEY.encode()
except:
pass
# If no valid key in ENV, check file
if key_file.exists():
return key_file.read_bytes()
# Create new key
key = Fernet.generate_key()
key_file.write_bytes(key)
logger.info("New backup encryption key created")
return key
def create_backup(backup_type="manual", created_by=None):
"""Create an encrypted database backup"""
start_time = time.time()
timestamp = datetime.now(ZoneInfo("Europe/Berlin")).strftime("%Y%m%d_%H%M%S")
filename = f"backup_v2docker_{timestamp}_encrypted.sql.gz.enc"
filepath = BACKUP_DIR / filename
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
# Create backup entry
cur.execute("""
INSERT INTO backup_history
(filename, filepath, backup_type, status, created_by, is_encrypted)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (filename, str(filepath), backup_type, 'in_progress',
created_by or 'system', True))
backup_id = cur.fetchone()[0]
conn.commit()
try:
# PostgreSQL dump command
dump_command = [
'pg_dump',
'-h', DATABASE_CONFIG['host'],
'-p', DATABASE_CONFIG['port'],
'-U', DATABASE_CONFIG['user'],
'-d', DATABASE_CONFIG['dbname'],
'--no-password',
'--verbose'
]
# Set PGPASSWORD
env = os.environ.copy()
env['PGPASSWORD'] = DATABASE_CONFIG['password']
# Execute dump
result = subprocess.run(dump_command, capture_output=True, text=True, env=env)
if result.returncode != 0:
raise Exception(f"pg_dump failed: {result.stderr}")
dump_data = result.stdout.encode('utf-8')
# Compress
compressed_data = gzip.compress(dump_data)
# Encrypt
key = get_or_create_encryption_key()
f = Fernet(key)
encrypted_data = f.encrypt(compressed_data)
# Save
filepath.write_bytes(encrypted_data)
# Collect statistics
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'")
tables_count = cur.fetchone()[0]
cur.execute("SELECT SUM(n_live_tup) FROM pg_stat_user_tables")
records_count = cur.fetchone()[0] or 0
duration = time.time() - start_time
filesize = filepath.stat().st_size
# Update backup entry
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
cur.execute("""
UPDATE backup_history
SET status = %s, filesize = %s, tables_count = %s,
records_count = %s, duration_seconds = %s
WHERE id = %s
""", ('success', filesize, tables_count, records_count, duration, backup_id))
conn.commit()
# Audit log
log_audit('BACKUP', 'database', backup_id,
additional_info=f"Backup created: {filename} ({filesize} bytes)")
# Email notification (if configured)
send_backup_notification(True, filename, filesize, duration)
logger.info(f"Backup successfully created: {filename}")
return True, filename
except Exception as e:
# Log error
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
cur.execute("""
UPDATE backup_history
SET status = %s, error_message = %s, duration_seconds = %s
WHERE id = %s
""", ('failed', str(e), time.time() - start_time, backup_id))
conn.commit()
logger.error(f"Backup failed: {e}")
send_backup_notification(False, filename, error=str(e))
return False, str(e)
def restore_backup(backup_id, encryption_key=None):
"""Restore a backup"""
with get_db_connection() as conn:
with get_db_cursor(conn) as cur:
# Get backup info
cur.execute("""
SELECT filename, filepath, is_encrypted
FROM backup_history
WHERE id = %s
""", (backup_id,))
backup_info = cur.fetchone()
if not backup_info:
raise Exception("Backup not found")
filename, filepath, is_encrypted = backup_info
filepath = Path(filepath)
if not filepath.exists():
raise Exception("Backup file not found")
try:
# Read file
encrypted_data = filepath.read_bytes()
# Decrypt
if is_encrypted:
key = encryption_key.encode() if encryption_key else get_or_create_encryption_key()
try:
f = Fernet(key)
compressed_data = f.decrypt(encrypted_data)
except:
raise Exception("Decryption failed. Wrong password?")
else:
compressed_data = encrypted_data
# Decompress
dump_data = gzip.decompress(compressed_data)
sql_commands = dump_data.decode('utf-8')
# Restore database
restore_command = [
'psql',
'-h', DATABASE_CONFIG['host'],
'-p', DATABASE_CONFIG['port'],
'-U', DATABASE_CONFIG['user'],
'-d', DATABASE_CONFIG['dbname'],
'--no-password'
]
env = os.environ.copy()
env['PGPASSWORD'] = DATABASE_CONFIG['password']
result = subprocess.run(restore_command, input=sql_commands,
capture_output=True, text=True, env=env)
if result.returncode != 0:
raise Exception(f"Restore failed: {result.stderr}")
# Audit log
log_audit('RESTORE', 'database', backup_id,
additional_info=f"Backup restored: {filename}")
return True, "Backup successfully restored"
except Exception as e:
logger.error(f"Restore failed: {e}")
return False, str(e)
def send_backup_notification(success, filename, filesize=None, duration=None, error=None):
"""Send email notification (if configured)"""
if not EMAIL_ENABLED:
return
# Email function prepared but disabled
# TODO: Implement when email server is configured
logger.info(f"Email notification prepared: Backup {'successful' if success else 'failed'}")