import os
import time
from typing import Any, Dict, List, Optional

import requests
import yfinance as yf
import matplotlib
matplotlib.use("Agg")  # non-interactive backend: charts are saved to files, never shown
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from transformers import pipeline
import gradio as gr
from smolagents import CodeAgent, tool
from smolagents.models import LiteLLMModel

# Load environment variables
load_dotenv(override=True)

# Initialize sentiment analysis pipeline
sentiment_analyzer = pipeline(
    "sentiment-analysis",
    model="distilbert/distilbert-base-uncased-finetuned-sst-2-english"
)

# Groq API configuration
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL = "llama3-70b-8192"
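
# All three Groq-backed tools below share the same chat-completions REST call;
# only the system prompt, user prompt, and token budget differ. A minimal
# illustrative sketch of that shared pattern (hypothetical helper name; each
# tool inlines its own variant):
#
#   def groq_chat(system_prompt: str, user_prompt: str, max_tokens: int = 256) -> str:
#       resp = requests.post(
#           "https://api.groq.com/openai/v1/chat/completions",
#           headers={"Content-Type": "application/json",
#                    "Authorization": f"Bearer {GROQ_API_KEY}"},
#           json={"model": GROQ_MODEL,
#                 "messages": [{"role": "system", "content": system_prompt},
#                              {"role": "user", "content": user_prompt}],
#                 "temperature": 0.1, "max_tokens": max_tokens},
#       )
#       resp.raise_for_status()
#       return resp.json()["choices"][0]["message"]["content"].strip()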

# Define a class for smolagents tools
class SmolaTools:
    @staticmethod
    @tool
    def detect_ticker_from_query(query: str) -> Dict[str, Any]:
        """
        Extract company name from query and find its stock ticker.

        Args:
            query: The search query to analyze

        Returns:
            Dictionary with detected ticker and confidence
        """
        try:
            # Step 1: Extract the company name using Groq API
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {GROQ_API_KEY}"
            }

            # Extract company name from query
            name_data = {
                "model": GROQ_MODEL,
                "messages": [
                    {"role": "system", "content": "You are a financial assistant. Extract only the company or stock name from the query. Respond with ONLY the company name, nothing else."},
                    {"role": "user", "content": f"Extract the company name from this query: {query}"}
                ],
                "temperature": 0.1,
                "max_tokens": 50
            }

            name_response = requests.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers=headers,
                json=name_data
            )

            if name_response.status_code == 200:
                company_name = name_response.json()["choices"][0]["message"]["content"].strip()
                print(f"Extracted company name: {company_name}")

                if not company_name or company_name.lower() == "unknown":
                    return {"ticker": "", "confidence": "none", "method": "none"}

                # Step 2: Find ticker for this company using Google search
                API_KEY = os.getenv("API_KEY")
                CSE_ID = os.getenv("CSE_ID")
                ticker_search_query = f"{company_name} stock ticker symbol NSE BSE"
                search_response = requests.get(
                    "https://www.googleapis.com/customsearch/v1",
                    params={"q": ticker_search_query, "cx": CSE_ID, "key": API_KEY, "num": 3}
                )

                if search_response.status_code == 200:
                    search_data = search_response.json()

                    # Combine titles and snippets
                    texts = []
                    for item in search_data.get("items", []):
                        texts.append(item.get("title", ""))
                        texts.append(item.get("snippet", ""))
                    combined_text = " ".join(texts)
                    print("Search results obtained for ticker lookup")

                    # Step 3: Extract the ticker from search results using Groq
                    ticker_data = {
                        "model": GROQ_MODEL,
                        "messages": [
                            {"role": "system", "content": "You are a financial expert. Extract ONLY the stock ticker symbol for the company from the text. Return ONLY the ticker symbol (e.g., 'RELIANCE' for Reliance Industries). If uncertain, respond with UNKNOWN."},
                            {"role": "user", "content": f"Extract the stock ticker symbol for {company_name} from this text: {combined_text}"}
                        ],
                        "temperature": 0.1,
                        "max_tokens": 10
                    }

                    ticker_response = requests.post(
                        "https://api.groq.com/openai/v1/chat/completions",
                        headers=headers,
                        json=ticker_data
                    )

                    if ticker_response.status_code == 200:
                        ticker = ticker_response.json()["choices"][0]["message"]["content"].strip()
                        print(f"Extracted ticker: {ticker}")

                        if ticker and ticker != "UNKNOWN":
                            return {"ticker": ticker, "confidence": "medium", "method": "company_search"}
        except Exception as e:
            print(f"Error in company/ticker detection: {str(e)}")

        # Default return if no ticker is found
        return {"ticker": "", "confidence": "none", "method": "none"}

    @staticmethod
    @tool
    def fetch_stock_news(query: str, num_results: int = 5) -> List[Dict[str, str]]:
        """Fetch stock market news using Google Custom Search API.

        Args:
            query: The search term for stock-related news.
            num_results: The number of news articles to fetch (default: 5).

        Returns:
            A list of dictionaries containing news title, link, and snippet.
        """
        API_KEY = os.getenv("API_KEY")
        CSE_ID = os.getenv("CSE_ID")

        # Focus on financial news sites
        news_sites = (
            "site:economictimes.indiatimes.com OR "
            "site:moneycontrol.com OR "
            "site:livemint.com OR "
            "site:ndtv.com/business OR "
            "site:cnbctv18.com OR "
            "site:business-standard.com OR "
            "site:financialexpress.com OR "
            "site:reuters.com OR "
            "site:bloomberg.com OR "
            "site:investing.com OR "
            "site:marketwatch.com OR "
            "site:thehindu.com/business OR "
            "site:business-today.in OR "
            "site:fool.com OR "
            "site:zeebiz.com OR "
            "site:screener.in"
        )

        # Combine into final search query
        final_query = f"{query} ({news_sites})"
        print(f"Searching with query: {final_query}")

        response = requests.get(
            "https://www.googleapis.com/customsearch/v1",
            params={"q": final_query, "cx": CSE_ID, "key": API_KEY,
                    "num": num_results, "sort": "date"}
        )
        if response.status_code == 200:
            data = response.json()
            results = [{"title": item["title"],
                        "link": item["link"],
                        "snippet": item.get("snippet", "No description available")}
                       for item in data.get("items", [])]

            if not results:
                # Fallback to a more general search if financial sites didn't yield results
                fallback_response = requests.get(
                    "https://www.googleapis.com/customsearch/v1",
                    params={"q": f"{query} stock market news", "cx": CSE_ID,
                            "key": API_KEY, "num": num_results}
                )
                if fallback_response.status_code == 200:
                    fallback_data = fallback_response.json()
                    results = [{"title": item["title"],
                                "link": item["link"],
                                "snippet": item.get("snippet", "No description available")}
                               for item in fallback_data.get("items", [])]
            return results
        return []

    @staticmethod
    @tool
    def extract_web_content(urls: List[str], max_urls: int = 3) -> List[Dict[str, str]]:
        """Extract content from web pages.

        Args:
            urls: List of URLs to extract content from.
            max_urls: Maximum number of URLs to process (default: 3).

        Returns:
            A list of dictionaries containing url, title, and content.
        """
        results = []
        processed = 0

        for url in urls[:max_urls]:
            try:
                headers = {
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
                }
                response = requests.get(url, headers=headers, timeout=10)
                response.raise_for_status()

                soup = BeautifulSoup(response.text, 'html.parser')

                # Get title
                title = soup.title.text.strip() if soup.title else "No title"

                # Get main content - this is a simplified approach
                # Remove scripts, styles, and navigation elements
                for script in soup(["script", "style", "nav", "footer", "header", "aside"]):
                    script.extract()

                # Get text and clean it
                text = soup.get_text(separator='\n')
                lines = [line.strip() for line in text.splitlines() if line.strip()]
                content = '\n'.join(lines)

                # Truncate if too long
                if len(content) > 5000:
                    content = content[:5000] + "..."

                results.append({
                    "url": url,
                    "title": title,
                    "content": content
                })

                processed += 1
                # Be polite to servers with a delay between requests
                if processed < max_urls:
                    time.sleep(1)
            except Exception as e:
                results.append({
                    "url": url,
                    "title": "Error",
                    "content": f"Failed to extract content: {str(e)}"
                })

        return results

    @staticmethod
    @tool
    def fetch_stock_price(ticker: str) -> Optional[float]:
        """Fetch the latest stock price for a given ticker symbol.

        Args:
            ticker: The stock ticker symbol (e.g., "AAPL").

        Returns:
            The latest closing stock price, or None if unavailable.
        """
        try:
            stock = yf.Ticker(ticker)
            return stock.history(period="1d")["Close"].iloc[-1]
        except Exception:
            return None

    @staticmethod
    @tool
    def summarize_news(news: List[Dict[str, Any]], extracted_content: Optional[List[Dict[str, str]]] = None) -> str:
        """Summarize stock-related news snippets and extracted web content using Groq API.

        Args:
            news: A list of news snippet dictionaries to summarize.
            extracted_content: Optional list of extracted web content.

        Returns:
            A summarized text of the news.
        """
        if not news:
            return "No news available."

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {GROQ_API_KEY}"
        }

        # Combine snippets and extracted content if available
        summary_text = "News snippets:\n" + "\n".join([item["snippet"] for item in news])

        if extracted_content and len(extracted_content) > 0:
            summary_text += "\n\nDetailed content from articles:\n"
            for i, item in enumerate(extracted_content, 1):
                # Sample key passages: the first 500 chars, 500 from the middle, and the last 500
                content = item["content"]
                total_length = len(content)

                if total_length > 1500:
                    start = content[:500]
                    middle_start = total_length // 2 - 250
                    middle = content[middle_start:middle_start + 500]
                    end = content[-500:]
                    extracted_text = f"{start}\n...[middle content]...\n{middle}\n...[more content]...\n{end}"
                else:
                    extracted_text = content

                summary_text += f"\n--- Article {i}: {item['title']} ---\n{extracted_text}\n"

        # Create prompt for summarization
        data = {
            "model": GROQ_MODEL,
            "messages": [
                {"role": "system", "content": "You are a financial news summarizer. Provide a concise summary of the most important information from these financial news articles, focused on stock performance, company updates, market trends, and financial metrics."},
                {"role": "user", "content": f"Summarize this financial news:\n{summary_text}"}
            ],
            "temperature": 0.3,
            "max_tokens": 1000
        }

        response = requests.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers=headers,
            json=data
        )
        if response.status_code == 200:
            return response.json()["choices"][0]["message"]["content"]
        return "Error generating summary."
    @staticmethod
    @tool
    def analyze_sentiment(text: str) -> Dict[str, Any]:
        """Perform sentiment analysis on the provided text.

        Args:
            text: The text to analyze.

        Returns:
            A dictionary containing sentiment label and confidence score.
        """
        # Truncate to the model's maximum sequence length (512 tokens for DistilBERT)
        result = sentiment_analyzer(text, truncation=True)[0]
        return {"label": result["label"], "score": result["score"]}

    @staticmethod
    @tool
    def plot_historical_trends(ticker: str, period: str = "1mo") -> str:
        """Generate a plot for historical stock trends.

        Args:
            ticker: The stock ticker symbol.
            period: The historical period to fetch (e.g., "1mo", "3mo", "1y"). Default is "1mo".

        Returns:
            File path to the saved plot image, or an error message.
        """
        try:
            stock = yf.Ticker(ticker)
            history = stock.history(period=period)

            if history.empty:
                return f"No historical data available for {ticker}"

            plt.figure(figsize=(10, 5))
            plt.plot(history.index, history["Close"], label="Close Price", marker='o')
            plt.xlabel("Date")
            plt.ylabel("Stock Price (₹)")
            plt.title(f"Historical Stock Trends for {ticker} ({period})")
            plt.legend()
            plt.grid()

            file_path = f"{ticker.replace('.', '_')}_historical_trends.png"
            plt.savefig(file_path)
            plt.close()
            return file_path
        except Exception as e:
            return f"Error fetching historical data: {str(e)}"

    @staticmethod
    @tool
    def plot_nifty_chart(period: str = "6mo") -> str:
        """Generate a plot for the Nifty 50 index.

        Args:
            period: The historical period to fetch (e.g., "1mo", "3mo", "1y"). Default is "6mo".

        Returns:
            File path to the saved plot image, or an error message.
        """
        try:
            nifty = yf.Ticker("^NSEI")  # Nifty 50 index
            history = nifty.history(period=period)

            if history.empty:
                return "No historical data available for Nifty 50"

            plt.figure(figsize=(10, 5))
            plt.plot(history.index, history["Close"], label="Close Price", marker='o')
            plt.xlabel("Date")
            plt.ylabel("Index Value")
            plt.title(f"Nifty 50 Index - {period}")
            plt.legend()
            plt.grid()

            file_path = f"nifty50_chart_{period}.png"
            plt.savefig(file_path)
            plt.close()
            return file_path
        except Exception as e:
            return f"Error creating Nifty chart: {str(e)}"


# Initialize tools
smola_tools = SmolaTools()

# Initialize language model using LiteLLM with Groq
model = LiteLLMModel(
    model_id=f"groq/{GROQ_MODEL}",
    api_key=GROQ_API_KEY,
)

# Initialize agent with tools
agent = CodeAgent(
    tools=[
        smola_tools.detect_ticker_from_query,
        smola_tools.fetch_stock_news,
        smola_tools.fetch_stock_price,
        smola_tools.summarize_news,
        smola_tools.analyze_sentiment,
        smola_tools.plot_historical_trends,
        smola_tools.plot_nifty_chart,
        smola_tools.extract_web_content
    ],
    model=model
)
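
# Illustrative usage (hypothetical values; assumes GROQ_API_KEY, API_KEY and
# CSE_ID are set):
#
#   smola_tools.detect_ticker_from_query("latest news on Reliance Industries")
#   # -> {"ticker": "RELIANCE", "confidence": "medium", "method": "company_search"}
#   smola_tools.fetch_stock_price("RELIANCE.NS")  # NSE listing; use ".BO" for BSE
#   # -> 2931.55 (hypothetical close)
#
# The agent can also chain these tools on its own by generating Python, e.g.
# `agent.run("Fetch the latest price for RELIANCE.NS and summarize recent news")`.
# The Gradio handler below calls the tools directly, so `agent` is mainly
# useful for ad-hoc experiments.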

def stock_research(query, ticker=None, progress=gr.Progress()):
    """Run the research pipeline step by step, yielding partial UI updates."""
    price_md = gr.Markdown("## Current Price\nDetecting ticker...")
    news_md = gr.Markdown("## Search Results\nWaiting...")
    summary_md = gr.Markdown("## News Summary\nWaiting...")
    sentiment_md = gr.Markdown("## Sentiment Analysis\nWaiting...")
    chart = None

    # Print debugging info
    print(f"Starting analysis with query: '{query}', manually entered ticker: '{ticker}'")

    # Step 0: Use manually entered ticker or detect from query
    progress(0/6, desc="Detecting ticker")
    if ticker and ticker.strip():
        # User provided a ticker manually
        ticker = ticker.strip().upper()
        print(f"Using manually entered ticker: {ticker}")
        price_md = gr.Markdown(f"## Current Price\nUsing manually entered ticker: {ticker}")
    else:
        # No ticker provided, attempt to detect from query
        try:
            # Use the smolagents detection tool
            print("Attempting to detect ticker using SmolaTools")
            ticker_info = smola_tools.detect_ticker_from_query(query)
            ticker = ticker_info.get("ticker", "")
            confidence = ticker_info.get("confidence", "none")
            method = ticker_info.get("method", "none")
            print(f"Ticker detection result: {ticker} (confidence: {confidence}, method: {method})")

            if ticker and confidence != "none":
                price_md = gr.Markdown(f"## Current Price\nDetected ticker: {ticker} (confidence: {confidence})")
            else:
                # If no ticker detected, use Nifty 50
                ticker = "^NSEI"  # Nifty 50 index
                print(f"No ticker detected with SmolaTools, using default: {ticker}")
                price_md = gr.Markdown("## Current Price\nNo specific ticker detected. Using Nifty 50 Index.")
        except Exception as e:
            print(f"Error in SmolaTools ticker detection: {str(e)}")
            price_md = gr.Markdown("## Current Price\nError detecting ticker. Using Nifty 50 as default.")
            ticker = "^NSEI"  # Nifty 50 index

    yield price_md, news_md, summary_md, sentiment_md, chart

    # Step 1: Get stock price
    progress(1/6, desc="Fetching stock price")
    try:
        print(f"Fetching price for ticker: {ticker}")
        if ticker == "^NSEI":
            # For Nifty 50 index
            print("Getting Nifty 50 price")
            nifty = yf.Ticker("^NSEI")
            stock_price = nifty.history(period="1d")["Close"].iloc[-1]
            price_text = f"## Current Price\nNifty 50 Index: {stock_price:.2f}"
        else:
            # Try with NSE first
            ticker_with_suffix = f"{ticker}.NS" if not ticker.endswith(('.NS', '.BO')) else ticker
            print(f"Trying NSE ticker: {ticker_with_suffix}")
            stock = yf.Ticker(ticker_with_suffix)
            history = stock.history(period="1d")

            if not history.empty:
                stock_price = history["Close"].iloc[-1]
                print(f"Found NSE price: {stock_price}")
            else:
                stock_price = None
                print("No NSE data found")

            if stock_price is None:
                # Try BSE if NSE failed
                bse_ticker = ticker_with_suffix.replace(".NS", ".BO") if ".NS" in ticker_with_suffix else f"{ticker}.BO"
                print(f"Trying BSE ticker: {bse_ticker}")
                stock = yf.Ticker(bse_ticker)
                history = stock.history(period="1d")

                if not history.empty:
                    stock_price = history["Close"].iloc[-1]
                    ticker_with_suffix = bse_ticker
                    print(f"Found BSE price: {stock_price}")
                else:
                    print("No BSE data found")

            if stock_price is not None:
                price_text = f"## Current Price for {ticker}\n₹{stock_price:.2f}"
            else:
                print("Could not find price data, using Nifty as fallback")
                # Switch to Nifty if no data for the requested ticker
                requested_ticker = ticker  # remember the original symbol for the message
                ticker = "^NSEI"
                nifty = yf.Ticker("^NSEI")
                stock_price = nifty.history(period="1d")["Close"].iloc[-1]
                price_text = f"## Current Price\nCould not find data for {requested_ticker}. Showing Nifty 50 Index: {stock_price:.2f}"

        price_md = gr.Markdown(price_text)
    except Exception as e:
        print(f"Error fetching price: {str(e)}")
        price_md = gr.Markdown(f"## Current Price\nError fetching price: {str(e)}")
        # Switch to Nifty for the rest of the analysis
        ticker = "^NSEI"

    yield price_md, news_md, summary_md, sentiment_md, chart
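
    # Note: Yahoo Finance identifies Indian listings by exchange suffix -
    # "RELIANCE.NS" (NSE) vs "RELIANCE.BO" (BSE) - while index symbols such as
    # "^NSEI" (Nifty 50) take no suffix. Step 1 above therefore tries NSE
    # first, then BSE, then falls back to the index.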

    # Step 2: Get news
    progress(2/6, desc="Searching for news")
    news_articles = []
    news_text = "## Search Results\nWaiting..."
    try:
        print(f"Searching news for query: '{query}'")
        # Use the original query directly for news search, not the ticker
        news_articles = smola_tools.fetch_stock_news(query)

        # Format search results
        news_text = f"## Search Results\nFound {len(news_articles)} news articles for: '{query}'\n\n"
        for i, article in enumerate(news_articles, 1):
            news_text += f"{i}. [{article['title']}]({article['link']})\n"

        news_md = gr.Markdown(news_text)
        print(f"Found {len(news_articles)} news articles")
    except Exception as e:
        print(f"Error fetching news: {str(e)}")
        news_md = gr.Markdown(f"## Search Results\nError fetching news: {str(e)}")

    yield price_md, news_md, summary_md, sentiment_md, chart

    # Step 3: Extract web content from the news article URLs
    progress(3/6, desc="Extracting article content")
    extracted_content = []
    try:
        if news_articles:
            print("Extracting web content from news URLs")
            # Get URLs from the news articles
            urls = [article["link"] for article in news_articles]

            # Create simulated extracted content if extract_web_content isn't available
            if not hasattr(smola_tools, 'extract_web_content'):
                print("extract_web_content not available, creating simulated content")
                extracted_content = []
                for article in news_articles[:3]:
                    extracted_content.append({
                        "url": article["link"],
                        "title": article["title"],
                        "content": article["snippet"] * 5  # Repeat snippet to simulate content
                    })
            else:
                # Extract content from the top 3 URLs
                extracted_content = smola_tools.extract_web_content(urls, max_urls=3)

            # Add a note to the search results
            news_text += "\n\n*Extracted detailed content from top articles for deeper analysis*"
            news_md = gr.Markdown(news_text)
            print(f"Extracted content from {len(extracted_content)} articles")
    except Exception as e:
        print(f"Error extracting web content: {str(e)}")

    yield price_md, news_md, summary_md, sentiment_md, chart

    # Step 4: Summarize news
    progress(4/6, desc="Summarizing news")
    news_summary = "No news available to summarize."
    try:
        if news_articles:
            print("Summarizing news")
            # Simple fallback summary if we can't use the API
            simple_summary = "This is a summary of recent market news: "
            for article in news_articles[:3]:
                simple_summary += article["snippet"] + " "

            try:
                # Try using the proper summarization tool
                news_summary = smola_tools.summarize_news(news_articles, extracted_content)
                print("News summary created using API")
            except Exception as e:
                print(f"Error using summarize_news API: {str(e)}, using simple summary")
                news_summary = simple_summary

            summary_md = gr.Markdown(f"## News Summary\n{news_summary}")
        else:
            summary_md = gr.Markdown("## News Summary\nNo news available to summarize.")
    except Exception as e:
        print(f"Error creating summary: {str(e)}")
        summary_md = gr.Markdown(f"## News Summary\nError creating summary: {str(e)}")

    yield price_md, news_md, summary_md, sentiment_md, chart
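
    # The sentiment step below scores the generated summary with the DistilBERT
    # SST-2 pipeline; an illustrative result shape is
    # {"label": "POSITIVE", "score": 0.98}. If that call fails, a crude keyword
    # heuristic assigns POSITIVE/NEGATIVE/NEUTRAL instead.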

    # Step 5: Sentiment analysis
    progress(5/6, desc="Analyzing sentiment")
    sentiment = {"label": "UNKNOWN", "score": 0.0}
    try:
        if news_summary != "No news available to summarize.":
            print("Analyzing sentiment")
            try:
                sentiment = smola_tools.analyze_sentiment(news_summary)
                print(f"Sentiment: {sentiment['label']} (score: {sentiment['score']})")
            except Exception as e:
                print(f"Error in sentiment analysis API: {str(e)}")
                # Fallback basic sentiment
                if "growth" in news_summary.lower() or "positive" in news_summary.lower():
                    sentiment = {"label": "POSITIVE", "score": 0.75}
                elif "decline" in news_summary.lower() or "negative" in news_summary.lower():
                    sentiment = {"label": "NEGATIVE", "score": 0.75}
                else:
                    sentiment = {"label": "NEUTRAL", "score": 0.5}

            sentiment_md = gr.Markdown(f"## Sentiment Analysis\n**{sentiment['label']}** (confidence: {sentiment['score']:.2f})")
        else:
            sentiment_md = gr.Markdown("## Sentiment Analysis\nNo content available for analysis")
    except Exception as e:
        print(f"Error in sentiment analysis: {str(e)}")
        sentiment_md = gr.Markdown(f"## Sentiment Analysis\nError in analysis: {str(e)}")

    yield price_md, news_md, summary_md, sentiment_md, chart

    # Step 6: Generate chart
    progress(6/6, desc="Creating chart")

    def save_close_plot(history, title, ylabel, file_path):
        """Plot the Close column of a price history and save it to file_path."""
        plt.figure(figsize=(10, 5))
        plt.plot(history.index, history["Close"], label="Close Price", marker='o')
        plt.xlabel("Date")
        plt.ylabel(ylabel)
        plt.title(title)
        plt.legend()
        plt.grid()
        plt.savefig(file_path)
        plt.close()
        return file_path

    try:
        print(f"Creating chart for {ticker}")
        if ticker == "^NSEI":
            # For Nifty 50 index - direct implementation
            print("Creating Nifty chart directly")
            history = yf.Ticker("^NSEI").history(period="6mo")
            if not history.empty:
                chart = save_close_plot(history, "Nifty 50 Index - 6mo",
                                        "Index Value", "nifty50_chart.png")
                print(f"Nifty chart saved to {chart}")
            else:
                print("No history data for Nifty")
        else:
            # For other stocks: try NSE, then BSE, then fall back to Nifty
            ticker_with_suffix = f"{ticker}.NS" if not ticker.endswith(('.NS', '.BO')) else ticker
            history = yf.Ticker(ticker_with_suffix).history(period="6mo")

            if history.empty:
                print("No NSE data, trying BSE")
                bse_ticker = ticker_with_suffix.replace(".NS", ".BO") if ".NS" in ticker_with_suffix else f"{ticker}.BO"
                history = yf.Ticker(bse_ticker).history(period="6mo")

            if not history.empty:
                chart = save_close_plot(history,
                                        f"Historical Stock Trends for {ticker} (6mo)",
                                        "Stock Price (₹)",
                                        f"{ticker.replace('.', '_')}_chart.png")
                print(f"Stock chart saved to {chart}")
            else:
                # Fallback to Nifty if both exchanges failed
                print("No BSE data either, falling back to Nifty")
                history = yf.Ticker("^NSEI").history(period="6mo")
                if not history.empty:
                    chart = save_close_plot(history, "Nifty 50 Index - 6mo (Fallback)",
                                            "Index Value", "nifty50_fallback_chart.png")
                    print(f"Fallback Nifty chart saved to {chart}")
    except Exception as e:
        print(f"Error creating chart: {str(e)}")
        # Try simple Nifty chart as final fallback
        try:
            print("Attempting simple Nifty chart as final fallback")
            history = yf.Ticker("^NSEI").history(period="1mo")
            plt.figure(figsize=(8, 4))
            plt.plot(history.index, history["Close"])
            plt.title("Nifty 50 - Emergency Fallback")
            plt.grid(True)
            file_path = "emergency_nifty_chart.png"
            plt.savefig(file_path)
            plt.close()
            chart = file_path
            print("Emergency fallback chart saved")
        except Exception:
            print("Even emergency fallback failed")

    # Complete
    progress(1.0)
    yield price_md, news_md, summary_md, sentiment_md, chart
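
# stock_research is a generator: every `yield` pushes a partial update to the
# Gradio UI so results stream in step by step. Generator outputs require
# request queueing, which is enabled below via `iface.queue()`.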

# Define the interface
iface = gr.Interface(
    fn=stock_research,
    inputs=[
        gr.Textbox(label="Search Query", placeholder="Enter search query like 'latest news on HDFC Bank'"),
        gr.Textbox(label="Stock Ticker (Optional)", placeholder="e.g., HDFCBANK, RELIANCE, TCS (leave empty for automatic detection)")
    ],
    outputs=[
        gr.Markdown(label="Current Price"),
        gr.Markdown(label="Search Results"),
        gr.Markdown(label="News Summary"),
        gr.Markdown(label="Sentiment Analysis"),
        gr.Image(label="Stock Chart", type="filepath")
    ],
    title="Indian Stock Market Research Tool",
    description="Enter a stock-related search term. The system will automatically detect the ticker symbol or use Nifty 50 if no ticker is found. You can also manually specify a ticker if desired."
)

if __name__ == "__main__":
    # Launch with queueing for streaming updates
    iface.queue().launch()
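
# ---------------------------------------------------------------------------
# Setup notes (illustrative). The script reads three variables via os.getenv,
# so a .env file next to it might look like:
#
#   GROQ_API_KEY=...   # Groq API key for the llama3-70b-8192 calls
#   API_KEY=...        # Google Custom Search JSON API key
#   CSE_ID=...         # Google Programmable Search Engine ID
#
# Dependencies (assumed from the imports above): requests, yfinance,
# matplotlib, transformers, python-dotenv, gradio, smolagents, litellm,
# beautifulsoup4. Run the script directly (e.g. `python app.py`, if that is
# the filename) to serve the UI on Gradio's default http://127.0.0.1:7860.
# ---------------------------------------------------------------------------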