359 Zeilen
12 KiB
Python
359 Zeilen
12 KiB
Python
# Database Repository for Lead Management
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from uuid import UUID
|
|
from typing import List, Optional, Dict, Any
|
|
from datetime import datetime
|
|
|
|
class LeadRepository:
|
|
def __init__(self, get_db_connection):
|
|
self.get_db_connection = get_db_connection
|
|
|
|
# Institution Methods
|
|
def get_institutions_with_counts(self) -> List[Dict[str, Any]]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
SELECT
|
|
i.id,
|
|
i.name,
|
|
i.metadata,
|
|
i.created_at,
|
|
i.updated_at,
|
|
i.created_by,
|
|
COUNT(c.id) as contact_count
|
|
FROM lead_institutions i
|
|
LEFT JOIN lead_contacts c ON c.institution_id = i.id
|
|
GROUP BY i.id
|
|
ORDER BY i.name
|
|
"""
|
|
|
|
cur.execute(query)
|
|
results = cur.fetchall()
|
|
cur.close()
|
|
|
|
return results
|
|
|
|
def create_institution(self, name: str, created_by: str) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
INSERT INTO lead_institutions (name, created_by)
|
|
VALUES (%s, %s)
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(query, (name, created_by))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def get_institution_by_id(self, institution_id: UUID) -> Optional[Dict[str, Any]]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
SELECT * FROM lead_institutions WHERE id = %s
|
|
"""
|
|
|
|
cur.execute(query, (str(institution_id),))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def update_institution(self, institution_id: UUID, name: str) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
UPDATE lead_institutions
|
|
SET name = %s, updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = %s
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(query, (name, str(institution_id)))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
# Contact Methods
|
|
def get_contacts_by_institution(self, institution_id: UUID) -> List[Dict[str, Any]]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
SELECT
|
|
c.*,
|
|
i.name as institution_name
|
|
FROM lead_contacts c
|
|
JOIN lead_institutions i ON i.id = c.institution_id
|
|
WHERE c.institution_id = %s
|
|
ORDER BY c.last_name, c.first_name
|
|
"""
|
|
|
|
cur.execute(query, (str(institution_id),))
|
|
results = cur.fetchall()
|
|
cur.close()
|
|
|
|
return results
|
|
|
|
def create_contact(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
INSERT INTO lead_contacts
|
|
(institution_id, first_name, last_name, position, extra_fields)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(query, (
|
|
str(data['institution_id']),
|
|
data['first_name'],
|
|
data['last_name'],
|
|
data.get('position'),
|
|
psycopg2.extras.Json(data.get('extra_fields', {}))
|
|
))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def get_contact_with_details(self, contact_id: UUID) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
# Get contact base info
|
|
query = """
|
|
SELECT
|
|
c.*,
|
|
i.name as institution_name
|
|
FROM lead_contacts c
|
|
JOIN lead_institutions i ON i.id = c.institution_id
|
|
WHERE c.id = %s
|
|
"""
|
|
|
|
cur.execute(query, (str(contact_id),))
|
|
contact = cur.fetchone()
|
|
|
|
if contact:
|
|
# Get contact details (phones, emails)
|
|
details_query = """
|
|
SELECT * FROM lead_contact_details
|
|
WHERE contact_id = %s
|
|
ORDER BY detail_type, is_primary DESC, created_at
|
|
"""
|
|
cur.execute(details_query, (str(contact_id),))
|
|
contact['details'] = cur.fetchall()
|
|
|
|
# Get notes
|
|
notes_query = """
|
|
SELECT * FROM lead_notes
|
|
WHERE contact_id = %s AND is_current = true
|
|
ORDER BY created_at DESC
|
|
"""
|
|
cur.execute(notes_query, (str(contact_id),))
|
|
contact['notes'] = cur.fetchall()
|
|
|
|
cur.close()
|
|
|
|
return contact
|
|
|
|
def update_contact(self, contact_id: UUID, data: Dict[str, Any]) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
UPDATE lead_contacts
|
|
SET first_name = %s, last_name = %s, position = %s,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = %s
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(query, (
|
|
data['first_name'],
|
|
data['last_name'],
|
|
data.get('position'),
|
|
str(contact_id)
|
|
))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
# Contact Details Methods
|
|
def add_contact_detail(self, contact_id: UUID, detail_type: str,
|
|
detail_value: str, detail_label: str = None) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
INSERT INTO lead_contact_details
|
|
(contact_id, detail_type, detail_value, detail_label)
|
|
VALUES (%s, %s, %s, %s)
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(query, (
|
|
str(contact_id),
|
|
detail_type,
|
|
detail_value,
|
|
detail_label
|
|
))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def get_contact_detail_by_id(self, detail_id: UUID) -> Optional[Dict[str, Any]]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = "SELECT * FROM lead_contact_details WHERE id = %s"
|
|
cur.execute(query, (str(detail_id),))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def update_contact_detail(self, detail_id: UUID, detail_value: str,
|
|
detail_label: str = None) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
UPDATE lead_contact_details
|
|
SET detail_value = %s, detail_label = %s, updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = %s
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(query, (detail_value, detail_label, str(detail_id)))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def delete_contact_detail(self, detail_id: UUID) -> bool:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor()
|
|
|
|
query = "DELETE FROM lead_contact_details WHERE id = %s"
|
|
cur.execute(query, (str(detail_id),))
|
|
|
|
deleted = cur.rowcount > 0
|
|
cur.close()
|
|
|
|
return deleted
|
|
|
|
# Notes Methods
|
|
def create_note(self, contact_id: UUID, note_text: str, created_by: str) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
INSERT INTO lead_notes
|
|
(contact_id, note_text, created_by)
|
|
VALUES (%s, %s, %s)
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(query, (
|
|
str(contact_id),
|
|
note_text,
|
|
created_by
|
|
))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def update_note(self, note_id: UUID, note_text: str, updated_by: str) -> Dict[str, Any]:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
# First, mark current version as not current
|
|
update_old = """
|
|
UPDATE lead_notes
|
|
SET is_current = false
|
|
WHERE id = %s
|
|
"""
|
|
cur.execute(update_old, (str(note_id),))
|
|
|
|
# Create new version
|
|
create_new = """
|
|
INSERT INTO lead_notes
|
|
(contact_id, note_text, created_by, parent_note_id, version)
|
|
SELECT contact_id, %s, %s, %s, version + 1
|
|
FROM lead_notes
|
|
WHERE id = %s
|
|
RETURNING *
|
|
"""
|
|
|
|
cur.execute(create_new, (
|
|
note_text,
|
|
updated_by,
|
|
str(note_id),
|
|
str(note_id)
|
|
))
|
|
result = cur.fetchone()
|
|
cur.close()
|
|
|
|
return result
|
|
|
|
def delete_note(self, note_id: UUID) -> bool:
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor()
|
|
|
|
# Soft delete by marking as not current
|
|
query = """
|
|
UPDATE lead_notes
|
|
SET is_current = false
|
|
WHERE id = %s
|
|
"""
|
|
cur.execute(query, (str(note_id),))
|
|
|
|
deleted = cur.rowcount > 0
|
|
cur.close()
|
|
|
|
return deleted
|
|
|
|
def get_all_contacts_with_institutions(self) -> List[Dict[str, Any]]:
|
|
"""Get all contacts with their institution information"""
|
|
with self.get_db_connection() as conn:
|
|
cur = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
|
query = """
|
|
SELECT
|
|
c.id,
|
|
c.first_name,
|
|
c.last_name,
|
|
c.position,
|
|
c.created_at,
|
|
c.updated_at,
|
|
c.institution_id,
|
|
i.name as institution_name,
|
|
(SELECT COUNT(*) FROM lead_contact_details
|
|
WHERE contact_id = c.id AND detail_type = 'phone') as phone_count,
|
|
(SELECT COUNT(*) FROM lead_contact_details
|
|
WHERE contact_id = c.id AND detail_type = 'email') as email_count,
|
|
(SELECT COUNT(*) FROM lead_notes
|
|
WHERE contact_id = c.id AND is_current = true) as note_count
|
|
FROM lead_contacts c
|
|
JOIN lead_institutions i ON i.id = c.institution_id
|
|
ORDER BY c.last_name, c.first_name
|
|
"""
|
|
|
|
cur.execute(query)
|
|
results = cur.fetchall()
|
|
cur.close()
|
|
|
|
return results |