273 Zeilen
13 KiB
Python
273 Zeilen
13 KiB
Python
"""
|
|
Fingerprint Repository - Persistierung von Browser Fingerprints
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
from typing import List, Optional, Dict, Any
|
|
from datetime import datetime
|
|
|
|
from infrastructure.repositories.base_repository import BaseRepository
|
|
from domain.entities.browser_fingerprint import (
|
|
BrowserFingerprint, CanvasNoise, WebRTCConfig,
|
|
HardwareConfig, NavigatorProperties, StaticComponents
|
|
)
|
|
|
|
|
|
class FingerprintRepository(BaseRepository):
|
|
"""Repository für Browser Fingerprint Persistierung"""
|
|
|
|
def save(self, fingerprint: BrowserFingerprint) -> None:
|
|
"""Speichert einen Fingerprint in der Datenbank"""
|
|
query = """
|
|
INSERT OR REPLACE INTO browser_fingerprints (
|
|
id, canvas_noise_config, webrtc_config, fonts,
|
|
hardware_config, navigator_props, webgl_vendor,
|
|
webgl_renderer, audio_context_config, timezone,
|
|
timezone_offset, plugins, created_at, last_rotated,
|
|
platform_specific, static_components, rotation_seed,
|
|
account_bound
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
"""
|
|
|
|
audio_config = {
|
|
'base_latency': fingerprint.audio_context_base_latency,
|
|
'output_latency': fingerprint.audio_context_output_latency,
|
|
'sample_rate': fingerprint.audio_context_sample_rate
|
|
}
|
|
|
|
|
|
params = (
|
|
fingerprint.fingerprint_id,
|
|
self._serialize_json({
|
|
'noise_level': fingerprint.canvas_noise.noise_level,
|
|
'seed': fingerprint.canvas_noise.seed,
|
|
'algorithm': fingerprint.canvas_noise.algorithm
|
|
}),
|
|
self._serialize_json({
|
|
'enabled': fingerprint.webrtc_config.enabled,
|
|
'ice_servers': fingerprint.webrtc_config.ice_servers,
|
|
'local_ip_mask': fingerprint.webrtc_config.local_ip_mask,
|
|
'disable_webrtc': fingerprint.webrtc_config.disable_webrtc
|
|
}),
|
|
self._serialize_json(fingerprint.font_list),
|
|
self._serialize_json({
|
|
'hardware_concurrency': fingerprint.hardware_config.hardware_concurrency,
|
|
'device_memory': fingerprint.hardware_config.device_memory,
|
|
'max_touch_points': fingerprint.hardware_config.max_touch_points,
|
|
'screen_resolution': fingerprint.hardware_config.screen_resolution,
|
|
'color_depth': fingerprint.hardware_config.color_depth,
|
|
'pixel_ratio': fingerprint.hardware_config.pixel_ratio
|
|
}),
|
|
self._serialize_json({
|
|
'platform': fingerprint.navigator_props.platform,
|
|
'vendor': fingerprint.navigator_props.vendor,
|
|
'vendor_sub': fingerprint.navigator_props.vendor_sub,
|
|
'product': fingerprint.navigator_props.product,
|
|
'product_sub': fingerprint.navigator_props.product_sub,
|
|
'app_name': fingerprint.navigator_props.app_name,
|
|
'app_version': fingerprint.navigator_props.app_version,
|
|
'user_agent': fingerprint.navigator_props.user_agent,
|
|
'language': fingerprint.navigator_props.language,
|
|
'languages': fingerprint.navigator_props.languages,
|
|
'online': fingerprint.navigator_props.online,
|
|
'do_not_track': fingerprint.navigator_props.do_not_track
|
|
}),
|
|
fingerprint.webgl_vendor,
|
|
fingerprint.webgl_renderer,
|
|
self._serialize_json(audio_config),
|
|
fingerprint.timezone,
|
|
fingerprint.timezone_offset,
|
|
self._serialize_json(fingerprint.plugins),
|
|
fingerprint.created_at,
|
|
fingerprint.last_rotated,
|
|
self._serialize_json(fingerprint.platform_specific_config), # platform_specific
|
|
self._serialize_json(fingerprint.static_components.to_dict() if fingerprint.static_components else None),
|
|
fingerprint.rotation_seed,
|
|
fingerprint.account_bound
|
|
)
|
|
|
|
self._execute_insert(query, params)
|
|
|
|
def find_by_id(self, fingerprint_id: str) -> Optional[BrowserFingerprint]:
|
|
"""Findet einen Fingerprint nach ID"""
|
|
query = "SELECT * FROM browser_fingerprints WHERE id = ?"
|
|
rows = self._execute_query(query, (fingerprint_id,))
|
|
|
|
if not rows:
|
|
return None
|
|
|
|
return self._row_to_fingerprint(rows[0])
|
|
|
|
def find_all(self, limit: int = 100) -> List[BrowserFingerprint]:
|
|
"""Holt alle Fingerprints (mit Limit)"""
|
|
query = "SELECT * FROM browser_fingerprints ORDER BY created_at DESC LIMIT ?"
|
|
rows = self._execute_query(query, (limit,))
|
|
|
|
return [self._row_to_fingerprint(row) for row in rows]
|
|
|
|
def find_recent(self, hours: int = 24) -> List[BrowserFingerprint]:
|
|
"""Findet kürzlich erstellte Fingerprints"""
|
|
query = """
|
|
SELECT * FROM browser_fingerprints
|
|
WHERE created_at > datetime('now', '-' || ? || ' hours')
|
|
ORDER BY created_at DESC
|
|
"""
|
|
rows = self._execute_query(query, (hours,))
|
|
|
|
return [self._row_to_fingerprint(row) for row in rows]
|
|
|
|
def update_last_rotated(self, fingerprint_id: str, timestamp: datetime) -> None:
|
|
"""Aktualisiert den last_rotated Timestamp"""
|
|
query = "UPDATE browser_fingerprints SET last_rotated = ? WHERE id = ?"
|
|
self._execute_update(query, (timestamp, fingerprint_id))
|
|
|
|
def delete_older_than(self, timestamp: datetime) -> int:
|
|
"""Löscht Fingerprints älter als timestamp"""
|
|
query = "DELETE FROM browser_fingerprints WHERE created_at < ?"
|
|
return self._execute_delete(query, (timestamp,))
|
|
|
|
def get_random_fingerprints(self, count: int = 10) -> List[BrowserFingerprint]:
|
|
"""Holt zufällige Fingerprints für Pool"""
|
|
query = """
|
|
SELECT * FROM browser_fingerprints
|
|
ORDER BY RANDOM()
|
|
LIMIT ?
|
|
"""
|
|
rows = self._execute_query(query, (count,))
|
|
|
|
return [self._row_to_fingerprint(row) for row in rows]
|
|
|
|
def link_to_account(self, fingerprint_id: str, account_id: str, primary: bool = True) -> None:
|
|
"""Links a fingerprint to an account using simple 1:1 relationship"""
|
|
query = """
|
|
UPDATE accounts SET fingerprint_id = ? WHERE id = ?
|
|
"""
|
|
self._execute_update(query, (fingerprint_id, account_id))
|
|
|
|
def get_primary_fingerprint_for_account(self, account_id: str) -> Optional[str]:
|
|
"""Gets the fingerprint ID for an account (1:1 relationship)"""
|
|
query = """
|
|
SELECT fingerprint_id FROM accounts
|
|
WHERE id = ? AND fingerprint_id IS NOT NULL
|
|
"""
|
|
rows = self._execute_query(query, (account_id,))
|
|
return dict(rows[0])['fingerprint_id'] if rows else None
|
|
|
|
def get_fingerprints_for_account(self, account_id: str) -> List[BrowserFingerprint]:
|
|
"""Gets the fingerprint associated with an account (1:1 relationship)"""
|
|
fingerprint_id = self.get_primary_fingerprint_for_account(account_id)
|
|
if fingerprint_id:
|
|
fingerprint = self.find_by_id(fingerprint_id)
|
|
return [fingerprint] if fingerprint else []
|
|
return []
|
|
|
|
def update_fingerprint_stats(self, fingerprint_id: str, account_id: str,
|
|
success: bool) -> None:
|
|
"""Updates fingerprint last used timestamp (simplified for 1:1)"""
|
|
# Update the fingerprint's last used time
|
|
query = """
|
|
UPDATE browser_fingerprints
|
|
SET last_rotated = datetime('now')
|
|
WHERE id = ?
|
|
"""
|
|
self._execute_update(query, (fingerprint_id,))
|
|
|
|
# Also update account's last login
|
|
query = """
|
|
UPDATE accounts
|
|
SET last_login = datetime('now')
|
|
WHERE id = ? AND fingerprint_id = ?
|
|
"""
|
|
self._execute_update(query, (account_id, fingerprint_id))
|
|
|
|
def _row_to_fingerprint(self, row: sqlite3.Row) -> BrowserFingerprint:
|
|
"""Konvertiert eine Datenbankzeile zu einem Fingerprint"""
|
|
# Canvas Noise
|
|
canvas_config = self._deserialize_json(row['canvas_noise_config'])
|
|
canvas_noise = CanvasNoise(
|
|
noise_level=canvas_config.get('noise_level', 0.02),
|
|
seed=canvas_config.get('seed', 42),
|
|
algorithm=canvas_config.get('algorithm', 'gaussian')
|
|
)
|
|
|
|
# WebRTC Config
|
|
webrtc_config_data = self._deserialize_json(row['webrtc_config'])
|
|
webrtc_config = WebRTCConfig(
|
|
enabled=webrtc_config_data.get('enabled', True),
|
|
ice_servers=webrtc_config_data.get('ice_servers', []),
|
|
local_ip_mask=webrtc_config_data.get('local_ip_mask', '10.0.0.x'),
|
|
disable_webrtc=webrtc_config_data.get('disable_webrtc', False)
|
|
)
|
|
|
|
# Hardware Config
|
|
hw_config = self._deserialize_json(row['hardware_config'])
|
|
hardware_config = HardwareConfig(
|
|
hardware_concurrency=hw_config.get('hardware_concurrency', 4),
|
|
device_memory=hw_config.get('device_memory', 8),
|
|
max_touch_points=hw_config.get('max_touch_points', 0),
|
|
screen_resolution=tuple(hw_config.get('screen_resolution', [1920, 1080])),
|
|
color_depth=hw_config.get('color_depth', 24),
|
|
pixel_ratio=hw_config.get('pixel_ratio', 1.0)
|
|
)
|
|
|
|
# Navigator Properties
|
|
nav_props = self._deserialize_json(row['navigator_props'])
|
|
navigator_props = NavigatorProperties(
|
|
platform=nav_props.get('platform', 'Win32'),
|
|
vendor=nav_props.get('vendor', 'Google Inc.'),
|
|
vendor_sub=nav_props.get('vendor_sub', ''),
|
|
product=nav_props.get('product', 'Gecko'),
|
|
product_sub=nav_props.get('product_sub', '20030107'),
|
|
app_name=nav_props.get('app_name', 'Netscape'),
|
|
app_version=nav_props.get('app_version', '5.0'),
|
|
user_agent=nav_props.get('user_agent', ''),
|
|
language=nav_props.get('language', 'de-DE'),
|
|
languages=nav_props.get('languages', ['de-DE', 'de', 'en-US', 'en']),
|
|
online=nav_props.get('online', True),
|
|
do_not_track=nav_props.get('do_not_track', '1')
|
|
)
|
|
|
|
# Audio Context
|
|
audio_config = self._deserialize_json(row['audio_context_config']) or {}
|
|
|
|
# Static Components
|
|
static_components = None
|
|
if 'static_components' in row.keys() and row['static_components']:
|
|
sc_data = self._deserialize_json(row['static_components'])
|
|
if sc_data:
|
|
static_components = StaticComponents(
|
|
device_type=sc_data.get('device_type', 'desktop'),
|
|
os_family=sc_data.get('os_family', 'windows'),
|
|
browser_family=sc_data.get('browser_family', 'chromium'),
|
|
gpu_vendor=sc_data.get('gpu_vendor', 'Intel Inc.'),
|
|
gpu_model=sc_data.get('gpu_model', 'Intel Iris OpenGL Engine'),
|
|
cpu_architecture=sc_data.get('cpu_architecture', 'x86_64'),
|
|
base_fonts=sc_data.get('base_fonts', []),
|
|
base_resolution=tuple(sc_data.get('base_resolution', [1920, 1080])),
|
|
base_timezone=sc_data.get('base_timezone', 'Europe/Berlin')
|
|
)
|
|
|
|
|
|
return BrowserFingerprint(
|
|
fingerprint_id=row['id'],
|
|
canvas_noise=canvas_noise,
|
|
webrtc_config=webrtc_config,
|
|
font_list=self._deserialize_json(row['fonts']) or [],
|
|
hardware_config=hardware_config,
|
|
navigator_props=navigator_props,
|
|
created_at=self._parse_datetime(row['created_at']),
|
|
last_rotated=self._parse_datetime(row['last_rotated']),
|
|
webgl_vendor=row['webgl_vendor'],
|
|
webgl_renderer=row['webgl_renderer'],
|
|
audio_context_base_latency=audio_config.get('base_latency', 0.0),
|
|
audio_context_output_latency=audio_config.get('output_latency', 0.0),
|
|
audio_context_sample_rate=audio_config.get('sample_rate', 48000),
|
|
timezone=row['timezone'],
|
|
timezone_offset=row['timezone_offset'],
|
|
plugins=self._deserialize_json(row['plugins']) or [],
|
|
static_components=static_components,
|
|
rotation_seed=row['rotation_seed'] if 'rotation_seed' in row.keys() else None,
|
|
account_bound=row['account_bound'] if 'account_bound' in row.keys() else False,
|
|
platform_specific_config=self._deserialize_json(row['platform_specific'] if 'platform_specific' in row.keys() else '{}') or {}
|
|
) |