# services/data_service.py

from typing import List, Dict, Any, Tuple
from datetime import datetime
from io import StringIO  # Explicit import for reading the fetched CSV text from memory
import logging

import aiohttp
import faiss
import numpy as np
import pandas as pd

from config.config import settings

logger = logging.getLogger(__name__)


class DataService:
    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.cache = {}
        self.last_update = None
        self.faiss_index = None
        self.data_cleaned = None

    async def fetch_csv_data(self) -> pd.DataFrame:
        """Fetch CSV data from URL with retry logic."""
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        if response.status == 200:
                            content = await response.text()
                            return pd.read_csv(StringIO(content), sep='|')
                        else:
                            logger.error(f"Failed to fetch data: HTTP {response.status}")
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1} failed: {e}", exc_info=True)
                    if attempt == settings.MAX_RETRIES - 1:
                        raise
        return pd.DataFrame()  # Return empty DataFrame if all attempts fail

    async def prepare_data_and_index(self) -> Tuple[pd.DataFrame, Any]:
        """Prepare data and create FAISS index with caching."""
        try:
            current_time = datetime.now()

            # Check cache validity (total_seconds() so entries older than a day also expire)
            if (self.last_update
                    and (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION
                    and self.cache):
                return self.cache['data'], self.cache['index']

            data = await self.fetch_csv_data()
            if data.empty:
                logger.error("Failed to fetch data")
                return pd.DataFrame(), None

            # Data cleaning and preparation
            columns_to_keep = [
                'ID', 'Name', 'Description', 'Price', 'ProductCategory',
                'Grammage', 'BasePriceText', 'Rating', 'RatingCount',
                'Ingredients', 'CreationDate', 'Keywords', 'Brand'
            ]
            self.data_cleaned = data[columns_to_keep].copy()

            # Clean description text: replace unexpected characters with spaces
            self.data_cleaned['Description'] = self.data_cleaned['Description'].astype(str).str.replace(
                r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
            )

            # Combine text fields with weights
            self.data_cleaned['combined_text'] = self.data_cleaned.apply(
                lambda row: (
                    f"{row['Name']} {row['Name']} "  # Double weight for name
                    f"{str(row['Description'])} "
                    f"{str(row['Keywords']) if pd.notnull(row['Keywords']) else ''} "
                    f"{str(row['ProductCategory']) if pd.notnull(row['ProductCategory']) else ''}"
                ).strip(),
                axis=1
            )

            # Create FAISS index (IndexFlatL2 expects float32 vectors;
            # SentenceTransformer embeddings are float32 by default)
            embeddings = self.embedder.encode(
                self.data_cleaned['combined_text'].tolist(),
                convert_to_tensor=True,
                show_progress_bar=True
            ).cpu().detach().numpy()

            d = embeddings.shape[1]
            self.faiss_index = faiss.IndexFlatL2(d)
            self.faiss_index.add(embeddings)

            # Update cache
            self.cache = {
                'data': self.data_cleaned,
                'index': self.faiss_index
            }
            self.last_update = current_time

            return self.data_cleaned, self.faiss_index

        except Exception as e:
            logger.error(f"Error in prepare_data_and_index: {e}", exc_info=True)
            return pd.DataFrame(), None

    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Search for products similar to the query."""
        try:
            if self.faiss_index is None:
                self.data_cleaned, self.faiss_index = await self.prepare_data_and_index()
                if self.faiss_index is None:
                    return []

            # Create query embedding
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()

            # Search in FAISS index
            distances, indices = self.faiss_index.search(query_embedding_np, top_k)

            # Prepare results
            results = []
            for i, idx in enumerate(indices[0]):
                try:
                    if idx < 0:
                        continue  # FAISS pads with -1 when fewer than top_k results exist

                    product = {}
                    row = self.data_cleaned.iloc[idx]

                    for column in self.data_cleaned.columns:
                        value = row[column]
                        # Convert numpy/pandas types to Python native types
                        if isinstance(value, (np.integer, np.floating)):
                            value = value.item()
                        elif isinstance(value, pd.Timestamp):
                            value = value.isoformat()
                        elif isinstance(value, np.bool_):
                            value = bool(value)
                        product[column] = value

                    product['score'] = float(distances[0][i])
                    results.append(product)
                except Exception as e:
                    logger.error(f"Error processing search result {i}: {e}", exc_info=True)
                    continue

            return results

        except Exception as e:
            logger.error(f"Error in search: {e}", exc_info=True)
            return []
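

# Illustrative usage sketch only, not part of the service: it assumes
# `config.config.settings` defines CSV_URL, MAX_RETRIES and CACHE_DURATION,
# and that the real model service exposes a SentenceTransformer-style
# `embedder`; the stub class, model name, and query below are examples.
if __name__ == "__main__":
    import asyncio
    from sentence_transformers import SentenceTransformer

    class _StubModelService:
        """Minimal stand-in exposing the `embedder` attribute DataService expects."""
        def __init__(self):
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")  # example model

    async def _demo():
        service = DataService(_StubModelService())
        results = await service.search("chocolate spread", top_k=3)
        for product in results:
            print(product.get("Name"), product.get("score"))

    asyncio.run(_demo())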