170 Zeilen
5.9 KiB
Python
170 Zeilen
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Script to find and report database field name inconsistencies in Python code
(it only prints findings and suggested renames; no files are modified)
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Dict
|
|
|
|
# Field mappings (old_name -> new_name)
FIELD_MAPPINGS = {
    'device_id': 'hardware_id',
    'active': 'is_active',
    'login_time': 'started_at',
    'last_activity': 'last_heartbeat',
    'logout_time': 'ended_at',
    'start_time': 'started_at',  # Found in models.py
}

# Patterns to identify database queries. They are applied with
# re.IGNORECASE by the scanner, one source line at a time.
QUERY_PATTERNS = [
    r'SELECT.*FROM\s+sessions',
    r'UPDATE\s+sessions',
    r'INSERT\s+INTO\s+sessions',
    r'WHERE.*sessions\.',
    # Table alias pattern. The leading \b is required: without it any
    # attribute access on a name ending in "s" (os.path, lines.append, ...)
    # would count as a query context.
    r'\bs\.\w+',
    r'row\[\d+\]',  # Row index access
]
|
|
|
|
def find_python_files(directory: Path) -> List[Path]:
    """Find all Python files below a directory.

    Args:
        directory: Root of the tree to search recursively.

    Returns:
        Every regular file matching ``*.py`` under ``directory``, sorted by
        path so that repeated runs report files in the same order.
    """
    # rglob matches on names only, so a directory called "something.py"
    # would be returned too; keep regular files only.
    return sorted(path for path in directory.rglob("*.py") if path.is_file())
|
|
|
|
def check_file_for_inconsistencies(filepath: Path) -> Dict[str, List[Tuple[int, str]]]:
    """Check a file for references to old field names.

    A line is reported for an old field name (a key of ``FIELD_MAPPINGS``)
    when both of the following hold:

    * the name occurs in the line as a whole word (case-insensitive), and
    * the line is in a database context: the line itself or one of the four
      lines before it matches an entry of ``QUERY_PATTERNS``, or the line
      contains ``cur.execute`` or ``execute_query``.

    Lines whose first non-blank character is ``#`` are never reported, but
    they still count as context for the lines that follow them.

    Args:
        filepath: File to scan; it is read as UTF-8.

    Returns:
        Mapping of old field name to a list of ``(line_number, stripped_line)``
        tuples, with 1-based line numbers. Empty when nothing was found or
        when the file could not be read (the error is printed).
    """
    inconsistencies: Dict[str, List[Tuple[int, str]]] = {}

    # Keep the try body down to the read itself so that only I/O and
    # decoding problems are reported as read errors.
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading {filepath}: {e}")
        return inconsistencies

    # One whole-word pattern per field. Quoted names ('active'), alias access
    # (s.active) and row['active'] are all whole-word occurrences, while
    # already-renamed names such as is_active are not, because "_" is a word
    # character and therefore no boundary precedes "active" there.
    field_patterns = {
        old_field: re.compile(rf'\b{re.escape(old_field)}\b', re.IGNORECASE)
        for old_field in FIELD_MAPPINGS
    }
    query_patterns = [re.compile(qp, re.IGNORECASE) for qp in QUERY_PATTERNS]

    # Evaluate the query patterns once per line instead of once per
    # (line, field, pattern) combination.
    is_query_line = [
        any(qp.search(line) for qp in query_patterns) for line in lines
    ]

    for index, line in enumerate(lines):
        stripped = line.strip()

        # Skip comments
        if stripped.startswith('#'):
            continue

        # Context window: the current line plus the four preceding lines.
        is_db_context = (
            any(is_query_line[max(0, index - 4):index + 1])
            or 'cur.execute' in line
            or 'execute_query' in line
        )
        if not is_db_context:
            continue

        for old_field, pattern in field_patterns.items():
            if pattern.search(line):
                inconsistencies.setdefault(old_field, []).append(
                    (index + 1, stripped)
                )

    return inconsistencies
|
|
|
|
def generate_fix_suggestions(inconsistencies: Dict[Path, Dict[str, List[Tuple[int, str]]]]) -> None:
    """Print the inconsistency report followed by the recommended renames.

    Args:
        inconsistencies: Maps each scanned file to its findings, in the form
            ``{old_field: [(line_number, stripped_line), ...]}``. Files with
            an empty findings mapping are skipped.

    Raises:
        KeyError: If a reported field name is not a key of ``FIELD_MAPPINGS``.
    """
    separator = "=" * 80
    max_excerpt = 100  # Longest excerpt of an offending line shown per finding

    print("\n" + separator)
    print("FIELD NAME INCONSISTENCIES FOUND")
    print(separator + "\n")

    total_issues = 0

    for filepath, file_issues in inconsistencies.items():
        if not file_issues:
            continue

        print(f"\n📄 {filepath}")
        print("-" * 80)

        for old_field, occurrences in file_issues.items():
            new_field = FIELD_MAPPINGS[old_field]
            print(f"\n ❌ Found '{old_field}' (should be '{new_field}'):")

            for line_num, line_content in occurrences:
                # Show the ellipsis only when the excerpt was actually cut,
                # so short lines are not presented as truncated.
                ellipsis = "..." if len(line_content) > max_excerpt else ""
                print(f" Line {line_num}: {line_content[:max_excerpt]}{ellipsis}")
            total_issues += len(occurrences)

    print(f"\n\n📊 Total issues found: {total_issues}")
    print("\n" + separator)
    print("RECOMMENDED FIXES")
    print(separator + "\n")

    for old_field, new_field in FIELD_MAPPINGS.items():
        print(f" • Replace '{old_field}' with '{new_field}'")

    print("\n⚠️ Note: Review each change carefully, as some occurrences might not be database-related")
|
|
|
|
def create_compatibility_queries() -> None:
    """Print SQL that creates a compatibility view over ``sessions``.

    The view ``sessions_compat`` selects every column of ``sessions`` and
    additionally exposes each new column under its old name, taken from
    ``FIELD_MAPPINGS``. The ``start_time`` mapping is left out, as it is
    treated as a non-column mapping.
    """
    print("\n" + "=" * 80)
    print("COMPATIBILITY SQL QUERIES")
    print("=" * 80 + "\n")

    print("-- Use this view during migration:")
    print("CREATE OR REPLACE VIEW sessions_compat AS")
    print("SELECT ")

    select_items = ["*"]
    select_items.extend(
        f"{new_field} as {old_field}"
        for old_field, new_field in FIELD_MAPPINGS.items()
        if old_field != 'start_time'  # Skip non-column mappings
    )
    # Join with separators instead of ending every item with a comma: a comma
    # after the last item, directly before FROM, is a SQL syntax error.
    print(",\n".join(f" {item}" for item in select_items))
    print("FROM sessions;\n")
|
|
|
|
def main() -> None:
    """Scan the directory containing this script and print the full report.

    Steps: collect the Python files below the script's directory, check each
    one for old field names, then print the findings, the compatibility SQL
    and a summary of the affected files.
    """
    script_path = Path(__file__).resolve()
    base_dir = Path(__file__).parent

    print(f"🔍 Scanning directory: {base_dir}")

    # Find all Python files
    python_files = find_python_files(base_dir)
    print(f"📁 Found {len(python_files)} Python files")

    # Check each file for inconsistencies
    all_inconsistencies = {}

    for filepath in python_files:
        # Skip this script: it contains every old field name and would report
        # itself. It is compared by resolved path so that a renamed copy is
        # skipped too; the historical file name is still honoured.
        if filepath.resolve() == script_path or filepath.name == 'fix_field_references.py':
            continue
        # Skip anything inside a __pycache__ directory. A file name can never
        # equal '__pycache__', so the directory parts are checked instead.
        if '__pycache__' in filepath.parts:
            continue

        inconsistencies = check_file_for_inconsistencies(filepath)
        if inconsistencies:
            all_inconsistencies[filepath] = inconsistencies

    # Generate report
    generate_fix_suggestions(all_inconsistencies)

    # Generate compatibility queries
    create_compatibility_queries()

    # Summary of affected files
    print("\n" + "=" * 80)
    print("AFFECTED FILES SUMMARY")
    print("=" * 80 + "\n")

    affected_files = sorted(str(f.relative_to(base_dir)) for f in all_inconsistencies)
    for i, filepath in enumerate(affected_files, 1):
        print(f" {i}. {filepath}")

    print(f"\n✅ Scan complete! Found issues in {len(affected_files)} files.")
|
|
|
|
# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    main()