import ast
import json
import logging
import os
import re
import sqlite3

import pandas as pd
import requests
from dotenv import load_dotenv
from flask import (
    Flask,
    request,
    jsonify,
    render_template,
    redirect,
    url_for,
    session,
)
from flask_session import Session

load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

CSV_PATH = "All_Categories.csv"
DB_PATH = "products.db"
TABLE_NAME = "products"

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

if not GEMINI_API_KEY:
    logger.error("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")
    raise ValueError("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")

GEMINI_MODEL_NAME = "gemini-1.5-flash"
GEMINI_ENDPOINT = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:generateContent"
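# Example of the fully assembled request URL (the key is appended at call time):
# https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key=<GEMINI_API_KEY>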


def create_db_from_csv(csv_file, db_file):
    """Create the SQLite products database from the CSV, unless it already exists."""
    if os.path.exists(db_file):
        logger.info(f"Database '{db_file}' already exists. Skipping creation.")
        return

    logger.info(f"Creating SQLite DB from CSV: {csv_file} -> {db_file}")
    df_iter = pd.read_csv(csv_file, chunksize=50000)
    conn = sqlite3.connect(db_file)
    cur = conn.cursor()
    cur.execute(f"DROP TABLE IF EXISTS {TABLE_NAME}")
    conn.commit()

    create_sql = f"""
        CREATE TABLE {TABLE_NAME} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            image TEXT,
            link TEXT,
            ratings REAL,
            no_of_ratings INTEGER,
            discount_price TEXT,
            actual_price TEXT,
            search_terms TEXT,
            recommended_5 TEXT,
            category TEXT
        );
    """
    cur.execute(create_sql)
    conn.commit()

    # Index the columns used by the autocomplete and filter queries.
    cur.execute(f"CREATE INDEX idx_name ON {TABLE_NAME}(name);")
    cur.execute(f"CREATE INDEX idx_category ON {TABLE_NAME}(category);")
    cur.execute(f"CREATE INDEX idx_discount_price ON {TABLE_NAME}(discount_price);")
    conn.commit()

    chunk_idx = 0
    for chunk in df_iter:
        logger.info(f"Processing chunk {chunk_idx}...")
        chunk_idx += 1

        # Ensure every expected column exists, then blank out missing values.
        required_columns = [
            "name", "image", "link", "ratings", "no_of_ratings",
            "discount_price", "actual_price", "search_terms", "recommended_5", "category"
        ]
        for col in required_columns:
            if col not in chunk.columns:
                chunk[col] = ""
        chunk.fillna("", inplace=True)
        records = chunk.to_dict(orient="records")

        insert_sql = f"""
            INSERT INTO {TABLE_NAME}
                (name, image, link, ratings, no_of_ratings,
                 discount_price, actual_price, search_terms,
                 recommended_5, category)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        data_to_insert = []
        for r in records:
            # Coerce the numeric fields, falling back to 0 on unparsable values.
            try:
                ratings = float(r["ratings"]) if r["ratings"] else 0.0
            except ValueError:
                ratings = 0.0
            try:
                no_of_ratings = int(r["no_of_ratings"]) if r["no_of_ratings"] else 0
            except ValueError:
                no_of_ratings = 0
            row_tuple = (
                str(r["name"]),
                str(r["image"]),
                str(r["link"]),
                ratings,
                no_of_ratings,
                str(r["discount_price"]),
                str(r["actual_price"]),
                str(r["search_terms"]),
                str(r["recommended_5"]),
                str(r["category"])
            )
            data_to_insert.append(row_tuple)

        cur.executemany(insert_sql, data_to_insert)
        conn.commit()

    conn.close()
    logger.info("Database creation complete.")


app = Flask(__name__)
# Placeholder default; set FLASK_SECRET_KEY in the environment for real deployments.
app.secret_key = os.getenv("FLASK_SECRET_KEY", "YOUR_SECURE_RANDOM_KEY")
app.config["SESSION_TYPE"] = "filesystem"
Session(app)


@app.route("/")
def index():
    """Home page with a search bar."""
    return render_template("index.html")


@app.route("/autocomplete")
def autocomplete():
    """Return (id, name) JSON for substring search in 'name'."""
    q = request.args.get("q", "").strip()
    if not q:
        return jsonify([])

    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    sql = f"""
        SELECT id, name
        FROM {TABLE_NAME}
        WHERE LOWER(name) LIKE LOWER(?)
        LIMIT 10
    """
    wildcard = f"%{q}%"
    rows = cur.execute(sql, (wildcard,)).fetchall()
    conn.close()

    results = [{"id": r[0], "name": r[1]} for r in rows]
    return jsonify(results)


@app.route("/product/<int:item_id>")
def show_product(item_id):
    """Show product detail + top-5 recommended items from recommended_5."""
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    sql = f"SELECT * FROM {TABLE_NAME} WHERE id=?"
    row = cur.execute(sql, (item_id,)).fetchone()
    if not row:
        conn.close()
        return "<h2>Product not found</h2>", 404

    product = {
        "id": row[0],
        "name": row[1],
        "image": row[2],
        "link": row[3],
        "ratings": row[4],
        "no_of_ratings": row[5],
        "discount_price": row[6],
        "actual_price": row[7],
        "search_terms": row[8],
        "recommended_5": row[9],
        "category": row[10]
    }

    # recommended_5 is stored as a Python-literal list of product names.
    try:
        rec_list = ast.literal_eval(product["recommended_5"])
        if not isinstance(rec_list, list):
            rec_list = []
    except (ValueError, SyntaxError):
        rec_list = []

    recommended_details = []
    for rec_name in rec_list[:5]:
        sql_rec = f"SELECT * FROM {TABLE_NAME} WHERE name LIKE ? LIMIT 1"
        rec_row = cur.execute(sql_rec, (f"%{rec_name}%",)).fetchone()
        if rec_row:
            recommended_details.append({
                "id": rec_row[0],
                "name": rec_row[1],
                "image": rec_row[2],
                "link": rec_row[3],
                "discount_price": rec_row[6]
            })

    conn.close()
    return render_template("product.html",
                           product=product,
                           recommended=recommended_details)


@app.route("/rag")
def rag_index():
    """RAG chat page storing the conversation in session['rag_chat']."""
    if "rag_chat" not in session:
        session["rag_chat"] = []
    return render_template("rag.html", chat_history=session["rag_chat"])


@app.route("/rag/query", methods=["POST"])
def rag_query():
    """Answer a user query by combining database retrieval with Gemini generation."""
    if "rag_chat" not in session:
        session["rag_chat"] = []

    user_input = request.form.get("rag_input", "").strip()
    if not user_input:
        return redirect(url_for("rag_index"))

    session["rag_chat"].append(("user", user_input))

    # Retrieval step: parse the query, filter the DB, and build a text context.
    brand_keyword, product_type, price_val = extract_query_parameters(user_input)
    matched_items = filter_database(brand_keyword, product_type, price_val)
    db_context = build_db_context(matched_items, brand_keyword, product_type, price_val)
    conversation_text = construct_prompt(session["rag_chat"], db_context)

    # Generation step: send the combined prompt to Gemini.
    gemini_response = gemini_generate_content(
        api_key=GEMINI_API_KEY,
        conversation_text=conversation_text
    )

    session["rag_chat"].append(("assistant", gemini_response))

    # The chat history list is mutated in place, so flag the session as dirty.
    session.modified = True

    return render_template("rag.html", chat_history=session["rag_chat"])


def extract_query_parameters(user_query):
    """Extract brand, product type, and price from the user's query dynamically."""
    user_lower = user_query.lower()

    price = None
    price_match = re.search(r'(under|below)\s+₹?(\d+[kK]?)', user_lower)
    if price_match:
        price_str = price_match.group(2)
        if price_str.lower().endswith('k'):
            price = int(price_str[:-1]) * 1000
        else:
            price = int(price_str)
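    # e.g. "under ₹2k" yields price = 2000, "below 45000" yields price = 45000.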

    # Pull the known categories and search terms from the DB to match against.
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    cur.execute(f"SELECT DISTINCT category FROM {TABLE_NAME}")
    categories = [row[0].lower() for row in cur.fetchall()]

    cur.execute(f"SELECT DISTINCT search_terms FROM {TABLE_NAME}")
    search_terms = [row[0].lower() for row in cur.fetchall()]

    conn.close()

    brand = None
    product_type = None

    # Product type: try category names first, then fall back to search terms.
    for category in categories:
        if category in user_lower:
            product_type = category
            break

    if not product_type:
        for term in search_terms:
            if term in user_lower:
                product_type = term
                break

    # Brand: treat each word appearing in any search term as a candidate brand.
    possible_brands = set()
    for term in search_terms:
        words = term.split()
        possible_brands.update(words)

    possible_brands = list(possible_brands)

    for b in possible_brands:
        if b in user_lower:
            brand = b
            break

    return brand, product_type, price


def filter_database(brand, product_type, price):
    """Filter the database based on brand, product type, and price."""
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    sql = f"SELECT id, name, discount_price, recommended_5 FROM {TABLE_NAME} WHERE 1=1"
    params = []

    if brand:
        sql += " AND LOWER(name) LIKE ?"
        params.append(f"%{brand}%")
    if product_type:
        sql += " AND LOWER(category) LIKE ?"
        params.append(f"%{product_type}%")
    if price:
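        # discount_price is stored as text such as "₹1,299"; strip the rupee sign
        # and commas so SQLite can compare it numerically (e.g. "₹1,299" -> 1299).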
        sql += " AND CAST(REPLACE(REPLACE(discount_price, '₹', ''), ',', '') AS INTEGER) <= ?"
        params.append(price)

    # Cap the result set so the context-building step stays cheap.
    sql += " LIMIT 5000"

    rows = cur.execute(sql, tuple(params)).fetchall()
    conn.close()

    return rows


def build_db_context(matched_items, brand, product_type, price):
    """Build a structured context string from matched database items."""
    db_context = ""
    if matched_items:
        db_context += f"Found {len(matched_items)} items"
        if price:
            db_context += f" under ₹{price}"
        if brand or product_type:
            db_context += " matching your criteria"
        db_context += ":\n"

        # Only the first ten matches are listed to keep the prompt short.
        for item in matched_items[:10]:
            item_name = item[1]
            item_price = item[2]
            db_context += f"- {item_name} at ₹{item_price}\n"
    else:
        db_context += "No matching items found in the database.\n"

    return db_context


def construct_prompt(chat_history, db_context):
    """Construct the prompt to send to Gemini, including conversation history and DB context."""
    prompt = (
        "You are an intelligent assistant that provides product recommendations "
        "based on the user's query and the available database.\n\n"
        "Conversation so far:\n"
    )
    for speaker, message in chat_history:
        prompt += f"{speaker.capitalize()}: {message}\n"

    prompt += f"\nDatabase Context:\n{db_context}\n"

    prompt += "Based on the above information, provide a helpful and concise answer to the user's query."

    return prompt


def gemini_generate_content(api_key, conversation_text):
    """Call the Gemini API's generateContent endpoint with the constructed prompt."""
    url = f"{GEMINI_ENDPOINT}?key={api_key}"

    payload = {
        "contents": [
            {
                "parts": [{"text": conversation_text}]
            }
        ]
    }
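    # generateContent takes a list of "contents", each holding "parts" with text;
    # the whole conversation is sent here as a single text part.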

    headers = {"Content-Type": "application/json"}
    try:
        # A timeout keeps a stalled Gemini request from hanging the route indefinitely.
        resp = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
    except Exception as e:
        logger.error(f"Error during Gemini API request: {e}")
        return f"[Gemini Error] Failed to connect to Gemini API: {e}"

    try:
        data = resp.json()
    except Exception as e:
        logger.error(f"Invalid JSON response from Gemini API: {e}")
        return f"[Gemini Error] Invalid JSON response: {e}"

    if resp.status_code != 200:
        logger.error(f"Gemini API returned error {resp.status_code}: {data}")
        return f"[Gemini Error {resp.status_code}] {json.dumps(data, indent=2)}"

    # Extract the first text part of the first candidate from the response.
    candidates = data.get("candidates", [])
    if not candidates:
        logger.error(f"No candidates received from Gemini API: {data}")
        return f"No candidates received. Debug JSON: {json.dumps(data, indent=2)}"

    first_candidate = candidates[0]
    content = first_candidate.get("content", {})
    parts = content.get("parts", [])
    if not parts:
        logger.error(f"No 'parts' found in candidate content: {data}")
        return f"No 'parts' found in candidate content. Debug JSON: {json.dumps(data, indent=2)}"

    assistant_reply = parts[0].get("text", "(No text found in the response)")
    logger.info(f"Gemini Assistant Reply: {assistant_reply}")
    return assistant_reply


def main():
    create_db_from_csv(CSV_PATH, DB_PATH)
    logger.info("Starting Flask server on port 7860")
    app.run(host="0.0.0.0", port=7860)


if __name__ == "__main__":
    main()
|