import os |
import re |
import ast |
import json |
import requests |
import pandas as pd |
import sqlite3 |
import logging |
from flask import ( |
Flask, |
request, |
jsonify, |
render_template, |
redirect, |
url_for, |
session |
) |
from flask_session import Session |
from dotenv import load_dotenv |
load_dotenv() |
logging.basicConfig(level=logging.INFO) |
logger = logging.getLogger(__name__) |
CSV_PATH = "All_Categories.csv" |
DB_PATH = "products.db" |
TABLE_NAME = "products" |
if not GEMINI_API_KEY: |
logger.error("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.") |
raise ValueError("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.") |
GEMINI_MODEL_NAME = "gemini-1.5-flash" |
GEMINI_ENDPOINT = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:generateContent" |
def create_db_from_csv(csv_file, db_file): |
if os.path.exists(db_file): |
logger.info(f"Database '{db_file}' already exists. Skipping creation.") |
return |
logger.info(f"Creating SQLite DB from CSV: {csv_file} -> {db_file}") |
df_iter = pd.read_csv(csv_file, chunksize=50000) |
conn = sqlite3.connect(db_file) |
cur = conn.cursor() |
conn.commit() |
create_sql = f""" |
name TEXT, |
image TEXT, |
link TEXT, |
ratings REAL, |
no_of_ratings INTEGER, |
discount_price TEXT, |
actual_price TEXT, |
search_terms TEXT, |
recommended_5 TEXT, |
category TEXT |
); |
""" |
cur.execute(create_sql) |
conn.commit() |
cur.execute(f"CREATE INDEX idx_name ON {TABLE_NAME}(name);") |
cur.execute(f"CREATE INDEX idx_category ON {TABLE_NAME}(category);") |
cur.execute(f"CREATE INDEX idx_discount_price ON {TABLE_NAME}(discount_price);") |
conn.commit() |
chunk_idx = 0 |
for chunk in df_iter: |
logger.info(f"Processing chunk {chunk_idx}...") |
chunk_idx += 1 |
required_columns = [ |
"name","image","link","ratings","no_of_ratings", |
"discount_price","actual_price","search_terms","recommended_5","category" |
] |
for col in required_columns: |
if col not in chunk.columns: |
chunk[col] = "" |
chunk.fillna("", inplace=True) |
records = chunk.to_dict(orient="records") |
insert_sql = f""" |
(name, image, link, ratings, no_of_ratings, |
discount_price, actual_price, search_terms, |
recommended_5, category) |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
""" |
data_to_insert = [] |
for r in records: |
try: |
ratings = float(r["ratings"]) if r["ratings"] else 0.0 |
except ValueError: |
ratings = 0.0 |
try: |
no_of_ratings = int(r["no_of_ratings"]) if r["no_of_ratings"] else 0 |
except ValueError: |
no_of_ratings = 0 |
row_tuple = ( |
str(r["name"]), |
str(r["image"]), |
str(r["link"]), |
ratings, |
no_of_ratings, |
str(r["discount_price"]), |
str(r["actual_price"]), |
str(r["search_terms"]), |
str(r["recommended_5"]), |
str(r["category"]) |
) |
data_to_insert.append(row_tuple) |
cur.executemany(insert_sql, data_to_insert) |
conn.commit() |
conn.close() |
logger.info("Database creation complete.") |
app = Flask(__name__) |
app.secret_key = os.getenv("FLASK_SECRET_KEY", "YOUR_SECURE_RANDOM_KEY") |
app.config["SESSION_TYPE"] = "filesystem" |
Session(app) |
@app.route("/") |
def index(): |
"""Home page with a search bar.""" |
return render_template("index.html") |
@app.route("/autocomplete") |
def autocomplete(): |
"""Return (id, name) JSON for substring search in 'name'.""" |
q = request.args.get("q", "").strip() |
if not q: |
return jsonify([]) |
conn = sqlite3.connect(DB_PATH) |
cur = conn.cursor() |
sql = f""" |
SELECT id, name |
LIMIT 10 |
""" |
wildcard = f"%{q}%" |
rows = cur.execute(sql, (wildcard,)).fetchall() |
conn.close() |
results = [{"id": r[0], "name": r[1]} for r in rows] |
return jsonify(results) |
@app.route("/product/<int:item_id>") |
def show_product(item_id): |
"""Show product detail + top-5 recommended items from recommended_5.""" |
conn = sqlite3.connect(DB_PATH) |
cur = conn.cursor() |
sql = f"SELECT * FROM {TABLE_NAME} WHERE id=?" |
row = cur.execute(sql, (item_id,)).fetchone() |
if not row: |
conn.close() |
return "<h2>Product not found</h2>", 404 |
product = { |
"id": row[0], |
"name": row[1], |
"image": row[2], |
"link": row[3], |
"ratings": row[4], |
"no_of_ratings": row[5], |
"discount_price": row[6], |
"actual_price": row[7], |
"search_terms": row[8], |
"recommended_5": row[9], |
"category": row[10] |
} |
try: |
rec_list = ast.literal_eval(product["recommended_5"]) |
if not isinstance(rec_list, list): |
rec_list = [] |
except: |
rec_list = [] |
recommended_details = [] |
for rec_name in rec_list[:5]: |
sql_rec = f"SELECT * FROM {TABLE_NAME} WHERE name LIKE ? LIMIT 1" |
rec_row = cur.execute(sql_rec, (f"%{rec_name}%",)).fetchone() |
if rec_row: |
recommended_details.append({ |
"id": rec_row[0], |
"name": rec_row[1], |
"image": rec_row[2], |
"link": rec_row[3], |
"discount_price": rec_row[6] |
}) |
conn.close() |
return render_template("product.html", |
product=product, |
recommended=recommended_details) |
@app.route("/rag") |
def rag_index(): |
"""RAG Chat page storing conversation in session['rag_chat']. """ |
if "rag_chat" not in session: |
session["rag_chat"] = [] |
return render_template("rag.html", chat_history=session["rag_chat"]) |
@app.route("/rag/query", methods=["POST"]) |
def rag_query(): |
""" |
Process user input with an in-depth approach. |
""" |
if "rag_chat" not in session: |
session["rag_chat"] = [] |
user_input = request.form.get("rag_input", "").strip() |
if not user_input: |
return redirect(url_for("rag_index")) |
session["rag_chat"].append(("user", user_input)) |
brand_keyword, product_type, price_val = extract_query_parameters(user_input) |
matched_items = filter_database(brand_keyword, product_type, price_val) |
db_context = build_db_context(matched_items, brand_keyword, product_type, price_val) |
conversation_text = construct_prompt(session["rag_chat"], db_context) |
gemini_response = gemini_generate_content( |
api_key=GEMINI_API_KEY, |
conversation_text=conversation_text |
) |
session["rag_chat"].append(("assistant", gemini_response)) |
session.modified = True |
return render_template("rag.html", chat_history=session["rag_chat"]) |
def extract_query_parameters(user_query): |
""" |
Extract brand, product type, and price from the user's query dynamically. |
""" |
user_lower = user_query.lower() |
price = None |
price_match = re.search(r'(under|below)\s+₹?(\d+[kK]?)', user_lower) |
if price_match: |
price_str = price_match.group(2) |
if price_str.lower().endswith('k'): |
price = int(price_str[:-1]) * 1000 |
else: |
price = int(price_str) |
conn = sqlite3.connect(DB_PATH) |
cur = conn.cursor() |
cur.execute(f"SELECT DISTINCT category FROM {TABLE_NAME}") |
categories = [row[0].lower() for row in cur.fetchall()] |
cur.execute(f"SELECT DISTINCT search_terms FROM {TABLE_NAME}") |
search_terms = [row[0].lower() for row in cur.fetchall()] |
conn.close() |
brand = None |
product_type = None |
for category in categories: |
if category in user_lower: |
product_type = category |
break |
if not product_type: |
for term in search_terms: |
if term in user_lower: |
product_type = term |
break |
possible_brands = set() |
for term in search_terms: |
words = term.split() |
possible_brands.update(words) |
possible_brands = list(possible_brands) |
for b in possible_brands: |
if b in user_lower: |
brand = b |
break |
return brand, product_type, price |
def filter_database(brand, product_type, price): |
""" |
Filter the database based on brand, product type, and price. |
""" |
conn = sqlite3.connect(DB_PATH) |
cur = conn.cursor() |
sql = f"SELECT id, name, discount_price, recommended_5 FROM {TABLE_NAME} WHERE 1=1" |
params = [] |
if brand: |
sql += " AND LOWER(name) LIKE ?" |
params.append(f"%{brand}%") |
if product_type: |
sql += " AND LOWER(category) LIKE ?" |
params.append(f"%{product_type}%") |
if price: |
sql += " AND CAST(REPLACE(REPLACE(discount_price, '₹', ''), ',', '') AS INTEGER) <= ?" |
params.append(price) |
sql += " LIMIT 5000" |
rows = cur.execute(sql, tuple(params)).fetchall() |
conn.close() |
return rows |
def build_db_context(matched_items, brand, product_type, price): |
""" |
Build a structured context string from matched database items. |
""" |
db_context = "" |
if matched_items: |
db_context += f"Found {len(matched_items)} items" |
if price: |
db_context += f" under ₹{price}" |
if brand or product_type: |
db_context += " matching your criteria" |
db_context += ":\n" |
for item in matched_items[:10]: |
item_name = item[1] |
item_price = item[2] |
db_context += f"- {item_name} at ₹{item_price}\n" |
else: |
db_context += "No matching items found in the database.\n" |
return db_context |
def construct_prompt(chat_history, db_context): |
""" |
Construct the prompt to send to Gemini, including conversation history and DB context. |
""" |
prompt = ( |
"You are an intelligent assistant that provides product recommendations based on the user's query and the available database.\n\n" |
"Conversation so far:\n" |
) |
for speaker, message in chat_history: |
prompt += f"{speaker.capitalize()}: {message}\n" |
prompt += f"\nDatabase Context:\n{db_context}\n" |
prompt += "Based on the above information, provide a helpful and concise answer to the user's query." |
return prompt |
def gemini_generate_content(api_key, conversation_text): |
""" |
Call the Gemini API's generateContent endpoint with the constructed prompt. |
""" |
url = f"{GEMINI_ENDPOINT}?key={api_key}" |
payload = { |
"contents": [ |
{ |
"parts": [{"text": conversation_text}] |
} |
] |
} |
headers = {"Content-Type": "application/json"} |
try: |
resp = requests.post(url, headers=headers, data=json.dumps(payload)) |
except Exception as e: |
logger.error(f"Error during Gemini API request: {e}") |
return f"[Gemini Error] Failed to connect to Gemini API: {e}" |
try: |
data = resp.json() |
except Exception as e: |
logger.error(f"Invalid JSON response from Gemini API: {e}") |
return f"[Gemini Error] Invalid JSON response: {e}" |
if resp.status_code != 200: |
logger.error(f"Gemini API returned error {resp.status_code}: {data}") |
return f"[Gemini Error {resp.status_code}] {json.dumps(data, indent=2)}" |
candidates = data.get("candidates", []) |
if not candidates: |
logger.error(f"No candidates received from Gemini API: {data}") |
return f"No candidates received. Debug JSON: {json.dumps(data, indent=2)}" |
first_candidate = candidates[0] |
content = first_candidate.get("content", {}) |
parts = content.get("parts", []) |
if not parts: |
logger.error(f"No 'parts' found in candidate content: {data}") |
return f"No 'parts' found in candidate content. Debug JSON: {json.dumps(data, indent=2)}" |
assistant_reply = parts[0].get("text", "(No text found in the response)") |
logger.info(f"Gemini Assistant Reply: {assistant_reply}") |
return assistant_reply |
def main(): |
create_db_from_csv(CSV_PATH, DB_PATH) |
logger.info("Starting Flask server at") |
app.run(host="", port=7860) |
if __name__ == "__main__": |
main() |