# batch_processor.py
import asyncio
from typing import Any, Dict, List, Optional
# TODO: explain how to use the batch processor
class BatchProcessor:
    """Group concurrently submitted requests and process them in batches.

    Requests passed to :meth:`add_request` are queued in
    ``pending_requests``.  The queue is flushed through
    :meth:`_process_batch` as soon as it holds ``max_batch_size`` requests,
    or ``max_wait_time`` seconds after the first request of the batch was
    queued, whichever happens first.  Every caller whose request was part of
    a batch receives the list returned by ``_process_batch`` for that batch.

    Example::

        processor = BatchProcessor(max_batch_size=8, max_wait_time=0.05)
        results = await asyncio.gather(
            *(processor.add_request(r) for r in requests)
        )
    """

    def __init__(self, max_batch_size: int = 32, max_wait_time: float = 0.1):
        """Create an empty processor.

        Args:
            max_batch_size: Queue length that triggers an immediate flush,
                and the maximum number of requests taken per batch.
            max_wait_time: Seconds to wait for a batch to fill before it is
                flushed anyway.

        Raises:
            ValueError: If ``max_batch_size`` is smaller than 1 (such a
                batch could never drain the queue).
        """
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be at least 1")
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests: List[Dict] = []
        # Guards pending_requests and the two private fields below.
        self.lock = asyncio.Lock()
        # Future shared by every caller waiting on the batch currently being
        # filled; None while no batch is open.
        self._batch_future: Optional[asyncio.Future] = None
        # Timer task that flushes the open batch after max_wait_time.
        self._flush_task: Optional[asyncio.Task] = None

    async def add_request(self, request: Dict) -> Any:
        """Queue ``request`` and wait until its batch has been processed.

        Args:
            request: The request to enqueue.

        Returns:
            The list returned by :meth:`_process_batch` for the batch that
            contained ``request``.

        Raises:
            Exception: Whatever :meth:`_process_batch` raised for the batch.
        """
        async with self.lock:
            self.pending_requests.append(request)
            if self._batch_future is None:
                # First request of a new batch: open it and start its timer.
                loop = asyncio.get_running_loop()
                self._batch_future = loop.create_future()
                self._flush_task = loop.create_task(
                    self._flush_after_wait(self._batch_future)
                )
            batch_future = self._batch_future
            if len(self.pending_requests) >= self.max_batch_size:
                await self._flush_locked(batch_future)
        # The wait happens outside the lock so other callers can join the
        # batch.  shield() keeps one cancelled caller from cancelling the
        # future shared with the rest of the batch.
        return await asyncio.shield(batch_future)

    async def _flush_after_wait(self, batch_future: asyncio.Future) -> None:
        """Flush the batch tied to ``batch_future`` once the wait elapses."""
        await asyncio.sleep(self.max_wait_time)
        async with self.lock:
            # Skip if a size-triggered flush already handled this batch.
            if self._batch_future is batch_future:
                await self._flush_locked(batch_future)

    async def _flush_locked(self, batch_future: asyncio.Future) -> None:
        """Process the open batch and publish the outcome to its waiters.

        The caller must hold ``self.lock``.
        """
        self._batch_future = None
        timer, self._flush_task = self._flush_task, None
        # A size-triggered flush makes the pending timer obsolete; the timer
        # must not cancel itself when it is the one doing the flush.
        if timer is not None and timer is not asyncio.current_task():
            timer.cancel()
        try:
            results = await self._process_batch()
        except Exception as exc:
            # Forward the failure to every caller waiting on this batch
            # instead of losing it inside the background timer task.
            if not batch_future.done():
                batch_future.set_exception(exc)
        else:
            if not batch_future.done():
                batch_future.set_result(results)

    async def _process_batch(self) -> List[Any]:
        """Remove up to ``max_batch_size`` queued requests and process them.

        Called with ``self.lock`` held.

        Returns:
            The requests that were taken from the front of the queue.
        """
        batch = self.pending_requests[:self.max_batch_size]
        self.pending_requests = self.pending_requests[self.max_batch_size:]
        # TODO implement the batch processing logic
        return batch