from contextlib import contextmanager

import psycopg2
# NOTE(review): Json is not used in this module; it is kept because other
# modules may import it from here -- confirm before removing.
from psycopg2.extras import Json, RealDictCursor

from config import DATABASE_CONFIG


def get_connection():
    """Create and return a new database connection.

    The connection is opened with the keyword arguments in
    ``DATABASE_CONFIG`` and its client encoding is set to UTF8.  The caller
    owns the returned connection and is responsible for closing it.

    Raises:
        Whatever ``psycopg2.connect`` or ``set_client_encoding`` raises.
    """
    conn = psycopg2.connect(**DATABASE_CONFIG)
    try:
        conn.set_client_encoding('UTF8')
    except Exception:
        # Do not leak the freshly opened connection when configuring it fails.
        conn.close()
        raise
    return conn


@contextmanager
def get_db_connection():
    """Context manager yielding a new connection with transaction handling.

    On normal exit the transaction is committed.  If the managed block (or
    the commit itself) raises, the transaction is rolled back and the
    exception is re-raised.  The connection is closed in every case.
    """
    conn = get_connection()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


@contextmanager
def _closing_cursor(conn, **cursor_kwargs):
    """Yield a cursor opened on *conn* and close it on exit."""
    cur = conn.cursor(**cursor_kwargs)
    try:
        yield cur
    finally:
        cur.close()


@contextmanager
def _managed_cursor(conn=None, **cursor_kwargs):
    """Yield a cursor on *conn*, or on a temporary connection if it is None.

    Shared implementation behind ``get_db_cursor`` and ``get_dict_cursor``;
    ``cursor_kwargs`` are forwarded unchanged to ``connection.cursor``.
    """
    if conn is None:
        # No connection supplied: open one whose commit/rollback/close is
        # handled by get_db_connection for the lifetime of the cursor.
        with get_db_connection() as owned_conn:
            with _closing_cursor(owned_conn, **cursor_kwargs) as cur:
                yield cur
    else:
        # Caller-supplied connection: only the cursor is managed here; the
        # connection is neither committed nor closed by this helper.
        with _closing_cursor(conn, **cursor_kwargs) as cur:
            yield cur


def get_db_cursor(conn=None):
    """Context manager for database cursors.

    Args:
        conn: An existing connection to open the cursor on.  When ``None``,
            a new connection is opened via ``get_db_connection`` (committed
            on success, rolled back on error, always closed).

    Yields:
        A cursor created with the connection's default cursor class; it is
        closed when the ``with`` block exits.
    """
    return _managed_cursor(conn)


def get_dict_cursor(conn=None):
    """Context manager for dictionary cursors.

    Same contract as ``get_db_cursor``, but the cursor is created with
    ``cursor_factory=RealDictCursor``.

    Args:
        conn: An existing connection to open the cursor on, or ``None`` to
            use a temporary connection from ``get_db_connection``.
    """
    return _managed_cursor(conn, cursor_factory=RealDictCursor)


def execute_query(query, params=None, fetch_one=False, fetch_all=False,
                  as_dict=False):
    """Execute a query on a fresh connection and optionally fetch results.

    The statement runs inside ``get_db_connection``, so it is committed on
    success and rolled back if an exception is raised.

    Args:
        query: SQL statement passed to ``cursor.execute``.
        params: Parameters passed to ``cursor.execute`` (may be ``None``).
        fetch_one: When true, return ``cursor.fetchone()``.  Takes
            precedence over ``fetch_all`` if both are set.
        fetch_all: When true (and ``fetch_one`` is false), return
            ``cursor.fetchall()``.
        as_dict: When true, use a ``RealDictCursor`` instead of the default
            cursor.

    Returns:
        The fetched row, the fetched rows, or -- when neither fetch flag is
        set -- ``cursor.rowcount``.
    """
    cursor_func = get_dict_cursor if as_dict else get_db_cursor
    with get_db_connection() as conn:
        with cursor_func(conn) as cur:
            cur.execute(query, params)
            if fetch_one:
                return cur.fetchone()
            if fetch_all:
                return cur.fetchall()
            return cur.rowcount


def execute_many(query, params_list):
    """Execute a query multiple times with different parameters.

    Runs ``cursor.executemany(query, params_list)`` on a fresh connection
    from ``get_db_connection`` (committed on success, rolled back on error).

    Args:
        query: SQL statement passed to ``cursor.executemany``.
        params_list: Sequence of parameter sets, one per execution.

    Returns:
        ``cursor.rowcount`` as reported by the driver after the call.
    """
    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            cur.executemany(query, params_list)
            return cur.rowcount