# Files: Hetzner-Backup/v2_adminpanel/fix_database_fields.py
# 2025-06-18 19:40:14 +02:00
# 178 lines, 6.3 KiB, Python

#!/usr/bin/env python3
"""
Script to fix database field name inconsistencies in the v2_adminpanel codebase.
This script identifies and optionally fixes incorrect field references.
"""
import argparse
import os
import re
from typing import Dict, List, Optional, Tuple
# Field name corrections: incorrect (legacy) name -> correct name.
FIELD_MAPPINGS: Dict[str, str] = {
    # Sessions table
    'device_id': 'hardware_id',
    'active': 'is_active',
    'login_time': 'started_at',
    'last_activity': 'last_heartbeat',
    'logout_time': 'ended_at',
    'start_time': 'started_at',
}
# Files that are scanned, given relative to the project base path.
FILES_TO_CHECK: List[str] = [
    'routes/session_routes.py',
    'routes/api_routes.py',
    'routes/export_routes.py',
    'routes/batch_routes.py',
    'routes/customer_routes.py',
    'models.py',
]
# (regex, label) pairs describing where a database field name may be used.
# NOTE(review): none of the functions visible in this file read PATTERNS;
# confirm whether something else imports it before changing or removing it.
PATTERNS: List[Tuple[str, str]] = [
    # SQL queries
    (r'SELECT\s+.*?(\w+).*?FROM\s+sessions', 'SQL SELECT'),
    (r'WHERE\s+.*?(\w+)\s*=', 'SQL WHERE'),
    (r'SET\s+(\w+)\s*=', 'SQL SET'),
    (r'ORDER\s+BY\s+.*?(\w+)', 'SQL ORDER BY'),
    # Dictionary/JSON access
    (
        r'\[[\'"](device_id|active|login_time|last_activity|logout_time|start_time)[\'"]\]',
        'Dict access',
    ),
    (
        r'\.get\([\'"](device_id|active|login_time|last_activity|logout_time|start_time)[\'"]',
        'Dict get',
    ),
    # Row access patterns (index access annotated with a trailing comment)
    (
        r'row\[\d+\]\s*#.*?(device_id|active|login_time|last_activity|logout_time|start_time)',
        'Row comment',
    ),
    # Column references in queries through the ``s`` table alias
    (
        r's\.(device_id|active|login_time|last_activity|logout_time|start_time)',
        'Table alias',
    ),
]
def find_field_usage(
    file_path: str,
    field_mappings: Optional[Dict[str, str]] = None,
) -> List[Tuple[int, str, str, str]]:
    """Find legacy field references in a single file.

    A field counts as referenced on a line when its name appears

    * as a quoted string (``'name'`` or ``"name"``),
    * directly after a dot (``.name``) and is not merely the prefix of a
      longer identifier such as ``.name_count``,
    * after a space and followed by a space, a comma or a newline,
    * or wrapped in square brackets (``[name]``).

    Args:
        file_path: Path of the file to scan; it is read as UTF-8.
        field_mappings: Optional ``{old_name: new_name}`` mapping to look
            for. Defaults to the module-level ``FIELD_MAPPINGS``.

    Returns:
        One ``(line_number, stripped_line, old_name, new_name)`` tuple per
        line/field combination that matched, with line numbers starting at
        1. If the file cannot be read or decoded, the error is printed and
        an empty list is returned.
    """
    mappings = FIELD_MAPPINGS if field_mappings is None else field_mappings

    # Keep the try body minimal: only reading the file is expected to fail.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading {file_path}: {e}")
        return []

    # Compile one matcher per field up front instead of once per line.
    matchers = []
    for field_old, field_new in mappings.items():
        name = re.escape(field_old)
        matcher = re.compile(
            rf"'{name}'"                  # single-quoted key
            rf'|"{name}"'                 # double-quoted key
            rf"|\.{name}(?!\w)"           # attribute / table-alias access
            rf"|[ ]{name}(?=[ ,\n])"      # bare name in SQL or code
            rf"|\[{name}\]"               # bracketed name
        )
        matchers.append((field_old, field_new, matcher))

    issues = []
    for line_num, line in enumerate(lines, 1):
        for field_old, field_new, matcher in matchers:
            if matcher.search(line):
                issues.append((line_num, line.strip(), field_old, field_new))
    return issues
def generate_fix_report(
    base_path: str,
    files_to_check: Optional[List[str]] = None,
) -> Dict[str, List[Tuple[int, str, str, str]]]:
    """Generate a report of all field usage issues.

    Args:
        base_path: Directory the relative file names are joined onto.
        files_to_check: Optional list of file names relative to
            ``base_path``. Defaults to the module-level ``FILES_TO_CHECK``.

    Returns:
        A dict mapping each relative file name to the non-empty list of
        issues returned by ``find_field_usage`` for it. Files that do not
        exist, or that have no issues, are left out.
    """
    candidates = FILES_TO_CHECK if files_to_check is None else files_to_check
    report = {}
    for file_name in candidates:
        file_path = os.path.join(base_path, file_name)
        # isfile rather than exists: a directory that happens to carry one
        # of the names cannot be opened for scanning.
        if not os.path.isfile(file_path):
            continue
        issues = find_field_usage(file_path)
        if issues:
            report[file_name] = issues
    return report
def _replace_fields(content: str, mappings: Dict[str, str]) -> Tuple[str, int]:
    """Rewrite legacy field references in *content*; return (text, count)."""
    total = 0
    for old_field, new_field in mappings.items():
        name = re.escape(old_field)
        # Only look-arounds are used, so the match is exactly the field name
        # and neighbouring occurrences that share a separator are all found.
        pattern = re.compile(
            rf"(?<='){name}(?=')"             # 'old'
            rf'|(?<="){name}(?=")'            # "old"
            rf"|(?<=\.){name}(?!\w)"          # .old, but not .old_suffix
            rf"|(?<= ){name}(?=[ ,\r\n])"     # bare name before space/comma/EOL
        )
        # A callable replacement keeps any backslash in new_field literal.
        content, count = pattern.subn(lambda _m, new=new_field: new, content)
        total += count
    return content, total


def apply_fixes(
    base_path: str,
    report: Dict[str, List[Tuple[int, str, str, str]]],
    dry_run: bool = True,
    field_mappings: Optional[Dict[str, str]] = None,
) -> None:
    """Apply fixes to the files named in *report*.

    In dry-run mode the reported issues are only printed. Otherwise each
    file is read, every mapped field reference is rewritten across the whole
    file (not only on the reported lines), the previous content is saved
    next to it as ``<file>.backup`` and the fixed content is written back.
    Files whose content does not change are left untouched and get no
    backup.

    Args:
        base_path: Directory the report's file names are joined onto.
        report: ``{file_name: issues}`` as built by ``generate_fix_report``.
        dry_run: When true (the default), print what would be fixed and
            modify nothing.
        field_mappings: Optional ``{old_name: new_name}`` mapping. Defaults
            to the module-level ``FIELD_MAPPINGS``.
    """
    mappings = FIELD_MAPPINGS if field_mappings is None else field_mappings

    for file_name, issues in report.items():
        file_path = os.path.join(base_path, file_name)

        if dry_run:
            print(f"\n--- DRY RUN: Would fix {file_name} ---")
            for line_num, _line, old_field, new_field in issues:
                print(f" Line {line_num}: {old_field} -> {new_field}")
            continue

        # newline='' disables newline translation so the file's existing
        # line endings survive the rewrite and the backup is an exact copy.
        with open(file_path, 'r', encoding='utf-8', newline='') as f:
            original_content = f.read()

        content, replacements_made = _replace_fields(original_content, mappings)

        # Write back only if changes were made
        if content == original_content:
            continue

        # Save the backup first so a failed write cannot lose the original.
        backup_path = f"{file_path}.backup"
        with open(backup_path, 'w', encoding='utf-8', newline='') as f:
            f.write(original_content)

        with open(file_path, 'w', encoding='utf-8', newline='') as f:
            f.write(content)

        print(f"\nFixed {file_name} ({replacements_made} replacements made)")
        print(f" Backup saved to: {backup_path}")
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: report field issues and optionally fix them.

    Prints a per-file summary showing at most the first five issues of each
    file. With ``--apply`` the fixes are written to disk; without it, a dry
    run listing is printed instead.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read the process command line.
    """
    parser = argparse.ArgumentParser(description='Fix database field name inconsistencies')
    parser.add_argument('--apply', action='store_true', help='Apply fixes (default is dry run)')
    parser.add_argument('--path', default='.', help='Base path of the project')
    args = parser.parse_args(argv)

    print("Database Field Name Fixer")
    print("=" * 50)

    # Generate report
    report = generate_fix_report(args.path)
    if not report:
        print("No issues found!")
        return

    # Display report
    preview_limit = 5
    max_width = 80
    total_issues = 0
    for file_name, issues in report.items():
        print(f"\n{file_name}: {len(issues)} issues")
        total_issues += len(issues)
        for line_num, line, old_field, new_field in issues[:preview_limit]:
            print(f" Line {line_num}: {old_field} -> {new_field}")
            # Only mark the preview with an ellipsis when text was cut off.
            preview = line if len(line) <= max_width else f"{line[:max_width]}..."
            print(f" {preview}")
        if len(issues) > preview_limit:
            print(f" ... and {len(issues) - preview_limit} more")
    print(f"\nTotal issues found: {total_issues}")

    # Apply fixes if requested
    if args.apply:
        print("\nApplying fixes...")
        apply_fixes(args.path, report, dry_run=False)
        print("\nFixes applied! Please test all functionality.")
    else:
        print("\nRun with --apply to fix these issues.")
        apply_fixes(args.path, report, dry_run=True)
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()