# services/faq_service.py
import asyncio
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin

import aiohttp
import faiss
from bs4 import BeautifulSoup

from config.config import settings

logger = logging.getLogger(__name__)
class FAQService:
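    """Crawls the bofrost.de FAQ pages, embeds each question/answer pair with the
    shared sentence embedder, and answers semantic queries via a FAISS index."""
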
def __init__(self, model_service):
        self.embedder = model_service.embedder  # sentence-embedding model shared via the model service
        self.faiss_index = None  # FAISS index, built lazily by index_faqs()
        self.faq_data = []  # FAQ entries, parallel to the vectors in the index
        self.visited_urls = set()  # guards against re-crawling the same URL
        self.base_url = "https://www.bofrost.de/faq/"
async def fetch_faq_pages(self) -> List[Dict[str, Any]]:
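        """Open an HTTP session and crawl all FAQ pages reachable from the base URL.

        Returns one dict per parsed page, as produced by parse_faq_content.
        """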
async with aiohttp.ClientSession() as session:
try:
# Start with the main FAQ page
pages = await self.crawl_faq_pages(self.base_url, session)
return [page for page in pages if page]
except Exception as e:
logger.error(f"Error fetching FAQ pages: {e}")
return []
async def crawl_faq_pages(self, url: str, session: aiohttp.ClientSession) -> List[Dict[str, Any]]:
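        """Recursively crawl FAQ pages linked from `url`, staying under base_url.

        Previously visited URLs are skipped; newly discovered links are crawled
        concurrently with asyncio.gather.
        """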
if url in self.visited_urls or not url.startswith(self.base_url):
return []
self.visited_urls.add(url)
pages = []
try:
async with session.get(url, timeout=settings.TIMEOUT) as response:
if response.status == 200:
content = await response.text()
soup = BeautifulSoup(content, 'html.parser')
# Add current page content
page_content = await self.parse_faq_content(soup, url)
if page_content:
pages.append(page_content)
# Find and follow FAQ links
tasks = []
for link in soup.find_all('a', href=True):
href = link['href']
full_url = urljoin(url, href)
if (full_url.startswith(self.base_url) and
full_url not in self.visited_urls):
tasks.append(self.crawl_faq_pages(full_url, session))
if tasks:
results = await asyncio.gather(*tasks)
for result in results:
pages.extend(result)
except Exception as e:
logger.error(f"Error crawling FAQ page {url}: {e}")
return pages
async def parse_faq_content(self, soup: BeautifulSoup, url: str) -> Optional[Dict[str, Any]]:
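        """Extract question/answer pairs from a single FAQ page.

        Returns {"url": ..., "faqs": [...]}, or None if nothing usable was found
        or parsing failed.
        """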
try:
faqs = []
faq_items = soup.find_all('div', class_='faq-item')
for item in faq_items:
# Extract question
question_elem = item.find('a', class_='headline-collapse')
if not question_elem:
continue
question = question_elem.find('span')
if not question:
continue
question_text = question.text.strip()
# Extract answer
content_elem = item.find('div', class_='content-collapse')
if not content_elem:
continue
wysiwyg = content_elem.find('div', class_='wysiwyg-content')
if not wysiwyg:
continue
# Extract all text while preserving structure
answer_parts = []
for elem in wysiwyg.find_all(['p', 'li']):
text = elem.get_text(strip=True)
if text:
answer_parts.append(text)
answer_text = ' '.join(answer_parts)
if question_text and answer_text:
faqs.append({
"question": question_text,
"answer": answer_text
})
if faqs:
return {
"url": url,
"faqs": faqs
}
except Exception as e:
logger.error(f"Error parsing FAQ content from {url}: {e}")
return None
async def index_faqs(self):
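        """Fetch all FAQ pages, embed each question+answer pair, and build an L2 FAISS index."""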
faq_pages = await self.fetch_faq_pages()
self.faq_data = []
all_texts = []
for faq_page in faq_pages:
for item in faq_page['faqs']:
# Combine question and answer for better semantic search
combined_text = f"{item['question']} {item['answer']}"
all_texts.append(combined_text)
self.faq_data.append({
"question": item['question'],
"answer": item['answer'],
"source": faq_page['url']
})
if not all_texts:
logger.warning("No FAQ content found to index")
return
# Create embeddings and index them
embeddings = self.embedder.encode(all_texts, convert_to_tensor=True).cpu().detach().numpy()
dimension = embeddings.shape[1]
self.faiss_index = faiss.IndexFlatL2(dimension)
self.faiss_index.add(embeddings)
async def search_faqs(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
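        """Return the top_k FAQ entries closest to `query`, with the L2 distance as "score".

        Builds the index lazily on the first call.
        """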
        if self.faiss_index is None:
await self.index_faqs()
if not self.faq_data:
logger.warning("No FAQ data available for search")
return []
query_embedding = self.embedder.encode([query], convert_to_tensor=True).cpu().detach().numpy()
distances, indices = self.faiss_index.search(query_embedding, top_k)
results = []
for i, idx in enumerate(indices[0]):
            # FAISS pads results with -1 when fewer than top_k vectors are indexed
            if 0 <= idx < len(self.faq_data):
result = self.faq_data[idx].copy()
result["score"] = float(distances[0][i])
results.append(result)
return results
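
# --- Usage sketch (illustrative only, not part of the service) ---
# A minimal example of wiring FAQService to an embedder, assuming the project's
# model service exposes a sentence-transformers model as `.embedder`. The
# `ModelService` stub and the model name below are hypothetical placeholders,
# not the project's actual code.
#
# from sentence_transformers import SentenceTransformer
#
# class ModelService:
#     def __init__(self):
#         self.embedder = SentenceTransformer("all-MiniLM-L6-v2")
#
# async def main():
#     faq_service = FAQService(ModelService())
#     await faq_service.index_faqs()
#     for hit in await faq_service.search_faqs("Wie kann ich meine Bestellung ändern?", top_k=3):
#         print(f"{hit['score']:.3f}  {hit['question']}  ({hit['source']})")
#
# if __name__ == "__main__":
#     asyncio.run(main())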