# llm/services/batch_processor.py
# Author: Chris4K — "Update services/batch_processor.py"
# Commit: 07ac27d (verified), 1.07 kB
# batch_processor.py
from typing import List, Dict, Any
import asyncio
#TODO explain how to use the batch processor
class BatchProcessor:
def __init__(self, max_batch_size: int = 32, max_wait_time: float = 0.1):
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.pending_requests: List[Dict] = []
self.lock = asyncio.Lock()
async def add_request(self, request: Dict) -> Any:
async with self.lock:
self.pending_requests.append(request)
if len(self.pending_requests) >= self.max_batch_size:
return await self._process_batch()
else:
await asyncio.sleep(self.max_wait_time)
if self.pending_requests:
return await self._process_batch()
async def _process_batch(self) -> List[Any]:
batch = self.pending_requests[:self.max_batch_size]
self.pending_requests = self.pending_requests[self.max_batch_size:]
# TODO implement the batch processing logic
return batch