Chris4K committed (verified)
Commit d74e0f6 · 1 Parent(s): bf3527f

Update services/data_service.py
Files changed (1):
  services/data_service.py  (+114, -70)
services/data_service.py CHANGED
@@ -1,5 +1,5 @@
  # services/data_service.py
- from typing import List, Dict, Any, Optional
+ from typing import List, Dict, Any, Optional, Tuple
  import pandas as pd
  import faiss
  import numpy as np
@@ -8,6 +8,7 @@ from datetime import datetime
  import logging
  from config.config import settings
  from functools import lru_cache
+ from io import StringIO  # Add explicit StringIO import

  logger = logging.getLogger(__name__)

@@ -20,85 +21,128 @@ class DataService:
          self.data_cleaned = None

      async def fetch_csv_data(self) -> pd.DataFrame:
+         """Fetch CSV data from URL with retry logic"""
          async with aiohttp.ClientSession() as session:
              for attempt in range(settings.MAX_RETRIES):
                  try:
                      async with session.get(settings.CSV_URL) as response:
                          if response.status == 200:
                              content = await response.text()
-                             return pd.read_csv(pd.StringIO(content), sep='|')
+                             return pd.read_csv(StringIO(content), sep='|')
+                         else:
+                             logger.error(f"Failed to fetch data: HTTP {response.status}")
                  except Exception as e:
-                     logger.error(f"Attempt {attempt + 1} failed: {e}")
+                     logger.error(f"Attempt {attempt + 1} failed: {e}", exc_info=True)
                      if attempt == settings.MAX_RETRIES - 1:
                          raise
+         return pd.DataFrame()  # Return empty DataFrame if all attempts fail

-     async def prepare_data_and_index(self) -> tuple:
-         current_time = datetime.now()
-
-         # Check cache validity
-         if (self.last_update and
-             (current_time - self.last_update).seconds < settings.CACHE_DURATION and
-             self.cache):
-             return self.cache['data'], self.cache['index']
-
-         data = await self.fetch_csv_data()
-
-         # Data cleaning and preparation
-         columns_to_keep = [
-             'ID', 'Name', 'Description', 'Price',
-             'ProductCategory', 'Grammage',
-             'BasePriceText', 'Rating', 'RatingCount',
-             'Ingredients', 'CreationDate', 'Keywords', 'Brand'
-         ]
-
-         self.data_cleaned = data[columns_to_keep].copy()
-         self.data_cleaned['Description'] = self.data_cleaned['Description'].str.replace(
-             r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
-         )
-
-         # Improved text combination with weights
-         self.data_cleaned['combined_text'] = self.data_cleaned.apply(
-             lambda row: (
-                 f"{row['Name']} {row['Name']} "  # Double weight for name
-                 f"{row['Description']} "
-                 f"{row['Keywords'] if pd.notnull(row['Keywords']) else ''} "
-                 f"{row['ProductCategory'] if pd.notnull(row['ProductCategory']) else ''}"
-             ).strip(),
-             axis=1
-         )
-
-         # Create FAISS index
-         embeddings = self.embedder.encode(
-             self.data_cleaned['combined_text'].tolist(),
-             convert_to_tensor=True,
-             show_progress_bar=True
-         ).cpu().detach().numpy()
-
-         d = embeddings.shape[1]
-         self.faiss_index = faiss.IndexFlatL2(d)
-         self.faiss_index.add(embeddings)
-
-         # Update cache
-         self.cache = {
-             'data': self.data_cleaned,
-             'index': self.faiss_index
-         }
-         self.last_update = current_time
-
-         return self.data_cleaned, self.faiss_index
-
-     async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
-         if not self.faiss_index:
-             await self.prepare_data_and_index()
-
-         query_embedding = self.embedder.encode([query], convert_to_tensor=True).cpu().detach().numpy()
-         distances, indices = self.faiss_index.search(query_embedding, top_k)
-
-         results = []
-         for i, idx in enumerate(indices[0]):
-             product = self.data_cleaned.iloc[idx].to_dict()
-             product['score'] = float(distances[0][i])
-             results.append(product)
-
-         return results
+     async def prepare_data_and_index(self) -> Tuple[pd.DataFrame, Any]:
+         """Prepare data and create FAISS index with caching"""
+         try:
+             current_time = datetime.now()
+
+             # Check cache validity
+             if (self.last_update and
+                 (current_time - self.last_update).seconds < settings.CACHE_DURATION and
+                 self.cache):
+                 return self.cache['data'], self.cache['index']
+
+             data = await self.fetch_csv_data()
+             if data.empty:
+                 logger.error("Failed to fetch data")
+                 return pd.DataFrame(), None
+
+             # Data cleaning and preparation
+             columns_to_keep = [
+                 'ID', 'Name', 'Description', 'Price',
+                 'ProductCategory', 'Grammage',
+                 'BasePriceText', 'Rating', 'RatingCount',
+                 'Ingredients', 'CreationDate', 'Keywords', 'Brand'
+             ]
+
+             self.data_cleaned = data[columns_to_keep].copy()
+
+             # Clean description text
+             self.data_cleaned['Description'] = self.data_cleaned['Description'].astype(str).str.replace(
+                 r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
+             )

+             # Combine text fields with weights
+             self.data_cleaned['combined_text'] = self.data_cleaned.apply(
+                 lambda row: (
+                     f"{row['Name']} {row['Name']} "  # Double weight for name
+                     f"{str(row['Description'])} "
+                     f"{str(row['Keywords']) if pd.notnull(row['Keywords']) else ''} "
+                     f"{str(row['ProductCategory']) if pd.notnull(row['ProductCategory']) else ''}"
+                 ).strip(),
+                 axis=1
+             )
+
+             # Create FAISS index
+             embeddings = self.embedder.encode(
+                 self.data_cleaned['combined_text'].tolist(),
+                 convert_to_tensor=True,
+                 show_progress_bar=True
+             ).cpu().detach().numpy()
+
+             d = embeddings.shape[1]
+             self.faiss_index = faiss.IndexFlatL2(d)
+             self.faiss_index.add(embeddings)
+
+             # Update cache
+             self.cache = {
+                 'data': self.data_cleaned,
+                 'index': self.faiss_index
+             }
+             self.last_update = current_time
+
+             return self.data_cleaned, self.faiss_index
+
+         except Exception as e:
+             logger.error(f"Error in prepare_data_and_index: {e}", exc_info=True)
+             return pd.DataFrame(), None
+
+     async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
+         """Search for products similar to the query"""
+         try:
+             if not self.faiss_index:
+                 self.data_cleaned, self.faiss_index = await self.prepare_data_and_index()
+                 if self.faiss_index is None:
+                     return []
+
+             # Create query embedding
+             query_embedding = self.embedder.encode([query], convert_to_tensor=True)
+             query_embedding_np = query_embedding.cpu().detach().numpy()
+
+             # Search in FAISS index
+             distances, indices = self.faiss_index.search(query_embedding_np, top_k)
+
+             # Prepare results
+             results = []
+             for i, idx in enumerate(indices[0]):
+                 try:
+                     product = {}
+                     row = self.data_cleaned.iloc[idx]
+                     for column in self.data_cleaned.columns:
+                         value = row[column]
+                         # Convert numpy/pandas types to Python native types
+                         if isinstance(value, (np.integer, np.floating)):
+                             value = value.item()
+                         elif isinstance(value, pd.Timestamp):
+                             value = value.isoformat()
+                         elif isinstance(value, np.bool_):
+                             value = bool(value)
+                         product[column] = value
+
+                     product['score'] = float(distances[0][i])
+                     results.append(product)
+                 except Exception as e:
+                     logger.error(f"Error processing search result {i}: {e}", exc_info=True)
+                     continue
+
+             return results
+
+         except Exception as e:
+             logger.error(f"Error in search: {e}", exc_info=True)
+             return []
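
The updated service reads its endpoint and retry/cache behaviour from config.config.settings (CSV_URL, MAX_RETRIES, CACHE_DURATION), which is not part of this diff. A minimal sketch of what such a settings object could look like; the field names come from the diff, but the values below are placeholders, not the project's actual configuration:

# config/config.py -- hypothetical sketch, not part of this commit.
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    CSV_URL: str = "https://example.com/products.csv"  # placeholder URL
    MAX_RETRIES: int = 3                                # retry attempts for fetch_csv_data
    CACHE_DURATION: int = 3600                          # cache lifetime in seconds

settings = Settings()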
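
The changed hunks also rely on attributes (self.embedder, self.cache, self.last_update, self.faiss_index) that are initialized outside the diff. A hypothetical __init__ consistent with the calls above; the embedder is assumed to be a sentence-transformers model (which matches encode(..., convert_to_tensor=True).cpu().detach().numpy()), and the model name is an assumption:

# Hypothetical DataService.__init__ -- not shown in this diff.
from sentence_transformers import SentenceTransformer

class DataService:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):  # model name assumed
        self.embedder = SentenceTransformer(model_name)
        self.faiss_index = None   # built lazily by prepare_data_and_index()
        self.cache = {}           # {'data': cleaned DataFrame, 'index': FAISS index}
        self.last_update = None   # datetime of the last successful refresh
        self.data_cleaned = None  # cleaned product DataFrame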
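
A usage sketch for the revised async API; the query text and entry point are illustrative only. Since the index is IndexFlatL2, the returned score is a distance, so smaller values mean closer matches:

# Illustrative usage of the updated search API (not part of the commit).
import asyncio
from services.data_service import DataService

async def main() -> None:
    service = DataService()
    results = await service.search("organic protein powder", top_k=3)
    for product in results:
        # 'score' is an L2 distance: lower = more similar.
        print(product.get("Name"), product.get("Price"), product["score"])

if __name__ == "__main__":
    asyncio.run(main())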