Files
SkillMate/backend/src/services/syncService.ts
Claude Project Manager 6b9b6d4f20 Initial commit
2025-09-20 21:31:04 +02:00

573 Zeilen
16 KiB
TypeScript

import { db } from '../config/database'
import axios from 'axios'
import crypto from 'crypto'
import { logger } from '../utils/logger'
/**
 * Envelope for one change that is exchanged between sync nodes.
 */
interface SyncPayload {
  /** Entity data carried by the change; its shape depends on `type`. */
  data: any
  /** Kind of entity the change applies to. */
  type: 'employees' | 'skills' | 'users' | 'settings'
  /** Operation performed on the entity. */
  action: 'create' | 'update' | 'delete'
  /** Identifier of the node that produced the change. */
  nodeId: string
  /** ISO-8601 time at which the change was queued. */
  timestamp: string
  /** Hex digest computed over `data`, used to verify integrity on receipt. */
  checksum: string
}
/**
 * Summary of one sync run against a single target node.
 */
interface SyncResult {
  /** Overall outcome flag of the run. */
  success: boolean
  /** Problems collected during the run. */
  errors: any[]
  /** Conflicts reported by the remote node during the run. */
  conflicts: any[]
  /** Number of items the remote node accepted. */
  syncedItems: number
}
export class SyncService {
private static instance: SyncService
private syncQueue: SyncPayload[] = []
private isSyncing = false
// Private: instances are only created from inside the class (see getInstance()).
private constructor() {
// Make sure the sync bookkeeping tables exist as soon as the service is constructed
this.initializeSyncTables()
}
/**
 * Returns the shared SyncService, constructing it on the first call.
 *
 * @returns The single SyncService instance held in the static `instance` field.
 */
static getInstance(): SyncService {
  if (SyncService.instance) {
    return SyncService.instance
  }

  const created = new SyncService()
  SyncService.instance = created
  return created
}
/**
 * Creates the three bookkeeping tables used by the sync service when they
 * are not present yet: `sync_log`, `sync_conflicts` and `sync_metadata`.
 *
 * Every statement is `CREATE TABLE IF NOT EXISTS`, so existing tables are
 * left as they are. Statements run in the order listed.
 */
private initializeSyncTables() {
  const tableDefinitions = [
    // sync_log: tracks every sync operation and its status
    `
CREATE TABLE IF NOT EXISTS sync_log (
id TEXT PRIMARY KEY,
node_id TEXT NOT NULL,
sync_type TEXT NOT NULL,
sync_action TEXT NOT NULL,
entity_id TEXT NOT NULL,
entity_type TEXT NOT NULL,
payload TEXT NOT NULL,
checksum TEXT NOT NULL,
status TEXT CHECK(status IN ('pending', 'completed', 'failed', 'conflict')) NOT NULL,
error_message TEXT,
created_at TEXT NOT NULL,
completed_at TEXT
)
`,
    // sync_conflicts: conflicts awaiting (or past) resolution
    `
CREATE TABLE IF NOT EXISTS sync_conflicts (
id TEXT PRIMARY KEY,
entity_id TEXT NOT NULL,
entity_type TEXT NOT NULL,
local_data TEXT NOT NULL,
remote_data TEXT NOT NULL,
conflict_type TEXT NOT NULL,
resolution_status TEXT CHECK(resolution_status IN ('pending', 'resolved', 'ignored')) DEFAULT 'pending',
resolved_by TEXT,
resolved_at TEXT,
created_at TEXT NOT NULL
)
`,
    // sync_metadata: per-node counters and last-sync timestamps
    `
CREATE TABLE IF NOT EXISTS sync_metadata (
node_id TEXT PRIMARY KEY,
last_sync_at TEXT,
last_successful_sync TEXT,
sync_version INTEGER DEFAULT 1,
total_synced_items INTEGER DEFAULT 0,
total_conflicts INTEGER DEFAULT 0,
total_errors INTEGER DEFAULT 0
)
`
  ]

  for (const ddl of tableDefinitions) {
    db.exec(ddl)
  }
}
/**
 * Computes the integrity checksum for a piece of sync data.
 *
 * @param data Value to fingerprint; it is serialized with `JSON.stringify`.
 * @returns SHA-256 digest of the serialized value as a hex string.
 */
private generateChecksum(data: any): string {
  const serialized = JSON.stringify(data)
  return crypto.createHash('sha256').update(serialized).digest('hex')
}
/**
 * Registers a local change for replication.
 *
 * The change is wrapped in a SyncPayload, appended to the in-memory queue and
 * written to `sync_log` with status `pending`. When the stored sync settings
 * do not have `autoSyncInterval` set to `'disabled'`, a sync run is started
 * without being awaited.
 *
 * @param type   Entity kind the change belongs to.
 * @param action Operation that was performed.
 * @param data   Entity data; `data.id` is stored as the log entry's entity id
 *               (`'unknown'` when it is falsy).
 */
async queueSync(type: SyncPayload['type'], action: SyncPayload['action'], data: any) {
  const originNodeId = this.getNodeId()

  // Key order matters: the whole payload is serialized into sync_log below.
  const payload: SyncPayload = {
    type,
    action,
    data,
    timestamp: new Date().toISOString(),
    nodeId: originNodeId,
    checksum: this.generateChecksum(data)
  }
  this.syncQueue.push(payload)

  // Persist the change so it can be picked up by a later sync run
  const insertLogEntry = db.prepare(`
INSERT INTO sync_log (
id, node_id, sync_type, sync_action, entity_id, entity_type,
payload, checksum, status, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
  const entityId = data.id || 'unknown'
  insertLogEntry.run(
    crypto.randomUUID(),
    originNodeId,
    type,
    action,
    entityId,
    type,
    JSON.stringify(payload),
    payload.checksum,
    'pending',
    payload.timestamp
  )

  // Nothing more to do unless auto-sync is switched on
  if (this.getSyncSettings().autoSyncInterval === 'disabled') {
    return
  }

  // Deliberately not awaited: the sync run continues in the background
  void this.triggerSync()
}
/**
 * Pushes every `pending` entry of `sync_log` to one remote node, oldest first.
 *
 * For each entry the stored payload is POSTed to the node's
 * `/api/sync/receive` endpoint and the entry is updated according to the reply:
 * - `success`   -> status `completed`, counted in `syncedItems`
 * - `conflict`  -> conflict is recorded via handleConflict(), status `conflict`
 * - anything else, or a thrown error -> status `failed` with the error message
 *
 * Per-item failures do not abort the run. After the loop the run's outcome is
 * computed and written to `sync_metadata`.
 *
 * NOTE(review): the request goes over plain `http://` and carries the node's
 * API key as a bearer token - confirm the transport is otherwise protected.
 * NOTE(review): entries are no longer `pending` once one node has handled
 * them, so other target nodes will not receive them - confirm whether
 * per-node tracking is required.
 *
 * @param targetNodeId Id of the node in `network_nodes` to sync with.
 * @returns Summary of the run; `success` is true only when no error occurred.
 */
async syncWithNode(targetNodeId: string): Promise<SyncResult> {
  const result: SyncResult = {
    success: false,
    syncedItems: 0,
    conflicts: [],
    errors: []
  }

  try {
    const targetNode = this.getNodeInfo(targetNodeId)
    if (!targetNode) {
      throw new Error('Target node not found')
    }

    // Get pending sync items
    const pendingItems = db.prepare(`
SELECT * FROM sync_log
WHERE status = 'pending'
ORDER BY created_at ASC
`).all() as any[]

    // Endpoint and headers are the same for every item of this run
    const endpoint = `http://${targetNode.ip_address}:${targetNode.port}/api/sync/receive`
    const requestConfig = {
      headers: {
        'Authorization': `Bearer ${targetNode.api_key}`,
        'X-Node-Id': this.getNodeId()
      },
      timeout: 30000
    }

    for (const item of pendingItems) {
      try {
        const payload = JSON.parse(item.payload)
        const response = await axios.post(endpoint, payload, requestConfig)
        const reply = response.data

        if (reply?.success) {
          // Mark as completed
          db.prepare(`
UPDATE sync_log
SET status = 'completed', completed_at = ?
WHERE id = ?
`).run(new Date().toISOString(), item.id)
          result.syncedItems++
        } else if (reply?.conflict) {
          this.handleConflict(item, reply.conflictData)
          // Take the entry out of the pending set; otherwise every later run
          // would resend it and record the same conflict again.
          db.prepare(`
UPDATE sync_log
SET status = 'conflict'
WHERE id = ?
`).run(item.id)
          result.conflicts.push({
            itemId: item.id,
            conflict: reply.conflictData
          })
        } else {
          // The remote answered but neither accepted the item nor reported a
          // conflict: route it through the failure handling below.
          throw new Error(reply?.error || 'Remote node rejected sync item')
        }
      } catch (error: any) {
        logger.error(`Sync error for item ${item.id}:`, error)
        // Mark as failed
        db.prepare(`
UPDATE sync_log
SET status = 'failed', error_message = ?
WHERE id = ?
`).run(error.message, item.id)
        result.errors.push({
          itemId: item.id,
          error: error.message
        })
      }
    }

    // The outcome must be known before the metadata is written, because
    // updateSyncMetadata() uses result.success to set last_successful_sync.
    result.success = result.errors.length === 0
    this.updateSyncMetadata(targetNodeId, result)
  } catch (error: any) {
    logger.error('Sync failed:', error)
    result.success = false
    result.errors.push({ error: error.message })
  }

  return result
}
/**
 * Handles a change sent by another node.
 *
 * The payload's checksum is recomputed over `payload.data` and compared with
 * the transmitted one, then the change is checked for conflicts and, if there
 * is none, applied locally. Errors are logged and reported in the return
 * value instead of being thrown.
 *
 * @param payload Change received from the remote node.
 * @returns `{ success: true }` when applied,
 *          `{ success: false, conflict: true, conflictData }` on a conflict,
 *          `{ success: false, error }` when anything failed.
 */
async receiveSync(payload: SyncPayload): Promise<any> {
  try {
    // Integrity check before touching the database
    const expectedChecksum = this.generateChecksum(payload.data)
    const checksumMatches = expectedChecksum === payload.checksum
    if (!checksumMatches) {
      throw new Error('Checksum mismatch - data integrity error')
    }

    const conflictData = await this.checkForConflicts(payload)
    if (!conflictData) {
      // No conflict: apply the change according to its type and action
      await this.applyChanges(payload)
      return { success: true }
    }

    return {
      success: false,
      conflict: true,
      conflictData
    }
  } catch (error: any) {
    logger.error('Error receiving sync:', error)
    return {
      success: false,
      error: error.message
    }
  }
}
/**
 * Decides whether an incoming change clashes with local data.
 *
 * - `create`: conflict (`already_exists`) when a row with the same id is
 *   already present in the employees, skills or users table.
 * - `update`: conflict (`newer_version`) when the local row's `updated_at`
 *   is later than the incoming `data.updatedAt`.
 * - Other actions, and entity types without a lookup query (e.g. settings),
 *   never produce a conflict.
 *
 * @param payload Incoming change.
 * @returns A conflict descriptor, or `null` when the change can be applied.
 */
private async checkForConflicts(payload: SyncPayload): Promise<any> {
  const { type, action, data } = payload

  if (action === 'create') {
    // Existence probe per entity type; types not listed are never looked up
    const existsQueries = new Map<string, string>([
      ['employees', 'SELECT id FROM employees WHERE id = ?'],
      ['skills', 'SELECT id FROM skills WHERE id = ?'],
      ['users', 'SELECT id FROM users WHERE id = ?']
    ])
    const probeSql = existsQueries.get(type)
    const alreadyPresent = probeSql !== undefined && !!db.prepare(probeSql).get(data.id)
    if (!alreadyPresent) {
      return null
    }
    return {
      type: 'already_exists',
      entityId: data.id,
      entityType: type
    }
  }

  if (action === 'update') {
    // Full-row lookup per entity type, needed to compare modification times
    const rowQueries = new Map<string, string>([
      ['employees', 'SELECT * FROM employees WHERE id = ?'],
      ['skills', 'SELECT * FROM skills WHERE id = ?'],
      ['users', 'SELECT * FROM users WHERE id = ?']
    ])
    const rowSql = rowQueries.get(type)
    const localEntity: any = rowSql === undefined ? null : db.prepare(rowSql).get(data.id)
    if (!localEntity) {
      return null
    }

    const localIsNewer = new Date(localEntity.updated_at) > new Date(data.updatedAt)
    if (!localIsNewer) {
      return null
    }
    return {
      type: 'newer_version',
      entityId: data.id,
      entityType: type,
      localVersion: localEntity,
      remoteVersion: data
    }
  }

  return null
}
/**
 * Routes a change to the handler for its entity type and waits for it.
 * Payloads whose `type` matches none of the four known kinds are ignored.
 *
 * @param payload Change to apply to the local database.
 */
private async applyChanges(payload: SyncPayload) {
  const { type, action, data } = payload

  if (type === 'employees') {
    await this.syncEmployee(action, data)
  } else if (type === 'skills') {
    await this.syncSkill(action, data)
  } else if (type === 'users') {
    await this.syncUser(action, data)
  } else if (type === 'settings') {
    await this.syncSettings(action, data)
  }
}
/**
 * Applies a replicated employee change to the local database.
 *
 * - `create`: inserts the employee row and, when `data.skills` is present,
 *   one `employee_skills` row per entry.
 * - `update`: overwrites name, position, department, contact, office and
 *   availability columns together with `updated_at` / `updated_by`.
 * - `delete`: removes the employee's dependent rows first and the employee
 *   row last.
 *
 * Any other action is ignored.
 *
 * @param action Operation to apply.
 * @param data   Employee data in camelCase form as carried by the payload.
 */
private async syncEmployee(action: string, data: any) {
  switch (action) {
    case 'create': {
      db.prepare(`
INSERT INTO employees (
id, first_name, last_name, employee_number, photo, position,
department, email, phone, mobile, office, availability,
clearance_level, clearance_valid_until, clearance_issued_date,
created_at, updated_at, created_by
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
        data.id, data.firstName, data.lastName, data.employeeNumber,
        data.photo, data.position, data.department, data.email,
        data.phone, data.mobile, data.office, data.availability,
        data.clearance?.level, data.clearance?.validUntil,
        data.clearance?.issuedDate, data.createdAt, data.updatedAt,
        data.createdBy
      )

      // Sync skills
      if (data.skills) {
        // Prepared once and reused for every skill row
        const insertSkill = db.prepare(`
INSERT INTO employee_skills (employee_id, skill_id, level, verified, verified_by, verified_date)
VALUES (?, ?, ?, ?, ?, ?)
`)
        for (const skill of data.skills) {
          insertSkill.run(
            data.id, skill.id, skill.level,
            skill.verified ? 1 : 0, skill.verifiedBy, skill.verifiedDate
          )
        }
      }
      break
    }
    case 'update':
      db.prepare(`
UPDATE employees SET
first_name = ?, last_name = ?, position = ?, department = ?,
email = ?, phone = ?, mobile = ?, office = ?, availability = ?,
updated_at = ?, updated_by = ?
WHERE id = ?
`).run(
        data.firstName, data.lastName, data.position, data.department,
        data.email, data.phone, data.mobile, data.office, data.availability,
        data.updatedAt, data.updatedBy, data.id
      )
      break
    case 'delete':
      // Dependent rows go first: with foreign-key enforcement switched on,
      // deleting the parent first would be rejected, and a failure halfway
      // through would otherwise leave rows pointing at a missing employee.
      db.prepare('DELETE FROM employee_skills WHERE employee_id = ?').run(data.id)
      db.prepare('DELETE FROM language_skills WHERE employee_id = ?').run(data.id)
      db.prepare('DELETE FROM specializations WHERE employee_id = ?').run(data.id)
      db.prepare('DELETE FROM employees WHERE id = ?').run(data.id)
      break
  }
}
/**
 * Applies a replicated skill change to the local `skills` table.
 * Actions other than create, update and delete are ignored.
 *
 * NOTE(review): `update` writes only name, category and description, while
 * `create` also sets requires_certification and expires_after - confirm that
 * leaving those two columns untouched on update is intended.
 *
 * @param action Operation to apply.
 * @param data   Skill data in camelCase form as carried by the payload.
 */
private async syncSkill(action: string, data: any) {
  if (action === 'create') {
    const insertSkill = db.prepare(`
INSERT INTO skills (id, name, category, description, requires_certification, expires_after)
VALUES (?, ?, ?, ?, ?, ?)
`)
    // The certification flag is stored as 1/0
    const certificationFlag = data.requiresCertification ? 1 : 0
    insertSkill.run(
      data.id, data.name, data.category, data.description,
      certificationFlag, data.expiresAfter
    )
    return
  }

  if (action === 'update') {
    const updateSkill = db.prepare(`
UPDATE skills SET name = ?, category = ?, description = ?
WHERE id = ?
`)
    updateSkill.run(data.name, data.category, data.description, data.id)
    return
  }

  if (action === 'delete') {
    db.prepare('DELETE FROM skills WHERE id = ?').run(data.id)
  }
}
/**
 * Placeholder for user replication: the change is only logged, nothing is
 * written to the database.
 *
 * NOTE(review): the full `data` object is passed to the logger - confirm it
 * cannot contain credentials or other sensitive user fields.
 *
 * @param action Operation that was requested.
 * @param data   User data carried by the payload.
 */
private async syncUser(action: string, data: any) {
  const message = `Syncing user: ${action}`
  logger.info(message, data)
}
/**
 * Placeholder for settings replication: the change is only logged, nothing
 * is written to the database.
 *
 * @param action Operation that was requested.
 * @param data   Settings data carried by the payload.
 */
private async syncSettings(action: string, data: any) {
  const message = `Syncing settings: ${action}`
  logger.info(message, data)
}
/**
 * Records a conflict reported for a sync item and applies the configured
 * resolution strategy.
 *
 * A row is always written to `sync_conflicts`. Then, depending on
 * `conflictResolution` from the sync settings:
 * - `'admin'`:  the item's payload is applied when the node named by
 *   `syncItem.node_id` has type `admin`.
 * - `'newest'`: the payload is applied when the remote version's timestamp is
 *   later than the local version's.
 * - anything else: nothing is applied; the conflict stays for manual handling.
 *
 * Applying the payload is asynchronous and is not waited for; a failure there
 * is logged rather than surfacing as an unhandled promise rejection.
 *
 * NOTE(review): `syncItem.node_id` is the node that queued the item - confirm
 * that this is the node whose `admin` role should decide the outcome.
 *
 * @param syncItem     Row from `sync_log` the conflict was reported for.
 * @param conflictData Conflict descriptor (`type`, `entityId`, `entityType`,
 *                     optional `localVersion` / `remoteVersion`).
 */
private handleConflict(syncItem: any, conflictData: any) {
  const conflictId = crypto.randomUUID()
  db.prepare(`
INSERT INTO sync_conflicts (
id, entity_id, entity_type, local_data, remote_data,
conflict_type, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
`).run(
    conflictId,
    conflictData.entityId,
    conflictData.entityType,
    JSON.stringify(conflictData.localVersion || {}),
    JSON.stringify(conflictData.remoteVersion || {}),
    conflictData.type,
    new Date().toISOString()
  )

  // Applies the item's payload in the background. JSON.parse still throws
  // synchronously; only the asynchronous part is caught and logged here.
  const applyItemPayload = () => {
    this.applyChanges(JSON.parse(syncItem.payload)).catch((error: any) => {
      logger.error(`Failed to apply conflict resolution for item ${syncItem.id}:`, error)
    })
  }

  // The local version is a raw database row (snake_case `updated_at`), the
  // remote version is payload data (camelCase `updatedAt`); accept both.
  const modifiedAt = (version: any) => version?.updatedAt ?? version?.updated_at

  // Apply conflict resolution strategy
  const settings = this.getSyncSettings()
  if (settings.conflictResolution === 'admin') {
    // Admin wins - apply changes if the originating node is an admin node
    const remoteNode = this.getNodeInfo(syncItem.node_id)
    if (remoteNode?.type === 'admin') {
      applyItemPayload()
    }
  } else if (settings.conflictResolution === 'newest') {
    // Newest wins - a missing timestamp counts as the epoch
    const localTime = new Date(modifiedAt(conflictData.localVersion) || 0)
    const remoteTime = new Date(modifiedAt(conflictData.remoteVersion) || 0)
    if (remoteTime > localTime) {
      applyItemPayload()
    }
  }
  // Manual resolution - do nothing, let admin resolve
}
/**
 * Loads the sync settings row with id `'default'`.
 *
 * NOTE(review): callers read `autoSyncInterval` and `conflictResolution` from
 * the returned row - confirm the `sync_settings` columns carry exactly those
 * names.
 *
 * @returns The stored row, or a fallback with auto-sync disabled and
 *          `'admin'` conflict resolution when no row exists.
 */
private getSyncSettings(): any {
  const stored = db.prepare('SELECT * FROM sync_settings WHERE id = ?').get('default') as any
  if (stored) {
    return stored
  }

  return {
    autoSyncInterval: 'disabled',
    conflictResolution: 'admin'
  }
}
/**
 * Looks up a node in `network_nodes` by its id.
 *
 * @param nodeId Id of the node to fetch.
 * @returns Whatever the query yields for that id (no row means a falsy value).
 */
private getNodeInfo(nodeId: string): any {
  const lookup = db.prepare('SELECT * FROM network_nodes WHERE id = ?')
  const node = lookup.get(nodeId)
  return node
}
/**
 * Returns this node's identifier.
 *
 * @returns The `NODE_ID` environment variable when it is set and non-empty,
 *          otherwise the fixed fallback `'local-node'`.
 */
private getNodeId(): string {
  const configuredId = process.env.NODE_ID
  return configuredId ? configuredId : 'local-node'
}
/**
 * Folds the outcome of one sync run into the `sync_metadata` row of a node.
 *
 * When the node has no row yet, one is inserted with the run's counts.
 * Otherwise the counters are incremented by the run's counts and
 * `last_sync_at` is set to now. `last_successful_sync` is set to now only
 * when `result.success` is true; otherwise it keeps its previous value
 * (or is `null` for a new row).
 *
 * @param nodeId Node the run was performed against.
 * @param result Outcome of that run.
 */
private updateSyncMetadata(nodeId: string, result: SyncResult) {
  const existing = db.prepare('SELECT * FROM sync_metadata WHERE node_id = ?').get(nodeId) as any

  if (!existing) {
    // First run recorded for this node
    const insertMetadata = db.prepare(`
INSERT INTO sync_metadata (
node_id, last_sync_at, last_successful_sync,
total_synced_items, total_conflicts, total_errors
) VALUES (?, ?, ?, ?, ?, ?)
`)
    insertMetadata.run(
      nodeId,
      new Date().toISOString(),
      result.success ? new Date().toISOString() : null,
      result.syncedItems,
      result.conflicts.length,
      result.errors.length
    )
    return
  }

  // Row exists: bump the running totals
  const updateMetadata = db.prepare(`
UPDATE sync_metadata SET
last_sync_at = ?,
last_successful_sync = ?,
total_synced_items = total_synced_items + ?,
total_conflicts = total_conflicts + ?,
total_errors = total_errors + ?
WHERE node_id = ?
`)
  updateMetadata.run(
    new Date().toISOString(),
    result.success ? new Date().toISOString() : existing.last_successful_sync,
    result.syncedItems,
    result.conflicts.length,
    result.errors.length,
    nodeId
  )
}
/**
 * Runs one sync pass against every node in `network_nodes` that is flagged
 * online and is not this node, one node after the other.
 *
 * A pass that is requested while another one is running is skipped. Errors
 * are logged, not thrown, and the running flag is always cleared at the end.
 */
async triggerSync() {
  if (this.isSyncing) {
    logger.info('Sync already in progress')
    return
  }

  this.isSyncing = true
  try {
    // Every online node except ourselves
    const peerQuery = db.prepare(`
SELECT * FROM network_nodes
WHERE is_online = 1 AND id != ?
`)
    const peers = peerQuery.all(this.getNodeId()) as any[]

    for (const peer of peers) {
      logger.info(`Syncing with node: ${peer.name}`)
      await this.syncWithNode(peer.id)
    }
  } catch (error) {
    logger.error('Sync trigger failed:', error)
  } finally {
    this.isSyncing = false
  }
}
/**
 * Collects a snapshot of the sync state.
 *
 * @returns An object with the number of `pending` log entries
 *          (`pendingItems`), the ten most recently created log entries
 *          (`recentSync`), the number of conflicts still `pending`
 *          (`pendingConflicts`) and whether a sync pass is running
 *          (`isSyncing`).
 */
getSyncStatus(): any {
  const countPendingLog = db.prepare(
    'SELECT COUNT(*) as count FROM sync_log WHERE status = ?'
  )
  const pendingLog = countPendingLog.get('pending') as any

  const recentSync = db.prepare(`
SELECT * FROM sync_log
ORDER BY created_at DESC
LIMIT 10
`).all()

  const countOpenConflicts = db.prepare(`
SELECT COUNT(*) as count FROM sync_conflicts
WHERE resolution_status = ?
`)
  const openConflicts = countOpenConflicts.get('pending') as any

  return {
    pendingItems: pendingLog.count,
    recentSync,
    pendingConflicts: openConflicts.count,
    isSyncing: this.isSyncing
  }
}
}
// Shared service instance. Importing this module constructs it, which in turn
// creates the sync tables if they do not exist yet.
export const syncService = SyncService.getInstance()