""" Fingerprint Repository - Persistierung von Browser Fingerprints """ import json import sqlite3 from typing import List, Optional, Dict, Any from datetime import datetime from infrastructure.repositories.base_repository import BaseRepository from domain.entities.browser_fingerprint import ( BrowserFingerprint, CanvasNoise, WebRTCConfig, HardwareConfig, NavigatorProperties, StaticComponents ) class FingerprintRepository(BaseRepository): """Repository für Browser Fingerprint Persistierung""" def save(self, fingerprint: BrowserFingerprint) -> None: """Speichert einen Fingerprint in der Datenbank""" query = """ INSERT OR REPLACE INTO browser_fingerprints ( id, canvas_noise_config, webrtc_config, fonts, hardware_config, navigator_props, webgl_vendor, webgl_renderer, audio_context_config, timezone, timezone_offset, plugins, created_at, last_rotated, platform_specific, static_components, rotation_seed, account_bound ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ audio_config = { 'base_latency': fingerprint.audio_context_base_latency, 'output_latency': fingerprint.audio_context_output_latency, 'sample_rate': fingerprint.audio_context_sample_rate } params = ( fingerprint.fingerprint_id, self._serialize_json({ 'noise_level': fingerprint.canvas_noise.noise_level, 'seed': fingerprint.canvas_noise.seed, 'algorithm': fingerprint.canvas_noise.algorithm }), self._serialize_json({ 'enabled': fingerprint.webrtc_config.enabled, 'ice_servers': fingerprint.webrtc_config.ice_servers, 'local_ip_mask': fingerprint.webrtc_config.local_ip_mask, 'disable_webrtc': fingerprint.webrtc_config.disable_webrtc }), self._serialize_json(fingerprint.font_list), self._serialize_json({ 'hardware_concurrency': fingerprint.hardware_config.hardware_concurrency, 'device_memory': fingerprint.hardware_config.device_memory, 'max_touch_points': fingerprint.hardware_config.max_touch_points, 'screen_resolution': fingerprint.hardware_config.screen_resolution, 'color_depth': fingerprint.hardware_config.color_depth, 'pixel_ratio': fingerprint.hardware_config.pixel_ratio }), self._serialize_json({ 'platform': fingerprint.navigator_props.platform, 'vendor': fingerprint.navigator_props.vendor, 'vendor_sub': fingerprint.navigator_props.vendor_sub, 'product': fingerprint.navigator_props.product, 'product_sub': fingerprint.navigator_props.product_sub, 'app_name': fingerprint.navigator_props.app_name, 'app_version': fingerprint.navigator_props.app_version, 'user_agent': fingerprint.navigator_props.user_agent, 'language': fingerprint.navigator_props.language, 'languages': fingerprint.navigator_props.languages, 'online': fingerprint.navigator_props.online, 'do_not_track': fingerprint.navigator_props.do_not_track }), fingerprint.webgl_vendor, fingerprint.webgl_renderer, self._serialize_json(audio_config), fingerprint.timezone, fingerprint.timezone_offset, self._serialize_json(fingerprint.plugins), fingerprint.created_at, fingerprint.last_rotated, self._serialize_json(fingerprint.platform_specific_config), # platform_specific self._serialize_json(fingerprint.static_components.to_dict() if fingerprint.static_components else None), fingerprint.rotation_seed, fingerprint.account_bound ) self._execute_insert(query, params) def find_by_id(self, fingerprint_id: str) -> Optional[BrowserFingerprint]: """Findet einen Fingerprint nach ID""" query = "SELECT * FROM browser_fingerprints WHERE id = ?" rows = self._execute_query(query, (fingerprint_id,)) if not rows: return None return self._row_to_fingerprint(rows[0]) def find_all(self, limit: int = 100) -> List[BrowserFingerprint]: """Holt alle Fingerprints (mit Limit)""" query = "SELECT * FROM browser_fingerprints ORDER BY created_at DESC LIMIT ?" rows = self._execute_query(query, (limit,)) return [self._row_to_fingerprint(row) for row in rows] def find_recent(self, hours: int = 24) -> List[BrowserFingerprint]: """Findet kürzlich erstellte Fingerprints""" query = """ SELECT * FROM browser_fingerprints WHERE created_at > datetime('now', '-' || ? || ' hours') ORDER BY created_at DESC """ rows = self._execute_query(query, (hours,)) return [self._row_to_fingerprint(row) for row in rows] def update_last_rotated(self, fingerprint_id: str, timestamp: datetime) -> None: """Aktualisiert den last_rotated Timestamp""" query = "UPDATE browser_fingerprints SET last_rotated = ? WHERE id = ?" self._execute_update(query, (timestamp, fingerprint_id)) def delete_older_than(self, timestamp: datetime) -> int: """Löscht Fingerprints älter als timestamp""" query = "DELETE FROM browser_fingerprints WHERE created_at < ?" return self._execute_delete(query, (timestamp,)) def get_random_fingerprints(self, count: int = 10) -> List[BrowserFingerprint]: """Holt zufällige Fingerprints für Pool""" query = """ SELECT * FROM browser_fingerprints ORDER BY RANDOM() LIMIT ? """ rows = self._execute_query(query, (count,)) return [self._row_to_fingerprint(row) for row in rows] def link_to_account(self, fingerprint_id: str, account_id: str, primary: bool = True) -> None: """Links a fingerprint to an account using simple 1:1 relationship""" query = """ UPDATE accounts SET fingerprint_id = ? WHERE id = ? """ self._execute_update(query, (fingerprint_id, account_id)) def get_primary_fingerprint_for_account(self, account_id: str) -> Optional[str]: """Gets the fingerprint ID for an account (1:1 relationship)""" query = """ SELECT fingerprint_id FROM accounts WHERE id = ? AND fingerprint_id IS NOT NULL """ rows = self._execute_query(query, (account_id,)) return dict(rows[0])['fingerprint_id'] if rows else None def get_fingerprints_for_account(self, account_id: str) -> List[BrowserFingerprint]: """Gets the fingerprint associated with an account (1:1 relationship)""" fingerprint_id = self.get_primary_fingerprint_for_account(account_id) if fingerprint_id: fingerprint = self.find_by_id(fingerprint_id) return [fingerprint] if fingerprint else [] return [] def update_fingerprint_stats(self, fingerprint_id: str, account_id: str, success: bool) -> None: """Updates fingerprint last used timestamp (simplified for 1:1)""" # Update the fingerprint's last used time query = """ UPDATE browser_fingerprints SET last_rotated = datetime('now') WHERE id = ? """ self._execute_update(query, (fingerprint_id,)) # Also update account's last login query = """ UPDATE accounts SET last_login = datetime('now') WHERE id = ? AND fingerprint_id = ? """ self._execute_update(query, (account_id, fingerprint_id)) def _row_to_fingerprint(self, row: sqlite3.Row) -> BrowserFingerprint: """Konvertiert eine Datenbankzeile zu einem Fingerprint""" # Canvas Noise canvas_config = self._deserialize_json(row['canvas_noise_config']) canvas_noise = CanvasNoise( noise_level=canvas_config.get('noise_level', 0.02), seed=canvas_config.get('seed', 42), algorithm=canvas_config.get('algorithm', 'gaussian') ) # WebRTC Config webrtc_config_data = self._deserialize_json(row['webrtc_config']) webrtc_config = WebRTCConfig( enabled=webrtc_config_data.get('enabled', True), ice_servers=webrtc_config_data.get('ice_servers', []), local_ip_mask=webrtc_config_data.get('local_ip_mask', '10.0.0.x'), disable_webrtc=webrtc_config_data.get('disable_webrtc', False) ) # Hardware Config hw_config = self._deserialize_json(row['hardware_config']) hardware_config = HardwareConfig( hardware_concurrency=hw_config.get('hardware_concurrency', 4), device_memory=hw_config.get('device_memory', 8), max_touch_points=hw_config.get('max_touch_points', 0), screen_resolution=tuple(hw_config.get('screen_resolution', [1920, 1080])), color_depth=hw_config.get('color_depth', 24), pixel_ratio=hw_config.get('pixel_ratio', 1.0) ) # Navigator Properties nav_props = self._deserialize_json(row['navigator_props']) navigator_props = NavigatorProperties( platform=nav_props.get('platform', 'Win32'), vendor=nav_props.get('vendor', 'Google Inc.'), vendor_sub=nav_props.get('vendor_sub', ''), product=nav_props.get('product', 'Gecko'), product_sub=nav_props.get('product_sub', '20030107'), app_name=nav_props.get('app_name', 'Netscape'), app_version=nav_props.get('app_version', '5.0'), user_agent=nav_props.get('user_agent', ''), language=nav_props.get('language', 'de-DE'), languages=nav_props.get('languages', ['de-DE', 'de', 'en-US', 'en']), online=nav_props.get('online', True), do_not_track=nav_props.get('do_not_track', '1') ) # Audio Context audio_config = self._deserialize_json(row['audio_context_config']) or {} # Static Components static_components = None if 'static_components' in row.keys() and row['static_components']: sc_data = self._deserialize_json(row['static_components']) if sc_data: static_components = StaticComponents( device_type=sc_data.get('device_type', 'desktop'), os_family=sc_data.get('os_family', 'windows'), browser_family=sc_data.get('browser_family', 'chromium'), gpu_vendor=sc_data.get('gpu_vendor', 'Intel Inc.'), gpu_model=sc_data.get('gpu_model', 'Intel Iris OpenGL Engine'), cpu_architecture=sc_data.get('cpu_architecture', 'x86_64'), base_fonts=sc_data.get('base_fonts', []), base_resolution=tuple(sc_data.get('base_resolution', [1920, 1080])), base_timezone=sc_data.get('base_timezone', 'Europe/Berlin') ) return BrowserFingerprint( fingerprint_id=row['id'], canvas_noise=canvas_noise, webrtc_config=webrtc_config, font_list=self._deserialize_json(row['fonts']) or [], hardware_config=hardware_config, navigator_props=navigator_props, created_at=self._parse_datetime(row['created_at']), last_rotated=self._parse_datetime(row['last_rotated']), webgl_vendor=row['webgl_vendor'], webgl_renderer=row['webgl_renderer'], audio_context_base_latency=audio_config.get('base_latency', 0.0), audio_context_output_latency=audio_config.get('output_latency', 0.0), audio_context_sample_rate=audio_config.get('sample_rate', 48000), timezone=row['timezone'], timezone_offset=row['timezone_offset'], plugins=self._deserialize_json(row['plugins']) or [], static_components=static_components, rotation_seed=row['rotation_seed'] if 'rotation_seed' in row.keys() else None, account_bound=row['account_bound'] if 'account_bound' in row.keys() else False, platform_specific_config=self._deserialize_json(row['platform_specific'] if 'platform_specific' in row.keys() else '{}') or {} )