Files
v2-Docker/v2_adminpanel/leads/repositories.py
Claude Project Manager 0d7d888502 Initial commit
2025-07-05 17:51:16 +02:00

359 Zeilen
12 KiB
Python

# Database Repository for Lead Management
import psycopg2
from psycopg2.extras import RealDictCursor
from uuid import UUID
from typing import List, Optional, Dict, Any
from datetime import datetime
class LeadRepository:
    """Data-access layer for lead institutions, contacts, contact details and notes.

    Every public method obtains its own connection by calling the injected
    ``get_db_connection`` factory and using the result as a context manager.
    What happens when that context exits (commit, rollback, release) is
    decided by the factory, not by this class.
    """

    def __init__(self, get_db_connection):
        """Store the connection factory.

        Args:
            get_db_connection: Zero-argument callable whose result is used in
                a ``with`` statement and yields a connection exposing
                ``cursor()``.
        """
        self.get_db_connection = get_db_connection

    # Internal helpers

    @staticmethod
    def _open_cursor(conn, dict_rows: bool = True):
        """Open a cursor on *conn*; rows come back as dicts unless *dict_rows* is false."""
        if dict_rows:
            return conn.cursor(cursor_factory=RealDictCursor)
        return conn.cursor()

    def _fetch_one(self, query: str,
                   params: Optional[tuple] = None) -> Optional[Dict[str, Any]]:
        """Run *query* on a fresh connection and return its first row, or ``None``."""
        with self.get_db_connection() as conn:
            cur = self._open_cursor(conn)
            try:
                cur.execute(query, params)
                return cur.fetchone()
            finally:
                # Close the cursor even when execute/fetch raises.
                cur.close()

    def _fetch_all(self, query: str,
                   params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Run *query* on a fresh connection and return every row."""
        with self.get_db_connection() as conn:
            cur = self._open_cursor(conn)
            try:
                cur.execute(query, params)
                return cur.fetchall()
            finally:
                cur.close()

    def _execute_affected(self, query: str,
                          params: Optional[tuple] = None) -> bool:
        """Run a write statement and report whether it touched at least one row."""
        with self.get_db_connection() as conn:
            cur = self._open_cursor(conn, dict_rows=False)
            try:
                cur.execute(query, params)
                return cur.rowcount > 0
            finally:
                cur.close()

    # Institution Methods

    def get_institutions_with_counts(self) -> List[Dict[str, Any]]:
        """Return all institutions ordered by name, each with a ``contact_count``."""
        query = """
            SELECT
                i.id,
                i.name,
                i.metadata,
                i.created_at,
                i.updated_at,
                i.created_by,
                COUNT(c.id) as contact_count
            FROM lead_institutions i
            LEFT JOIN lead_contacts c ON c.institution_id = i.id
            GROUP BY i.id
            ORDER BY i.name
        """
        return self._fetch_all(query)

    def create_institution(self, name: str, created_by: str) -> Dict[str, Any]:
        """Insert an institution and return the newly created row."""
        query = """
            INSERT INTO lead_institutions (name, created_by)
            VALUES (%s, %s)
            RETURNING *
        """
        return self._fetch_one(query, (name, created_by))

    def get_institution_by_id(self, institution_id: UUID) -> Optional[Dict[str, Any]]:
        """Return the institution with the given id, or ``None`` if no row matches."""
        query = """
            SELECT * FROM lead_institutions WHERE id = %s
        """
        return self._fetch_one(query, (str(institution_id),))

    def update_institution(self, institution_id: UUID,
                           name: str) -> Optional[Dict[str, Any]]:
        """Rename an institution and bump ``updated_at``.

        Returns:
            The updated row, or ``None`` if no institution has that id.
        """
        query = """
            UPDATE lead_institutions
            SET name = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
        """
        return self._fetch_one(query, (name, str(institution_id)))

    # Contact Methods

    def get_contacts_by_institution(self, institution_id: UUID) -> List[Dict[str, Any]]:
        """Return an institution's contacts, ordered by last then first name.

        Each row carries all ``lead_contacts`` columns plus ``institution_name``.
        """
        query = """
            SELECT
                c.*,
                i.name as institution_name
            FROM lead_contacts c
            JOIN lead_institutions i ON i.id = c.institution_id
            WHERE c.institution_id = %s
            ORDER BY c.last_name, c.first_name
        """
        return self._fetch_all(query, (str(institution_id),))

    def create_contact(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Insert a contact and return the newly created row.

        Args:
            data: Must contain ``institution_id``, ``first_name`` and
                ``last_name``; ``position`` and ``extra_fields`` are optional.
                A missing ``extra_fields`` is stored as an empty JSON object.

        Raises:
            KeyError: If a required key is missing from *data*.
        """
        query = """
            INSERT INTO lead_contacts
            (institution_id, first_name, last_name, position, extra_fields)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING *
        """
        params = (
            str(data['institution_id']),
            data['first_name'],
            data['last_name'],
            data.get('position'),
            psycopg2.extras.Json(data.get('extra_fields', {})),
        )
        return self._fetch_one(query, params)

    def get_contact_with_details(self, contact_id: UUID) -> Optional[Dict[str, Any]]:
        """Return a contact with its details and current notes attached.

        Returns:
            The contact row (including ``institution_name``) extended with a
            ``details`` list and a ``notes`` list, or ``None`` if no contact
            has that id. Only notes with ``is_current = true`` are included.
        """
        contact_query = """
            SELECT
                c.*,
                i.name as institution_name
            FROM lead_contacts c
            JOIN lead_institutions i ON i.id = c.institution_id
            WHERE c.id = %s
        """
        details_query = """
            SELECT * FROM lead_contact_details
            WHERE contact_id = %s
            ORDER BY detail_type, is_primary DESC, created_at
        """
        notes_query = """
            SELECT * FROM lead_notes
            WHERE contact_id = %s AND is_current = true
            ORDER BY created_at DESC
        """
        contact_key = (str(contact_id),)

        # All three reads share one connection and one cursor.
        with self.get_db_connection() as conn:
            cur = self._open_cursor(conn)
            try:
                cur.execute(contact_query, contact_key)
                contact = cur.fetchone()
                if contact:
                    cur.execute(details_query, contact_key)
                    contact['details'] = cur.fetchall()
                    cur.execute(notes_query, contact_key)
                    contact['notes'] = cur.fetchall()
                return contact
            finally:
                cur.close()

    def update_contact(self, contact_id: UUID,
                       data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update a contact's name and position and bump ``updated_at``.

        Args:
            contact_id: Id of the contact to update.
            data: Must contain ``first_name`` and ``last_name``; a missing
                ``position`` is written as NULL.

        Returns:
            The updated row, or ``None`` if no contact has that id.

        Raises:
            KeyError: If a required key is missing from *data*.
        """
        query = """
            UPDATE lead_contacts
            SET first_name = %s, last_name = %s, position = %s,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
        """
        params = (
            data['first_name'],
            data['last_name'],
            data.get('position'),
            str(contact_id),
        )
        return self._fetch_one(query, params)

    # Contact Details Methods

    def add_contact_detail(self, contact_id: UUID, detail_type: str,
                           detail_value: str,
                           detail_label: Optional[str] = None) -> Dict[str, Any]:
        """Insert a detail row (e.g. phone or email) for a contact and return it."""
        query = """
            INSERT INTO lead_contact_details
            (contact_id, detail_type, detail_value, detail_label)
            VALUES (%s, %s, %s, %s)
            RETURNING *
        """
        params = (str(contact_id), detail_type, detail_value, detail_label)
        return self._fetch_one(query, params)

    def get_contact_detail_by_id(self, detail_id: UUID) -> Optional[Dict[str, Any]]:
        """Return the contact detail with the given id, or ``None``."""
        query = "SELECT * FROM lead_contact_details WHERE id = %s"
        return self._fetch_one(query, (str(detail_id),))

    def update_contact_detail(self, detail_id: UUID, detail_value: str,
                              detail_label: Optional[str] = None
                              ) -> Optional[Dict[str, Any]]:
        """Overwrite a detail's value and label and bump ``updated_at``.

        Note that omitting *detail_label* writes NULL rather than keeping the
        stored label.

        Returns:
            The updated row, or ``None`` if no detail has that id.
        """
        query = """
            UPDATE lead_contact_details
            SET detail_value = %s, detail_label = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            RETURNING *
        """
        return self._fetch_one(query, (detail_value, detail_label, str(detail_id)))

    def delete_contact_detail(self, detail_id: UUID) -> bool:
        """Delete a contact detail; return ``True`` if a row was removed."""
        query = "DELETE FROM lead_contact_details WHERE id = %s"
        return self._execute_affected(query, (str(detail_id),))

    # Notes Methods

    def create_note(self, contact_id: UUID, note_text: str,
                    created_by: str) -> Dict[str, Any]:
        """Insert a note for a contact and return the newly created row."""
        query = """
            INSERT INTO lead_notes
            (contact_id, note_text, created_by)
            VALUES (%s, %s, %s)
            RETURNING *
        """
        return self._fetch_one(query, (str(contact_id), note_text, created_by))

    def update_note(self, note_id: UUID, note_text: str,
                    updated_by: str) -> Optional[Dict[str, Any]]:
        """Replace a note with a new version instead of editing it in place.

        The existing row is flagged ``is_current = false`` and a new row is
        inserted that copies its ``contact_id``, points back to it through
        ``parent_note_id`` and carries ``version + 1``.

        Returns:
            The newly inserted row, or ``None`` if no note has that id.
        """
        retire_old = """
            UPDATE lead_notes
            SET is_current = false
            WHERE id = %s
        """
        # NOTE(review): the source row is not required to be current, so
        # updating an old or soft-deleted version also produces a new row.
        # Confirm whether that is intended.
        insert_next = """
            INSERT INTO lead_notes
            (contact_id, note_text, created_by, parent_note_id, version)
            SELECT contact_id, %s, %s, %s, version + 1
            FROM lead_notes
            WHERE id = %s
            RETURNING *
        """
        note_key = str(note_id)

        # Both statements run on the same connection and cursor.
        with self.get_db_connection() as conn:
            cur = self._open_cursor(conn)
            try:
                cur.execute(retire_old, (note_key,))
                cur.execute(insert_next, (note_text, updated_by, note_key, note_key))
                return cur.fetchone()
            finally:
                cur.close()

    def delete_note(self, note_id: UUID) -> bool:
        """Soft-delete a note by flagging it ``is_current = false``.

        Returns:
            ``True`` if a row with that id was matched by the update.
        """
        # NOTE(review): a note that is already not current still matches,
        # so repeated deletes keep reporting True. Confirm callers expect that.
        query = """
            UPDATE lead_notes
            SET is_current = false
            WHERE id = %s
        """
        return self._execute_affected(query, (str(note_id),))

    def get_all_contacts_with_institutions(self) -> List[Dict[str, Any]]:
        """Return every contact with its institution name and summary counts.

        Each row includes ``phone_count``, ``email_count`` and ``note_count``
        (current notes only); rows are ordered by last then first name.
        """
        query = """
            SELECT
                c.id,
                c.first_name,
                c.last_name,
                c.position,
                c.created_at,
                c.updated_at,
                c.institution_id,
                i.name as institution_name,
                (SELECT COUNT(*) FROM lead_contact_details
                 WHERE contact_id = c.id AND detail_type = 'phone') as phone_count,
                (SELECT COUNT(*) FROM lead_contact_details
                 WHERE contact_id = c.id AND detail_type = 'email') as email_count,
                (SELECT COUNT(*) FROM lead_notes
                 WHERE contact_id = c.id AND is_current = true) as note_count
            FROM lead_contacts c
            JOIN lead_institutions i ON i.id = c.institution_id
            ORDER BY c.last_name, c.first_name
        """
        return self._fetch_all(query)