import ast
import json
import logging
import os
import re
import sqlite3

import pandas as pd
import requests
from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_session import Session

# Load environment variables from a .env file
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

CSV_PATH = "All_Categories.csv"  # Path to your large CSV
DB_PATH = "products.db"          # SQLite database file
TABLE_NAME = "products"

# Securely load your Gemini API key from environment variables
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")  # Ensure you set this in your .env file
if not GEMINI_API_KEY:
    logger.error("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")
    raise ValueError("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")

# Replace with the correct model name your account has access to
GEMINI_MODEL_NAME = "gemini-1.5-flash"  # If invalid, try "gemini-1.5-pro"
GEMINI_ENDPOINT = (
    "https://generativelanguage.googleapis.com/v1beta/models/"
    f"{GEMINI_MODEL_NAME}:generateContent"
)


def _clean_record(r):
    """Coerce one CSV record dict into the 10-tuple shape used by the INSERT.

    Ratings fields that are empty or unparseable fall back to 0 / 0.0 rather
    than aborting the whole import; everything else is stringified as-is.
    """
    try:
        ratings = float(r["ratings"]) if r["ratings"] else 0.0
    except (ValueError, TypeError):
        ratings = 0.0
    try:
        no_of_ratings = int(r["no_of_ratings"]) if r["no_of_ratings"] else 0
    except (ValueError, TypeError):
        no_of_ratings = 0
    return (
        str(r["name"]),
        str(r["image"]),
        str(r["link"]),
        ratings,
        no_of_ratings,
        str(r["discount_price"]),
        str(r["actual_price"]),
        str(r["search_terms"]),
        str(r["recommended_5"]),
        str(r["category"]),
    )


def create_db_from_csv(csv_file, db_file):
    """Build the SQLite product database from the CSV, streaming in chunks.

    No-op if the database file already exists. Creates the table, indexes
    for the columns the app searches on, then bulk-inserts each 50k-row
    chunk so arbitrarily large CSVs fit in memory.
    """
    if os.path.exists(db_file):
        logger.info(f"Database '{db_file}' already exists. Skipping creation.")
        return

    logger.info(f"Creating SQLite DB from CSV: {csv_file} -> {db_file}")
    df_iter = pd.read_csv(csv_file, chunksize=50000)
    conn = sqlite3.connect(db_file)
    try:
        cur = conn.cursor()
        cur.execute(f"DROP TABLE IF EXISTS {TABLE_NAME}")
        conn.commit()

        create_sql = f"""
        CREATE TABLE {TABLE_NAME} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            image TEXT,
            link TEXT,
            ratings REAL,
            no_of_ratings INTEGER,
            discount_price TEXT,
            actual_price TEXT,
            search_terms TEXT,
            recommended_5 TEXT,
            category TEXT
        );
        """
        cur.execute(create_sql)
        conn.commit()

        # Create indexes to optimize search performance
        cur.execute(f"CREATE INDEX idx_name ON {TABLE_NAME}(name);")
        cur.execute(f"CREATE INDEX idx_category ON {TABLE_NAME}(category);")
        cur.execute(f"CREATE INDEX idx_discount_price ON {TABLE_NAME}(discount_price);")
        conn.commit()

        insert_sql = f"""
        INSERT INTO {TABLE_NAME}
        (name, image, link, ratings, no_of_ratings,
         discount_price, actual_price, search_terms, recommended_5, category)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        required_columns = [
            "name", "image", "link", "ratings", "no_of_ratings",
            "discount_price", "actual_price", "search_terms",
            "recommended_5", "category",
        ]

        for chunk_idx, chunk in enumerate(df_iter):
            logger.info(f"Processing chunk {chunk_idx}...")
            # Ensure all required columns are present; missing ones become ""
            for col in required_columns:
                if col not in chunk.columns:
                    chunk[col] = ""
            chunk.fillna("", inplace=True)
            records = chunk.to_dict(orient="records")
            data_to_insert = [_clean_record(r) for r in records]
            cur.executemany(insert_sql, data_to_insert)
            conn.commit()
    finally:
        conn.close()
    logger.info("Database creation complete.")


app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY", "YOUR_SECURE_RANDOM_KEY")
app.config["SESSION_TYPE"] = "filesystem"
Session(app)


@app.route("/")
def index():
    """Home page with a search bar."""
    return render_template("index.html")


@app.route("/autocomplete")
def autocomplete():
    """Return (id, name) JSON for substring search in 'name'."""
    q = request.args.get("q", "").strip()
    if not q:
        return jsonify([])
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    sql = f"""
    SELECT id, name
    FROM {TABLE_NAME}
    WHERE LOWER(name) LIKE LOWER(?)
    LIMIT 10
    """
    wildcard = f"%{q}%"
    rows = cur.execute(sql, (wildcard,)).fetchall()
    conn.close()
    results = [{"id": r[0], "name": r[1]} for r in rows]
    return jsonify(results)


# FIX: the original route "/product/" had no URL converter, so Flask could
# never supply the item_id argument to the view function.
@app.route("/product/<int:item_id>")
def show_product(item_id):
    """Show product detail + top-5 recommended items from recommended_5."""
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    sql = f"SELECT * FROM {TABLE_NAME} WHERE id=?"
    row = cur.execute(sql, (item_id,)).fetchone()
    if not row:
        conn.close()
        return "Product not found", 404

    product = {
        "id": row[0],
        "name": row[1],
        "image": row[2],
        "link": row[3],
        "ratings": row[4],
        "no_of_ratings": row[5],
        "discount_price": row[6],
        "actual_price": row[7],
        "search_terms": row[8],
        "recommended_5": row[9],
        "category": row[10],
    }

    # Parse recommended_5 (stored as a Python-literal list string).
    # ast.literal_eval raises ValueError/SyntaxError on malformed input.
    try:
        rec_list = ast.literal_eval(product["recommended_5"])
        if not isinstance(rec_list, list):
            rec_list = []
    except (ValueError, SyntaxError):
        rec_list = []

    recommended_details = []
    for rec_name in rec_list[:5]:
        sql_rec = f"SELECT * FROM {TABLE_NAME} WHERE name LIKE ? LIMIT 1"
        rec_row = cur.execute(sql_rec, (f"%{rec_name}%",)).fetchone()
        if rec_row:
            recommended_details.append({
                "id": rec_row[0],
                "name": rec_row[1],
                "image": rec_row[2],
                "link": rec_row[3],
                "discount_price": rec_row[6],
            })

    conn.close()
    return render_template("product.html", product=product,
                           recommended=recommended_details)


@app.route("/rag")
def rag_index():
    """RAG Chat page storing conversation in session['rag_chat']."""
    if "rag_chat" not in session:
        session["rag_chat"] = []
    return render_template("rag.html", chat_history=session["rag_chat"])


@app.route("/rag/query", methods=["POST"])
def rag_query():
    """Process user input: extract filters, query the DB, ask Gemini."""
    if "rag_chat" not in session:
        session["rag_chat"] = []

    user_input = request.form.get("rag_input", "").strip()
    if not user_input:
        return redirect(url_for("rag_index"))

    # Add user query to chat history
    session["rag_chat"].append(("user", user_input))

    # Extract and process the query
    brand_keyword, product_type, price_val = extract_query_parameters(user_input)
    matched_items = filter_database(brand_keyword, product_type, price_val)
    db_context = build_db_context(matched_items, brand_keyword, product_type, price_val)
    conversation_text = construct_prompt(session["rag_chat"], db_context)

    # Get response from Gemini API
    gemini_response = gemini_generate_content(
        api_key=GEMINI_API_KEY,
        conversation_text=conversation_text,
    )

    # Add assistant's response to chat history
    session["rag_chat"].append(("assistant", gemini_response))
    # Mark session dirty: we mutated a list in place, Flask-Session won't
    # detect that on its own.
    session.modified = True

    # Render the chat page with updated history
    return render_template("rag.html", chat_history=session["rag_chat"])


def extract_query_parameters(user_query):
    """Extract (brand, product_type, price) from the user's query.

    Price is parsed from "under/below <N>[k]" patterns; brand and product
    type are matched dynamically against the DB's category and search_terms
    values. Any element not found comes back as None.
    """
    user_lower = user_query.lower()

    # Extract price: patterns like "under 5000", "below 25k", etc.
    price = None
    price_match = re.search(r'(under|below)\s+₹?(\d+[kK]?)', user_lower)
    if price_match:
        price_str = price_match.group(2)
        if price_str.lower().endswith('k'):
            price = int(price_str[:-1]) * 1000
        else:
            price = int(price_str)

    # Dynamically extract brands and product types from the database
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    # Fetch distinct categories and search_terms to build dynamic keyword
    # lists. Guard against NULL rows, which would crash .lower(), and drop
    # empty strings, which would trivially match every query.
    cur.execute(f"SELECT DISTINCT category FROM {TABLE_NAME}")
    categories = [row[0].lower() for row in cur.fetchall() if row[0]]
    cur.execute(f"SELECT DISTINCT search_terms FROM {TABLE_NAME}")
    search_terms = [row[0].lower() for row in cur.fetchall() if row[0]]
    conn.close()

    brand = None
    product_type = None

    # Check for product types in user query, preferring category matches
    for category in categories:
        if category and category in user_lower:
            product_type = category
            break
    if not product_type:
        for term in search_terms:
            if term and term in user_lower:
                product_type = term
                break

    # For brand, attempt to extract from the search_terms by splitting
    possible_brands = set()
    for term in search_terms:
        possible_brands.update(term.split())
    for b in possible_brands:
        if b and b in user_lower:
            brand = b
            break

    return brand, product_type, price


def filter_database(brand, product_type, price):
    """Return (id, name, discount_price, recommended_5) rows matching the
    optional brand / product_type / price filters. All filters are combined
    with AND; each is skipped when falsy. Fully parameterized."""
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    sql = f"SELECT id, name, discount_price, recommended_5 FROM {TABLE_NAME} WHERE 1=1"
    params = []
    if brand:
        sql += " AND LOWER(name) LIKE ?"
        params.append(f"%{brand}%")
    if product_type:
        sql += " AND LOWER(category) LIKE ?"
        params.append(f"%{product_type}%")
    if price:
        # discount_price is stored as a string like "₹1,299"; strip the
        # currency symbol and thousands separators before comparing.
        sql += (" AND CAST(REPLACE(REPLACE(discount_price, '₹', ''), ',', '')"
                " AS INTEGER) <= ?")
        params.append(price)

    # Limit to 5000 for performance; adjust as needed
    sql += " LIMIT 5000"
    rows = cur.execute(sql, tuple(params)).fetchall()
    conn.close()
    return rows


def build_db_context(matched_items, brand, product_type, price):
    """Build a structured context string from matched database items."""
    db_context = ""
    if matched_items:
        db_context += f"Found {len(matched_items)} items"
        if price:
            db_context += f" under ₹{price}"
        if brand or product_type:
            db_context += " matching your criteria"
        db_context += ":\n"
        # List up to 10 items for context
        for item in matched_items[:10]:
            item_name = item[1]
            item_price = item[2]
            db_context += f"- {item_name} at ₹{item_price}\n"
    else:
        db_context += "No matching items found in the database.\n"
    return db_context


def construct_prompt(chat_history, db_context):
    """Construct the prompt to send to Gemini, including conversation
    history and DB context."""
    prompt = (
        "You are an intelligent assistant that provides product recommendations "
        "based on the user's query and the available database.\n\n"
        "Conversation so far:\n"
    )
    for speaker, message in chat_history:
        prompt += f"{speaker.capitalize()}: {message}\n"
    prompt += f"\nDatabase Context:\n{db_context}\n"
    prompt += ("Based on the above information, provide a helpful and concise "
               "answer to the user's query.")
    return prompt


def gemini_generate_content(api_key, conversation_text):
    """Call the Gemini API's generateContent endpoint with the prompt.

    Returns the assistant's reply text, or a human-readable "[Gemini Error]"
    string on any transport, HTTP, or response-shape failure (the caller
    shows the result to the user either way).
    """
    url = f"{GEMINI_ENDPOINT}?key={api_key}"
    payload = {
        "contents": [
            {"parts": [{"text": conversation_text}]}
        ]
    }
    try:
        # timeout prevents a hung request from blocking the worker forever;
        # json= sets the Content-Type header for us.
        resp = requests.post(url, json=payload, timeout=30)
    except requests.exceptions.RequestException as e:
        logger.error(f"Error during Gemini API request: {e}")
        return f"[Gemini Error] Failed to connect to Gemini API: {e}"

    try:
        data = resp.json()
    except ValueError as e:
        logger.error(f"Invalid JSON response from Gemini API: {e}")
        return f"[Gemini Error] Invalid JSON response: {e}"

    if resp.status_code != 200:
        logger.error(f"Gemini API returned error {resp.status_code}: {data}")
        return f"[Gemini Error {resp.status_code}] {json.dumps(data, indent=2)}"

    # Parse the "candidates" structure
    candidates = data.get("candidates", [])
    if not candidates:
        logger.error(f"No candidates received from Gemini API: {data}")
        return f"No candidates received. Debug JSON: {json.dumps(data, indent=2)}"

    first_candidate = candidates[0]
    content = first_candidate.get("content", {})
    parts = content.get("parts", [])
    if not parts:
        logger.error(f"No 'parts' found in candidate content: {data}")
        return f"No 'parts' found in candidate content. Debug JSON: {json.dumps(data, indent=2)}"

    assistant_reply = parts[0].get("text", "(No text found in the response)")
    logger.info(f"Gemini Assistant Reply: {assistant_reply}")
    return assistant_reply


def main():
    """Build the DB if needed, then start the Flask dev server."""
    create_db_from_csv(CSV_PATH, DB_PATH)
    # FIX: the original log message claimed port 5000 while app.run used 7860.
    logger.info("Starting Flask server at http://127.0.0.1:7860")
    app.run(host="0.0.0.0", port=7860)


if __name__ == "__main__":
    main()