114 Zeilen
3.1 KiB
Python
114 Zeilen
3.1 KiB
Python
"""
|
|
Helper functions for managing partitioned tables
|
|
"""
|
|
|
|
import psycopg2
|
|
from datetime import datetime
|
|
from dateutil.relativedelta import relativedelta
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def ensure_partition_exists(conn, table_name, timestamp):
|
|
"""
|
|
Ensure a partition exists for the given timestamp.
|
|
Creates the partition if it doesn't exist.
|
|
|
|
Args:
|
|
conn: Database connection
|
|
table_name: Base table name (e.g., 'license_heartbeats')
|
|
timestamp: Timestamp to check partition for
|
|
|
|
Returns:
|
|
bool: True if partition exists or was created, False on error
|
|
"""
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Get year and month from timestamp
|
|
if isinstance(timestamp, str):
|
|
timestamp = datetime.fromisoformat(timestamp)
|
|
|
|
year = timestamp.year
|
|
month = timestamp.month
|
|
|
|
# Partition name
|
|
partition_name = f"{table_name}_{year}_{month:02d}"
|
|
|
|
# Check if partition exists
|
|
cursor.execute("""
|
|
SELECT EXISTS (
|
|
SELECT 1
|
|
FROM pg_tables
|
|
WHERE tablename = %s
|
|
)
|
|
""", (partition_name,))
|
|
|
|
if cursor.fetchone()[0]:
|
|
return True
|
|
|
|
# Create partition
|
|
start_date = f"{year}-{month:02d}-01"
|
|
if month == 12:
|
|
end_date = f"{year + 1}-01-01"
|
|
else:
|
|
end_date = f"{year}-{month + 1:02d}-01"
|
|
|
|
cursor.execute(f"""
|
|
CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {table_name}
|
|
FOR VALUES FROM ('{start_date}') TO ('{end_date}')
|
|
""")
|
|
|
|
conn.commit()
|
|
logger.info(f"Created partition {partition_name}")
|
|
|
|
cursor.close()
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error ensuring partition exists: {e}")
|
|
return False
|
|
|
|
def create_future_partitions(conn, table_name, months_ahead=6):
|
|
"""
|
|
Create partitions for the next N months
|
|
|
|
Args:
|
|
conn: Database connection
|
|
table_name: Base table name
|
|
months_ahead: Number of months to create partitions for
|
|
"""
|
|
current_date = datetime.now()
|
|
|
|
for i in range(months_ahead + 1):
|
|
target_date = current_date + relativedelta(months=i)
|
|
ensure_partition_exists(conn, table_name, target_date)
|
|
|
|
def check_table_exists(conn, table_name):
|
|
"""
|
|
Check if a table exists in the database
|
|
|
|
Args:
|
|
conn: Database connection
|
|
table_name: Table name to check
|
|
|
|
Returns:
|
|
bool: True if table exists, False otherwise
|
|
"""
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT EXISTS (
|
|
SELECT 1
|
|
FROM information_schema.tables
|
|
WHERE table_name = %s
|
|
)
|
|
""", (table_name,))
|
|
|
|
exists = cursor.fetchone()[0]
|
|
cursor.close()
|
|
return exists
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking if table exists: {e}")
|
|
return False |