""" Base Repository - Abstrakte Basis für alle Repositories """ import sqlite3 import json import logging from typing import Dict, List, Any, Optional, Union from datetime import datetime from contextlib import contextmanager from config.paths import PathConfig logger = logging.getLogger("base_repository") class BaseRepository: """Basis-Repository mit gemeinsamen Datenbankfunktionen""" def __init__(self, db_path: str = None): """ Initialisiert das Repository. Args: db_path: Pfad zur Datenbank (falls None, wird PathConfig.MAIN_DB verwendet) """ self.db_path = db_path if db_path is not None else PathConfig.MAIN_DB self._ensure_schema() def _ensure_schema(self): """Stellt sicher dass das erweiterte Schema existiert""" try: if PathConfig.file_exists(PathConfig.SCHEMA_V2): with open(PathConfig.SCHEMA_V2, "r", encoding='utf-8') as f: schema_sql = f.read() with self.get_connection() as conn: conn.executescript(schema_sql) conn.commit() logger.info("Schema v2 erfolgreich geladen") else: logger.warning(f"schema_v2.sql nicht gefunden unter {PathConfig.SCHEMA_V2}, nutze existierendes Schema") except Exception as e: logger.error(f"Fehler beim Schema-Update: {e}") @contextmanager def get_connection(self): """Context Manager für Datenbankverbindungen""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def _serialize_json(self, data: Any) -> str: """Serialisiert Daten zu JSON""" if data is None: return None return json.dumps(data, default=str) def _deserialize_json(self, data: str) -> Any: """Deserialisiert JSON zu Python-Objekten""" if data is None: return None try: return json.loads(data) except json.JSONDecodeError: logger.error(f"Fehler beim JSON-Parsing: {data}") return None def _parse_datetime(self, dt_string: str) -> Optional[datetime]: """Parst einen Datetime-String""" if not dt_string: return None try: # SQLite datetime format return datetime.strptime(dt_string, "%Y-%m-%d %H:%M:%S") except ValueError: try: # ISO format return datetime.fromisoformat(dt_string.replace('Z', '+00:00')) except: logger.error(f"Konnte Datetime nicht parsen: {dt_string}") return None def _execute_query(self, query: str, params: tuple = ()) -> List[sqlite3.Row]: """Führt eine SELECT-Query aus""" with self.get_connection() as conn: cursor = conn.execute(query, params) return cursor.fetchall() def _execute_insert(self, query: str, params: tuple = ()) -> int: """Führt eine INSERT-Query aus und gibt die ID zurück""" with self.get_connection() as conn: cursor = conn.execute(query, params) conn.commit() return cursor.lastrowid def _execute_update(self, query: str, params: tuple = ()) -> int: """Führt eine UPDATE-Query aus und gibt affected rows zurück""" with self.get_connection() as conn: cursor = conn.execute(query, params) conn.commit() return cursor.rowcount def _execute_delete(self, query: str, params: tuple = ()) -> int: """Führt eine DELETE-Query aus und gibt affected rows zurück""" with self.get_connection() as conn: cursor = conn.execute(query, params) conn.commit() return cursor.rowcount