Files
AccountForger-neuerUpload/infrastructure/repositories/base_repository.py
Claude Project Manager 04585e95b6 Initial commit
2025-08-01 23:50:28 +02:00

112 Zeilen
3.9 KiB
Python

"""
Base Repository - Abstrakte Basis für alle Repositories
"""
import sqlite3
import json
import logging
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
from contextlib import contextmanager
from config.paths import PathConfig
logger = logging.getLogger("base_repository")
class BaseRepository:
"""Basis-Repository mit gemeinsamen Datenbankfunktionen"""
def __init__(self, db_path: str = None):
"""
Initialisiert das Repository.
Args:
db_path: Pfad zur Datenbank (falls None, wird PathConfig.MAIN_DB verwendet)
"""
self.db_path = db_path if db_path is not None else PathConfig.MAIN_DB
self._ensure_schema()
def _ensure_schema(self):
"""Stellt sicher dass das erweiterte Schema existiert"""
try:
if PathConfig.file_exists(PathConfig.SCHEMA_V2):
with open(PathConfig.SCHEMA_V2, "r", encoding='utf-8') as f:
schema_sql = f.read()
with self.get_connection() as conn:
conn.executescript(schema_sql)
conn.commit()
logger.info("Schema v2 erfolgreich geladen")
else:
logger.warning(f"schema_v2.sql nicht gefunden unter {PathConfig.SCHEMA_V2}, nutze existierendes Schema")
except Exception as e:
logger.error(f"Fehler beim Schema-Update: {e}")
@contextmanager
def get_connection(self):
"""Context Manager für Datenbankverbindungen"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def _serialize_json(self, data: Any) -> str:
"""Serialisiert Daten zu JSON"""
if data is None:
return None
return json.dumps(data, default=str)
def _deserialize_json(self, data: str) -> Any:
"""Deserialisiert JSON zu Python-Objekten"""
if data is None:
return None
try:
return json.loads(data)
except json.JSONDecodeError:
logger.error(f"Fehler beim JSON-Parsing: {data}")
return None
def _parse_datetime(self, dt_string: str) -> Optional[datetime]:
"""Parst einen Datetime-String"""
if not dt_string:
return None
try:
# SQLite datetime format
return datetime.strptime(dt_string, "%Y-%m-%d %H:%M:%S")
except ValueError:
try:
# ISO format
return datetime.fromisoformat(dt_string.replace('Z', '+00:00'))
except:
logger.error(f"Konnte Datetime nicht parsen: {dt_string}")
return None
def _execute_query(self, query: str, params: tuple = ()) -> List[sqlite3.Row]:
"""Führt eine SELECT-Query aus"""
with self.get_connection() as conn:
cursor = conn.execute(query, params)
return cursor.fetchall()
def _execute_insert(self, query: str, params: tuple = ()) -> int:
"""Führt eine INSERT-Query aus und gibt die ID zurück"""
with self.get_connection() as conn:
cursor = conn.execute(query, params)
conn.commit()
return cursor.lastrowid
def _execute_update(self, query: str, params: tuple = ()) -> int:
"""Führt eine UPDATE-Query aus und gibt affected rows zurück"""
with self.get_connection() as conn:
cursor = conn.execute(query, params)
conn.commit()
return cursor.rowcount
def _execute_delete(self, query: str, params: tuple = ()) -> int:
"""Führt eine DELETE-Query aus und gibt affected rows zurück"""
with self.get_connection() as conn:
cursor = conn.execute(query, params)
conn.commit()
return cursor.rowcount