| | |
| |
|
| | import time |
| | import uuid |
| | import hashlib |
| | import json |
| | from abc import ABC, abstractmethod |
| | from dataclasses import dataclass |
| | from enum import Enum |
| | from typing import Any, Dict, List, Optional |
| | import logging |
| |
|
| | logging.basicConfig(level=logging.INFO) |
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | |
| | DEFAULT_TTL_HOURS = 24 |
| | SECONDS_PER_HOUR = 3600 |
| | MAX_RETRY_ATTEMPTS = 5 |
| | BASE_RETRY_DELAY_SECONDS = 1 |
| |
|
| |
|
| | class NetworkState(Enum): |
| | ONLINE = "ONLINE" |
| | OFFLINE = "OFFLINE" |
| | HIGH_LATENCY = "HIGH_LATENCY" |
| |
|
| |
|
| | class ActionStatus(Enum): |
| | PENDING = "PENDING" |
| | SYNCED = "SYNCED" |
| | EXPIRED = "EXPIRED" |
| | FAILED = "FAILED" |
| | RETRY_SCHEDULED = "RETRY_SCHEDULED" |
| |
|
| |
|
| | class SyncResult(Enum): |
| | SUCCESS = "SUCCESS" |
| | ABORT_NO_NET = "ABORT_NO_NET" |
| | PARTIAL_SUCCESS = "PARTIAL_SUCCESS" |
| | ALL_FAILED = "ALL_FAILED" |
| |
|
| |
|
| | @dataclass |
| | class SyncSummary: |
| | result: SyncResult |
| | synced_count: int = 0 |
| | failed_count: int = 0 |
| | expired_count: int = 0 |
| | error_messages: List[str] = None |
| |
|
| | def __post_init__(self): |
| | if self.error_messages is None: |
| | self.error_messages = [] |
| |
|
| |
|
| | class LocalDatabase(ABC): |
| | @abstractmethod |
| | def insert(self, table: str, data: dict) -> None: |
| | pass |
| |
|
| | @abstractmethod |
| | def query(self, sql: str) -> List[dict]: |
| | pass |
| |
|
| | @abstractmethod |
| | def update(self, record_id: str, **kwargs) -> None: |
| | pass |
| |
|
| | @abstractmethod |
| | def get_state_snapshot(self) -> dict: |
| | """Get current local state for conflict detection.""" |
| | pass |
| |
|
| |
|
| | class ApiClient(ABC): |
| | @abstractmethod |
| | def post(self, endpoint: str, json: dict) -> dict: |
| | pass |
| |
|
| |
|
| | class SyncQueue: |
| | """Manages offline action queue with sync capabilities.""" |
| |
|
| | def __init__( |
| | self, |
| | local_db: LocalDatabase, |
| | api_client: ApiClient, |
| | ttl_hours: int = DEFAULT_TTL_HOURS |
| | ): |
| | self._db = local_db |
| | self._api_client = api_client |
| | self._ttl_hours = ttl_hours |
| | self._retry_counts: Dict[str, int] = {} |
| |
|
| | def get_local_state_hash(self) -> str: |
| | """Generate a hash of the current local state for conflict detection.""" |
| | try: |
| | state_snapshot = self._db.get_state_snapshot() |
| | state_json = json.dumps(state_snapshot, sort_keys=True) |
| | return hashlib.sha256(state_json.encode('utf-8')).hexdigest() |
| | except Exception as e: |
| | logger.exception("Failed to generate local state hash") |
| | return "" |
| |
|
| | def enqueue_action( |
| | self, |
| | action_type: str, |
| | payload: dict, |
| | priority: int = 1 |
| | ) -> str: |
| | """ |
| | Stores an action locally with a Time-To-Live (TTL). |
| | If the action acts on old data, we tag it for conflict resolution. |
| | |
| | Returns: |
| | The action ID for tracking. |
| | """ |
| | if not action_type or not isinstance(action_type, str): |
| | raise ValueError("action_type must be a non-empty string") |
| |
|
| | if not isinstance(payload, dict): |
| | raise ValueError("payload must be a dictionary") |
| |
|
| | action_id = str(uuid.uuid4()) |
| | action_item = { |
| | "id": action_id, |
| | "timestamp": time.time(), |
| | "type": action_type, |
| | "payload": self._sanitize_payload(payload), |
| | "priority": priority, |
| | "status": ActionStatus.PENDING.value, |
| | "ttl_expiry": time.time() + (self._ttl_hours * SECONDS_PER_HOUR), |
| | "device_state_hash": self.get_local_state_hash(), |
| | "retry_count": 0 |
| | } |
| |
|
| | |
| | try: |
| | self._db.insert("offline_queue", action_item) |
| | logger.info(f"Action {action_id} queued locally") |
| | return action_id |
| | except Exception as e: |
| | logger.exception(f"Failed to enqueue action: {str(e)}") |
| | raise |
| |
|
| | def _sanitize_payload(self, payload: dict) -> dict: |
| | """Remove sensitive fields before storage/transmission.""" |
| | sensitive_keys = {'password', 'token', |
| | 'secret', 'api_key', 'credential'} |
| | return { |
| | k: v for k, v in payload.items() |
| | if k.lower() not in sensitive_keys |
| | } |
| |
|
| | def attempt_sync(self, current_network_state: NetworkState) -> SyncSummary: |
| | """ |
| | Only attempts sync if network is stable. |
| | |
| | Returns: |
| | SyncSummary with results of the sync operation. |
| | """ |
| | if current_network_state == NetworkState.OFFLINE: |
| | logger.info("Sync aborted: network offline") |
| | return SyncSummary(result=SyncResult.ABORT_NO_NET) |
| |
|
| | try: |
| | pending_actions = self._db.query( |
| | "SELECT * FROM offline_queue WHERE status='PENDING' OR status='RETRY_SCHEDULED'" |
| | ) |
| | except Exception as e: |
| | logger.exception("Failed to query pending actions") |
| | return SyncSummary( |
| | result=SyncResult.ALL_FAILED, |
| | error_messages=[f"Database query failed: {str(e)}"] |
| | ) |
| |
|
| | if not pending_actions: |
| | logger.info("No pending actions to sync") |
| | return SyncSummary(result=SyncResult.SUCCESS) |
| |
|
| | |
| | pending_actions.sort( |
| | key=lambda x: (-x.get('priority', 1), x['timestamp'])) |
| |
|
| | synced_count = 0 |
| | failed_count = 0 |
| | expired_count = 0 |
| | error_messages = [] |
| |
|
| | for action in pending_actions: |
| | action_id = action['id'] |
| |
|
| | |
| | if time.time() > action['ttl_expiry']: |
| | self._db.update(action_id, status=ActionStatus.EXPIRED.value) |
| | expired_count += 1 |
| | logger.warning(f"Action {action_id} expired") |
| | continue |
| |
|
| | |
| | sync_success = self._sync_action(action) |
| | if sync_success: |
| | synced_count += 1 |
| | else: |
| | failed_count += 1 |
| | error_messages.append(f"Action {action_id} failed to sync") |
| |
|
| | |
| | if failed_count == 0: |
| | result = SyncResult.SUCCESS |
| | elif synced_count == 0: |
| | result = SyncResult.ALL_FAILED |
| | else: |
| | result = SyncResult.PARTIAL_SUCCESS |
| |
|
| | return SyncSummary( |
| | result=result, |
| | synced_count=synced_count, |
| | failed_count=failed_count, |
| | expired_count=expired_count, |
| | error_messages=error_messages |
| | ) |
| |
|
| | def _sync_action(self, action: dict) -> bool: |
| | """Attempt to sync a single action to the cloud.""" |
| | action_id = action['id'] |
| |
|
| | |
| | sync_payload = { |
| | "id": action_id, |
| | "type": action['type'], |
| | "payload": action['payload'], |
| | "timestamp": action['timestamp'], |
| | "device_state_hash": action.get('device_state_hash', '') |
| | } |
| |
|
| | try: |
| | self._api_client.post("/sync", json=sync_payload) |
| | self._db.update(action_id, status=ActionStatus.SYNCED.value) |
| | logger.info(f"Action {action_id} synced successfully") |
| | return True |
| |
|
| | except TimeoutError: |
| | self._schedule_retry(action_id) |
| | return False |
| |
|
| | except ConnectionError as e: |
| | logger.error(f"Connection error syncing {action_id}: {str(e)}") |
| | self._schedule_retry(action_id) |
| | return False |
| |
|
| | except Exception as e: |
| | logger.exception(f"Unexpected error syncing {action_id}") |
| | retry_count = self._retry_counts.get(action_id, 0) |
| | if retry_count >= MAX_RETRY_ATTEMPTS: |
| | self._db.update(action_id, status=ActionStatus.FAILED.value) |
| | logger.error( |
| | f"Action {action_id} permanently failed after {retry_count} retries") |
| | else: |
| | self._schedule_retry(action_id) |
| | return False |
| |
|
| | def _schedule_retry(self, action_id: str) -> None: |
| | """Schedule an action for retry with exponential backoff.""" |
| | current_count = self._retry_counts.get(action_id, 0) |
| |
|
| | if current_count >= MAX_RETRY_ATTEMPTS: |
| | self._db.update(action_id, status=ActionStatus.FAILED.value) |
| | logger.error( |
| | f"Action {action_id} exceeded max retries ({MAX_RETRY_ATTEMPTS})" |
| | ) |
| | return |
| |
|
| | self._retry_counts[action_id] = current_count + 1 |
| | delay = BASE_RETRY_DELAY_SECONDS * (2 ** current_count) |
| |
|
| | self._db.update( |
| | action_id, |
| | status=ActionStatus.RETRY_SCHEDULED.value, |
| | retry_count=current_count + 1 |
| | ) |
| |
|
| | logger.info( |
| | f"Action {action_id} scheduled for retry #{current_count + 1} " |
| | f"in {delay} seconds" |
| | ) |
| |
|