| """ |
| Parallel processing utilities for data pipeline optimization. |
| |
| Supports: |
| - Parallel data ingestion |
| - Batch processing with multiprocessing |
| - Distributed processing support (foundation for Dask/Beam) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import multiprocessing as mp |
| from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed |
| from functools import partial |
| from typing import Any, Callable, Dict, Iterable, List, Optional, TypeVar |
|
|
| T = TypeVar("T") |
| R = TypeVar("R") |
|
|
|
|
| def parallel_map( |
| func: Callable[[T], R], |
| items: Iterable[T], |
| max_workers: Optional[int] = None, |
| use_threads: bool = False, |
| chunk_size: int = 1, |
| **func_kwargs, |
| ) -> List[R]: |
| """ |
| Parallel map function. |
| |
| Args: |
| func: Function to apply to each item |
| items: Iterable of items to process |
| max_workers: Maximum number of workers (default: CPU count) |
| use_threads: Use threads instead of processes (for I/O-bound tasks) |
| chunk_size: Number of items per chunk (for ProcessPoolExecutor) |
| **func_kwargs: Additional kwargs to pass to func |
| |
| Returns: |
| List of results in the same order as items |
| """ |
| if max_workers is None: |
| max_workers = mp.cpu_count() |
| |
| items_list = list(items) |
| if not items_list: |
| return [] |
| |
| |
| if func_kwargs: |
| func = partial(func, **func_kwargs) |
| |
| executor_class = ThreadPoolExecutor if use_threads else ProcessPoolExecutor |
| |
| with executor_class(max_workers=max_workers) as executor: |
| if use_threads: |
| |
| futures = [executor.submit(func, item) for item in items_list] |
| else: |
| |
| futures = executor.map(func, items_list, chunksize=chunk_size) |
| return list(futures) |
| |
| |
| results = [] |
| for future in as_completed(futures): |
| try: |
| results.append(future.result()) |
| except Exception as e: |
| |
| print(f"Error in parallel_map: {e}") |
| results.append(None) |
| |
| |
| |
| return results |
|
|
|
|
| def batch_process( |
| func: Callable[[List[T]], List[R]], |
| items: Iterable[T], |
| batch_size: int = 32, |
| max_workers: Optional[int] = None, |
| **func_kwargs, |
| ) -> List[R]: |
| """ |
| Process items in batches in parallel. |
| |
| Args: |
| func: Function that processes a batch and returns list of results |
| items: Iterable of items to process |
| batch_size: Number of items per batch |
| max_workers: Maximum number of parallel batches |
| **func_kwargs: Additional kwargs to pass to func |
| |
| Returns: |
| Flattened list of results |
| """ |
| items_list = list(items) |
| batches = [ |
| items_list[i : i + batch_size] for i in range(0, len(items_list), batch_size) |
| ] |
| |
| if not batches: |
| return [] |
| |
| |
| if func_kwargs: |
| func = partial(func, **func_kwargs) |
| |
| if max_workers is None: |
| max_workers = min(len(batches), mp.cpu_count()) |
| |
| results = parallel_map( |
| func, |
| batches, |
| max_workers=max_workers, |
| use_threads=False, |
| ) |
| |
| |
| flattened = [] |
| for batch_results in results: |
| if batch_results: |
| flattened.extend(batch_results) |
| |
| return flattened |
|
|
|
|
| class ParallelProcessor: |
| """Wrapper for parallel processing with configuration.""" |
|
|
| def __init__( |
| self, |
| max_workers: Optional[int] = None, |
| use_threads: bool = False, |
| chunk_size: int = 1, |
| ): |
| self.max_workers = max_workers or mp.cpu_count() |
| self.use_threads = use_threads |
| self.chunk_size = chunk_size |
|
|
| def map(self, func: Callable[[T], R], items: Iterable[T], **func_kwargs) -> List[R]: |
| """Apply function to items in parallel.""" |
| return parallel_map( |
| func, |
| items, |
| max_workers=self.max_workers, |
| use_threads=self.use_threads, |
| chunk_size=self.chunk_size, |
| **func_kwargs, |
| ) |
|
|
| def batch_map( |
| self, |
| func: Callable[[List[T]], List[R]], |
| items: Iterable[T], |
| batch_size: int = 32, |
| **func_kwargs, |
| ) -> List[R]: |
| """Process items in batches in parallel.""" |
| return batch_process( |
| func, |
| items, |
| batch_size=batch_size, |
| max_workers=self.max_workers, |
| **func_kwargs, |
| ) |
|
|