# app.py — Flask product search + Gemini RAG chat application.
# (Removed non-code page chrome that was captured with the file.)
import os
import re
import ast
import json
import requests
import pandas as pd
import sqlite3
import logging
from flask import (
Flask,
request,
jsonify,
render_template,
redirect,
url_for,
session
)
from flask_session import Session
from dotenv import load_dotenv
# Load environment variables from a .env file so secrets stay out of source.
load_dotenv()

# Configure logging once for the whole module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

CSV_PATH = "All_Categories.csv" # Path to your large CSV
DB_PATH = "products.db" # SQLite database file
TABLE_NAME = "products"  # single table holding every product row

# Securely load your Gemini API key from environment variables.
# Fail fast at import time: the RAG endpoint is useless without a key.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") # Ensure you set this in your .env file
if not GEMINI_API_KEY:
    logger.error("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")
    raise ValueError("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")

# Replace with the correct model name your account has access to
GEMINI_MODEL_NAME = "gemini-1.5-flash" # If invalid, try "gemini-1.5-pro"
GEMINI_ENDPOINT = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:generateContent"
def _safe_float(value, default=0.0):
    """Best-effort float conversion; empty or malformed values fall back."""
    try:
        return float(value) if value else default
    except (TypeError, ValueError):
        return default


def _safe_int(value, default=0):
    """Best-effort int conversion; empty or malformed values fall back."""
    try:
        return int(value) if value else default
    except (TypeError, ValueError):
        return default


def create_db_from_csv(csv_file, db_file):
    """Build the SQLite products database from a (potentially large) CSV.

    The CSV is streamed in 50k-row chunks so it never has to fit in memory.
    No-op if ``db_file`` already exists. Table name comes from the
    module-level ``TABLE_NAME``.

    Args:
        csv_file: Path to the source CSV.
        db_file: Path of the SQLite database file to create.
    """
    if os.path.exists(db_file):
        logger.info(f"Database '{db_file}' already exists. Skipping creation.")
        return

    logger.info(f"Creating SQLite DB from CSV: {csv_file} -> {db_file}")

    # Loop-invariants hoisted out of the per-chunk loop.
    required_columns = [
        "name", "image", "link", "ratings", "no_of_ratings",
        "discount_price", "actual_price", "search_terms",
        "recommended_5", "category",
    ]
    insert_sql = f"""
        INSERT INTO {TABLE_NAME}
            (name, image, link, ratings, no_of_ratings,
             discount_price, actual_price, search_terms,
             recommended_5, category)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    conn = sqlite3.connect(db_file)
    try:  # ensure the connection is closed even if a chunk fails mid-load
        cur = conn.cursor()
        cur.execute(f"DROP TABLE IF EXISTS {TABLE_NAME}")
        cur.execute(f"""
            CREATE TABLE {TABLE_NAME} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                image TEXT,
                link TEXT,
                ratings REAL,
                no_of_ratings INTEGER,
                discount_price TEXT,
                actual_price TEXT,
                search_terms TEXT,
                recommended_5 TEXT,
                category TEXT
            );
        """)
        # Indexes up front so /autocomplete and the RAG filters stay fast.
        cur.execute(f"CREATE INDEX idx_name ON {TABLE_NAME}(name);")
        cur.execute(f"CREATE INDEX idx_category ON {TABLE_NAME}(category);")
        cur.execute(f"CREATE INDEX idx_discount_price ON {TABLE_NAME}(discount_price);")
        conn.commit()

        for chunk_idx, chunk in enumerate(pd.read_csv(csv_file, chunksize=50000)):
            logger.info(f"Processing chunk {chunk_idx}...")
            # Guarantee every expected column exists; missing ones become "".
            for col in required_columns:
                if col not in chunk.columns:
                    chunk[col] = ""
            chunk.fillna("", inplace=True)

            data_to_insert = [
                (
                    str(r["name"]),
                    str(r["image"]),
                    str(r["link"]),
                    _safe_float(r["ratings"]),
                    _safe_int(r["no_of_ratings"]),
                    str(r["discount_price"]),
                    str(r["actual_price"]),
                    str(r["search_terms"]),
                    str(r["recommended_5"]),
                    str(r["category"]),
                )
                for r in chunk.to_dict(orient="records")
            ]
            cur.executemany(insert_sql, data_to_insert)
            conn.commit()  # commit per chunk to bound the transaction size
    finally:
        conn.close()
    logger.info("Database creation complete.")
# --- Flask application setup ------------------------------------------------
# Server-side (filesystem-backed) sessions store the per-user RAG chat
# history. NOTE(review): the hardcoded fallback secret key is unsafe for
# production — always set FLASK_SECRET_KEY in the environment.
app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY", "YOUR_SECURE_RANDOM_KEY")
app.config["SESSION_TYPE"] = "filesystem"
Session(app)
@app.route("/")
def index():
    """Home page with a search bar (renders templates/index.html)."""
    return render_template("index.html")
@app.route("/autocomplete")
def autocomplete():
    """Case-insensitive substring search over product names.

    Query string: ``q`` — the fragment typed so far.
    Returns: JSON list of up to 10 ``{"id": ..., "name": ...}`` objects
    (empty list for a blank query).
    """
    q = request.args.get("q", "").strip()
    if not q:
        return jsonify([])

    sql = f"""
        SELECT id, name
        FROM {TABLE_NAME}
        WHERE LOWER(name) LIKE LOWER(?)
        LIMIT 10
    """
    conn = sqlite3.connect(DB_PATH)
    try:  # finally-close so a failing query no longer leaks the connection
        # Parameterized LIKE keeps user input out of the SQL text.
        # NOTE(review): '%' or '_' typed by the user act as wildcards here.
        rows = conn.execute(sql, (f"%{q}%",)).fetchall()
    finally:
        conn.close()

    return jsonify([{"id": row_id, "name": name} for row_id, name in rows])
@app.route("/product/<int:item_id>")
def show_product(item_id):
    """Render one product's detail page plus up to 5 recommended products.

    ``recommended_5`` is stored as a Python-literal list of product names;
    each name is resolved back to a row via a LIKE lookup.
    """
    conn = sqlite3.connect(DB_PATH)
    try:  # finally-close so no path (404, parse error, ...) leaks the conn
        row = conn.execute(
            f"SELECT * FROM {TABLE_NAME} WHERE id=?", (item_id,)
        ).fetchone()
        if not row:
            return "<h2>Product not found</h2>", 404

        product = {
            "id": row[0],
            "name": row[1],
            "image": row[2],
            "link": row[3],
            "ratings": row[4],
            "no_of_ratings": row[5],
            "discount_price": row[6],
            "actual_price": row[7],
            "search_terms": row[8],
            "recommended_5": row[9],
            "category": row[10],
        }

        # recommended_5 holds a stringified list, e.g. "['Name A', 'Name B']".
        # literal_eval is safe (no code execution); catch only what it can
        # raise instead of the original bare except, so real bugs surface.
        try:
            rec_list = ast.literal_eval(product["recommended_5"])
        except (ValueError, SyntaxError, TypeError):
            rec_list = []
        if not isinstance(rec_list, list):
            rec_list = []

        recommended_details = []
        sql_rec = f"SELECT * FROM {TABLE_NAME} WHERE name LIKE ? LIMIT 1"
        for rec_name in rec_list[:5]:
            rec_row = conn.execute(sql_rec, (f"%{rec_name}%",)).fetchone()
            if rec_row:
                recommended_details.append({
                    "id": rec_row[0],
                    "name": rec_row[1],
                    "image": rec_row[2],
                    "link": rec_row[3],
                    "discount_price": rec_row[6],
                })
    finally:
        conn.close()

    return render_template("product.html",
                           product=product,
                           recommended=recommended_details)
@app.route("/rag")
def rag_index():
    """Serve the RAG chat page, seeding an empty history on first visit."""
    chat_history = session.setdefault("rag_chat", [])
    return render_template("rag.html", chat_history=chat_history)
@app.route("/rag/query", methods=["POST"])
def rag_query():
    """Handle one chat turn: parse the query, ground it in the DB, ask Gemini."""
    if "rag_chat" not in session:
        session["rag_chat"] = []
    history = session["rag_chat"]

    query = request.form.get("rag_input", "").strip()
    if not query:
        return redirect(url_for("rag_index"))

    history.append(("user", query))

    # Ground the answer: pull structured filters out of the free-text query,
    # fetch matching rows, and fold them into the prompt as context.
    brand, ptype, price_cap = extract_query_parameters(query)
    matches = filter_database(brand, ptype, price_cap)
    context = build_db_context(matches, brand, ptype, price_cap)
    prompt = construct_prompt(history, context)

    reply = gemini_generate_content(
        api_key=GEMINI_API_KEY,
        conversation_text=prompt
    )
    history.append(("assistant", reply))

    # Mutating a list inside the session is not auto-detected; flag it dirty.
    session.modified = True

    return render_template("rag.html", chat_history=history)
def extract_query_parameters(user_query):
    """Extract (brand, product_type, price_cap) from a free-text query.

    Price is parsed from phrases like "under 5000" / "below 25k". Product
    types and brand candidates are derived dynamically from the DB's
    ``category`` and ``search_terms`` columns.

    Returns:
        Tuple of (brand or None, product_type or None, price int or None).
    """
    user_lower = user_query.lower()

    # --- price cap: "under 5000", "below ₹25k", ... --------------------------
    price = None
    price_match = re.search(r'(under|below)\s+₹?(\d+[kK]?)', user_lower)
    if price_match:
        price_str = price_match.group(2)
        if price_str.lower().endswith('k'):
            price = int(price_str[:-1]) * 1000
        else:
            price = int(price_str)

    # --- candidate vocabulary straight from the DB ---------------------------
    conn = sqlite3.connect(DB_PATH)
    try:  # finally-close so a failing query no longer leaks the connection
        cur = conn.cursor()
        cur.execute(f"SELECT DISTINCT category FROM {TABLE_NAME}")
        # (value or "") guards against SQL NULLs; empty strings are dropped
        # because "" is a substring of everything and would always "match".
        categories = [c for c in ((row[0] or "").lower() for row in cur.fetchall()) if c]
        cur.execute(f"SELECT DISTINCT search_terms FROM {TABLE_NAME}")
        search_terms = [t for t in ((row[0] or "").lower() for row in cur.fetchall()) if t]
    finally:
        conn.close()

    # --- product type: categories first, then whole search_terms -------------
    product_type = next((c for c in categories if c in user_lower), None)
    if not product_type:
        product_type = next((t for t in search_terms if t in user_lower), None)

    # --- brand: individual words harvested from search_terms -----------------
    # Sorted longest-first for a deterministic, most-specific match (the
    # original iterated a raw set, so the picked brand varied between runs).
    candidates = sorted({w for t in search_terms for w in t.split() if w},
                        key=len, reverse=True)
    brand = next((w for w in candidates if w in user_lower), None)

    return brand, product_type, price
def filter_database(brand, product_type, price):
    """Fetch rows matching the optional brand / product-type / price filters.

    Args:
        brand: Substring to match in ``name`` (case-insensitive), or falsy.
        product_type: Substring to match in ``category``, or falsy.
        price: Maximum price in rupees, or None for no cap.

    Returns:
        Up to 5000 (id, name, discount_price, recommended_5) tuples.
    """
    sql = f"SELECT id, name, discount_price, recommended_5 FROM {TABLE_NAME} WHERE 1=1"
    params = []
    if brand:
        sql += " AND LOWER(name) LIKE ?"
        params.append(f"%{brand}%")
    if product_type:
        sql += " AND LOWER(category) LIKE ?"
        params.append(f"%{product_type}%")
    if price is not None:  # 'is not None' so a cap of 0 is still honored
        # discount_price is display text like "₹1,299"; strip the currency
        # symbol and thousands separators before the numeric comparison.
        sql += " AND CAST(REPLACE(REPLACE(discount_price, '₹', ''), ',', '') AS INTEGER) <= ?"
        params.append(price)
    # Limit to 5000 for performance; adjust as needed.
    sql += " LIMIT 5000"

    conn = sqlite3.connect(DB_PATH)
    try:  # finally-close so a failing query no longer leaks the connection
        return conn.execute(sql, tuple(params)).fetchall()
    finally:
        conn.close()
def build_db_context(matched_items, brand, product_type, price):
    """Summarize matched DB rows as plain text for the LLM prompt.

    ``matched_items`` rows are (id, name, discount_price, ...) tuples; at
    most the first 10 are listed. Always returns a non-empty string.
    """
    if not matched_items:
        return "No matching items found in the database.\n"

    pieces = [f"Found {len(matched_items)} items"]
    if price:
        pieces.append(f" under ₹{price}")
    if brand or product_type:
        pieces.append(" matching your criteria")
    pieces.append(":\n")
    # Cap the listing at 10 rows to keep the prompt small.
    for _, item_name, item_price, *_rest in matched_items[:10]:
        pieces.append(f"- {item_name} at ₹{item_price}\n")
    return "".join(pieces)
def construct_prompt(chat_history, db_context):
    """Assemble the Gemini prompt: system preamble, transcript, DB context.

    ``chat_history`` is a list of (speaker, message) pairs.
    """
    pieces = [
        "You are an intelligent assistant that provides product recommendations based on the user's query and the available database.\n\n"
        "Conversation so far:\n"
    ]
    pieces.extend(f"{speaker.capitalize()}: {message}\n"
                  for speaker, message in chat_history)
    pieces.append(f"\nDatabase Context:\n{db_context}\n")
    pieces.append("Based on the above information, provide a helpful and concise answer to the user's query.")
    return "".join(pieces)
def gemini_generate_content(api_key, conversation_text):
    """Call Gemini's generateContent endpoint and return the reply text.

    Never raises: every failure mode (network error, bad JSON, non-200
    status, missing candidates/parts) is logged and returned as an error
    string so the chat UI can display it inline.
    """
    url = f"{GEMINI_ENDPOINT}?key={api_key}"
    payload = {
        "contents": [
            {
                "parts": [{"text": conversation_text}]
            }
        ]
    }
    headers = {"Content-Type": "application/json"}
    try:
        # timeout added: without one, a stalled connection would hang the
        # Flask request handler indefinitely.
        resp = requests.post(url, headers=headers, data=json.dumps(payload),
                             timeout=30)
    except requests.RequestException as e:  # narrowed from bare Exception
        logger.error(f"Error during Gemini API request: {e}")
        return f"[Gemini Error] Failed to connect to Gemini API: {e}"
    try:
        data = resp.json()
    except ValueError as e:  # covers json.JSONDecodeError
        logger.error(f"Invalid JSON response from Gemini API: {e}")
        return f"[Gemini Error] Invalid JSON response: {e}"
    if resp.status_code != 200:
        logger.error(f"Gemini API returned error {resp.status_code}: {data}")
        return f"[Gemini Error {resp.status_code}] {json.dumps(data, indent=2)}"

    # Expected shape: candidates[0].content.parts[0].text
    candidates = data.get("candidates", [])
    if not candidates:
        logger.error(f"No candidates received from Gemini API: {data}")
        return f"No candidates received. Debug JSON: {json.dumps(data, indent=2)}"
    parts = candidates[0].get("content", {}).get("parts", [])
    if not parts:
        logger.error(f"No 'parts' found in candidate content: {data}")
        return f"No 'parts' found in candidate content. Debug JSON: {json.dumps(data, indent=2)}"
    assistant_reply = parts[0].get("text", "(No text found in the response)")
    logger.info(f"Gemini Assistant Reply: {assistant_reply}")
    return assistant_reply
def main():
    """Build the product DB if needed, then start the dev server on port 7860."""
    create_db_from_csv(CSV_PATH, DB_PATH)
    # Log message fixed to match the actual bind (it claimed port 5000 while
    # app.run serves on 7860).
    logger.info("Starting Flask server at http://0.0.0.0:7860")
    app.run(host="0.0.0.0", port=7860)


if __name__ == "__main__":
    main()