181 Zeilen
6.6 KiB
Python
181 Zeilen
6.6 KiB
Python
import os
|
|
import subprocess
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import json
|
|
|
|
logger = logging.getLogger(__name__)


class GitHubBackupManager:
    """Commit backup files in a local git checkout and sync them with a remote.

    Every public method returns a ``(success, payload)`` tuple instead of
    raising: ``payload`` is the result on success and an error message on
    failure.
    """

    # (backup type, top-level directory on the backup branch)
    _BACKUP_DIRS = (
        ("database", "database-backups"),
        ("server", "server-backups"),
    )

    def __init__(self, repo_path="/opt/v2-Docker", backup_remote="backup",
                 git_lfs_path="/home/root/.local/bin", branch="main"):
        """Configure the checkout, remote and branch used for backups.

        Args:
            repo_path: Local git checkout in which all commands are run.
            backup_remote: Name of the git remote that receives the backups.
            git_lfs_path: Directory prepended to ``PATH`` for every command.
            branch: Branch that is pulled from, pushed to and listed.
        """
        self.repo_path = Path(repo_path)
        self.backup_remote = backup_remote
        self.git_lfs_path = git_lfs_path
        self.branch = branch

    def _run_git_command(self, cmd, cwd=None, stdout_path=None):
        """Run a command inside the repository and report the outcome.

        Args:
            cmd: Argument list (preferred; executed without a shell, so file
                names and messages need no quoting) or a single string, which
                is still handed to the shell for backward compatibility.
            cwd: Working directory; defaults to ``self.repo_path``.
            stdout_path: Optional file path. When given, the command's raw
                stdout bytes are written to that file instead of being
                captured and decoded, which keeps binary output intact.

        Returns:
            ``(True, stdout)`` when the exit status is 0 (``stdout`` is ``""``
            if it was redirected to ``stdout_path``). Otherwise
            ``(False, message)`` where ``message`` is stderr, or stdout when
            stderr is empty, or the exception text if the command could not be
            started.
        """
        env = os.environ.copy()
        # Fall back to the platform default when PATH is unset instead of
        # raising KeyError before the command is even attempted.
        env['PATH'] = os.pathsep.join(
            [str(self.git_lfs_path), env.get('PATH', os.defpath)]
        )

        if cwd is None:
            cwd = self.repo_path

        # Only legacy string commands go through the shell.
        use_shell = isinstance(cmd, str)

        try:
            if stdout_path is None:
                result = subprocess.run(
                    cmd,
                    shell=use_shell,
                    capture_output=True,
                    text=True,
                    cwd=cwd,
                    env=env,
                )
                stdout, stderr = result.stdout, result.stderr
            else:
                with open(stdout_path, "wb") as sink:
                    result = subprocess.run(
                        cmd,
                        shell=use_shell,
                        stdout=sink,
                        stderr=subprocess.PIPE,
                        cwd=cwd,
                        env=env,
                    )
                stdout = ""
                stderr = result.stderr.decode(errors="replace")
        except Exception as e:
            # Boundary of the (success, payload) contract: never raise.
            logger.error(f"Git command exception: {str(e)}")
            return False, str(e)

        if result.returncode != 0:
            # Some failures are reported on stdout only (for example
            # "nothing to commit"), so do not return an empty message.
            message = stderr or stdout
            logger.error(f"Git command failed: {cmd}")
            logger.error(f"Error: {message}")
            return False, message

        return True, stdout

    def _abort_interrupted_rebase(self):
        """Abort a rebase that a failed ``git pull --rebase`` left in progress.

        Only acts when the usual rebase state directories exist below
        ``<repo_path>/.git``; otherwise it does nothing.
        """
        git_dir = self.repo_path / ".git"
        state_dirs = ("rebase-merge", "rebase-apply")
        if any((git_dir / name).exists() for name in state_dirs):
            self._run_git_command(["git", "rebase", "--abort"])

    def pull_latest(self):
        """Pull the backup branch from the backup remote with ``--rebase``.

        Returns:
            The ``(success, output)`` tuple of the git command.
        """
        return self._run_git_command(
            ["git", "pull", self.backup_remote, self.branch, "--rebase"]
        )

    def push_backup(self, file_path, backup_type="database"):
        """Add, commit and push one backup file.

        Args:
            file_path: Path of the backup file; it must be located inside
                ``self.repo_path``.
            backup_type: Label that is embedded in the commit message.

        Returns:
            ``(True, message)`` once the push succeeded, otherwise
            ``(False, error_message)``.
        """
        try:
            # Resolve before touching the repository: a file outside the
            # checkout can never be added (relative_to raises ValueError).
            relative_path = Path(file_path).relative_to(self.repo_path)

            # First pull to avoid conflicts
            success, output = self.pull_latest()
            if not success:
                if "CONFLICT" not in output:
                    logger.warning(f"Pull failed but continuing: {output}")
                # A conflicting rebase would otherwise stay in progress, so the
                # commit below would not land on the branch that is pushed.
                self._abort_interrupted_rebase()

            # Add file ("--" keeps an unusual file name from being read as an option)
            success, output = self._run_git_command(
                ["git", "add", "--", str(relative_path)]
            )
            if not success:
                return False, f"Failed to add file: {output}"

            # Commit
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            commit_msg = f"Backup {backup_type} - {timestamp}"
            success, output = self._run_git_command(
                ["git", "commit", "-m", commit_msg]
            )
            if not success:
                return False, f"Failed to commit: {output}"

            # Push
            push_cmd = ["git", "push", self.backup_remote, self.branch]
            success, output = self._run_git_command(push_cmd)
            if not success:
                # Try force push if normal push fails
                success, output = self._run_git_command(
                    push_cmd + ["--force-with-lease"]
                )
                if not success:
                    return False, f"Failed to push: {output}"

            return True, "Backup successfully pushed to GitHub"

        except Exception as e:
            logger.error(f"GitHub push error: {str(e)}")
            return False, str(e)

    def list_github_backups(self, backup_type="all"):
        """List the backup files present on the remote backup branch.

        Args:
            backup_type: ``"database"``, ``"server"`` or ``"all"``.

        Returns:
            ``(True, backups)`` where each entry is a dict with the keys
            ``type``, ``path`` and ``filename``; ``(False, error_message)`` if
            an unexpected exception occurred. A directory whose listing fails
            contributes no entries.
        """
        try:
            # Fetch latest; a failure is tolerated and the last fetched state is listed.
            self._run_git_command(["git", "fetch", self.backup_remote])

            remote_ref = f"{self.backup_remote}/{self.branch}"
            backups = []

            for kind, directory in self._BACKUP_DIRS:
                if backup_type not in ("all", kind):
                    continue

                # A pathspec restricts the listing to the directory, so an
                # empty directory yields empty output instead of a non-zero
                # exit status from a filter process.
                success, output = self._run_git_command(
                    ["git", "ls-tree", "-r", "--name-only", remote_ref,
                     f"{directory}/"]
                )
                if not success:
                    continue

                backups.extend(
                    {
                        'type': kind,
                        'path': path,
                        'filename': os.path.basename(path),
                    }
                    for path in output.splitlines()
                    if path
                )

            return True, backups

        except Exception as e:
            logger.error(f"List GitHub backups error: {str(e)}")
            return False, str(e)

    def download_from_github(self, file_path, local_path):
        """Write one file from the remote backup branch to ``local_path``.

        Args:
            file_path: Path of the file inside the repository tree.
            local_path: Destination file; a relative path is resolved against
                ``self.repo_path``.

        Returns:
            ``(True, local_path)`` on success, otherwise
            ``(False, error_message)``.
        """
        try:
            # Fetch latest
            self._run_git_command(["git", "fetch", self.backup_remote])

            # An absolute local_path replaces repo_path in this join.
            target = self.repo_path / local_path

            # Get file from remote. The bytes are streamed straight to disk so
            # binary backups are not decoded as text.
            # NOTE(review): if backups are stored through Git LFS, `git show`
            # presumably returns the LFS pointer rather than the payload - confirm.
            success, output = self._run_git_command(
                ["git", "show",
                 f"{self.backup_remote}/{self.branch}:{file_path}"],
                stdout_path=target,
            )

            if not success:
                # Do not leave an empty or truncated file behind.
                try:
                    os.remove(target)
                except OSError:
                    pass
                return False, f"Failed to download: {output}"

            return True, local_path

        except Exception as e:
            logger.error(f"Download from GitHub error: {str(e)}")
            return False, str(e)
|
|
|
|
def create_server_backup(created_by="system",
                         script_path="/opt/v2-Docker/create_full_backup.sh"):
    """Create a full server backup by running the backup shell script.

    The script is executed from its own directory, and its standard output is
    scanned for the first line containing ``Backup file:``; the text after
    that marker is returned as the backup path.

    Args:
        created_by: Accepted for interface compatibility; not used by this
            function.
        script_path: Location of the backup script.

    Returns:
        ``(True, backup_file)`` on success, otherwise
        ``(False, error_message)``.
    """
    try:
        # Run the backup script
        backup_script = Path(script_path)
        if not backup_script.exists():
            return False, "Backup script not found"

        env = os.environ.copy()
        # Fall back to the platform default when PATH is unset instead of
        # failing with KeyError.
        env['PATH'] = os.pathsep.join(
            ["/home/root/.local/bin", env.get('PATH', os.defpath)]
        )

        result = subprocess.run(
            [f"./{backup_script.name}"],
            cwd=backup_script.parent,
            capture_output=True,
            text=True,
            env=env,
        )

        if result.returncode != 0:
            # Use stdout when the script printed nothing on stderr so the
            # caller does not receive an empty reason.
            details = result.stderr or result.stdout
            return False, f"Backup script failed: {details}"

        # Extract backup file path from output (first matching line wins)
        marker = "Backup file:"
        backup_file = next(
            (
                line.split(marker, 1)[1].strip()
                for line in result.stdout.strip().split('\n')
                if marker in line
            ),
            None,
        )

        if not backup_file:
            return False, "Could not determine backup file path"

        return True, backup_file

    except Exception as e:
        logger.error(f"Server backup error: {str(e)}")
        return False, str(e)