import os |
import sys |
import requests |
import yfinance as yf |
import matplotlib.pyplot as plt |
import pandas as pd |
from transformers import pipeline |
from dotenv import load_dotenv |
import gradio as gr |
from smolagents import CodeAgent, tool |
from smolagents.models import LiteLLMModel |
from typing import List, Dict, Any, Optional, Union |
import time |
import re |
from bs4 import BeautifulSoup |
# Read settings from a local .env file before any os.getenv() call below.
# NOTE(review): override=True presumably lets .env values win over variables
# already present in the process environment - confirm against python-dotenv.
load_dotenv(override=True)

# Groq credentials and the chat model shared by every LLM call in this module.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL = "llama3-70b-8192"

# Sentiment classifier, built once at import time and reused by the
# analyze_sentiment tool.
sentiment_analyzer = pipeline(
    task="sentiment-analysis",
    model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
)
class SmolaTools:
    """Collection of smolagents tools for Indian stock-market research.

    Every public method is a ``@tool`` wrapped in ``@staticmethod`` so it can
    be registered with a ``CodeAgent`` or called directly from application
    code. The tools call the Groq chat-completions API, the Google Custom
    Search JSON API, Yahoo Finance (through ``yfinance``) and news web pages,
    so all of them except ``analyze_sentiment`` perform network I/O.

    Names starting with an underscore are private helpers and are not tools.
    """

    # Endpoints shared by several tools.
    _GROQ_CHAT_URL = "https://api.groq.com/openai/v1/chat/completions"
    _GOOGLE_CSE_URL = "https://www.googleapis.com/customsearch/v1"
    # Upper bound, in seconds, for a single API call so that a stalled server
    # cannot block the caller indefinitely.
    _API_TIMEOUT = 30
    # Largest value sent as the Custom Search ``num`` parameter.
    _MAX_SEARCH_RESULTS = 10
    # Domains the primary news search is restricted to.
    _NEWS_SITES = (
        "economictimes.indiatimes.com",
        "moneycontrol.com",
        "livemint.com",
        "ndtv.com/business",
        "cnbctv18.com",
        "business-standard.com",
        "financialexpress.com",
        "reuters.com",
        "bloomberg.com",
        "investing.com",
        "marketwatch.com",
        "thehindu.com/business",
        "business-today.in",
        "fool.com",
        "zeebiz.com",
        "screener.in",
    )

    @staticmethod
    def _groq_chat(messages, temperature, max_tokens):
        """Send a chat request to Groq; return the reply text, or None on a non-200 status."""
        response = requests.post(
            SmolaTools._GROQ_CHAT_URL,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {GROQ_API_KEY}",
            },
            json={
                "model": GROQ_MODEL,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
            },
            timeout=SmolaTools._API_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        return response.json()["choices"][0]["message"]["content"]

    @staticmethod
    def _google_search(query, num_results, sort=None):
        """Run a Custom Search query; return the list of result items, or None on failure."""
        api_key = os.getenv("API_KEY")
        cse_id = os.getenv("CSE_ID")
        if not api_key or not cse_id:
            print("Google Custom Search credentials (API_KEY / CSE_ID) are not set")
            return None
        params = {
            "q": query,
            "cx": cse_id,
            "key": api_key,
            "num": max(1, min(int(num_results), SmolaTools._MAX_SEARCH_RESULTS)),
        }
        if sort:
            params["sort"] = sort
        # Passing ``params`` lets requests URL-encode the query, so names such
        # as "M&M" or the parenthesised site filter cannot break the URL.
        response = requests.get(
            SmolaTools._GOOGLE_CSE_URL,
            params=params,
            timeout=SmolaTools._API_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        return response.json().get("items", [])

    @staticmethod
    def _as_article(item):
        """Reduce a Custom Search result item to its title, link and snippet."""
        return {
            "title": item["title"],
            "link": item["link"],
            "snippet": item.get("snippet", "No description available"),
        }

    @staticmethod
    def _safe_file_part(text):
        """Replace every character outside letters, digits, ``_``, ``^`` and ``-`` with ``_``."""
        return re.sub(r"[^A-Za-z0-9_^-]", "_", text)

    @staticmethod
    def _save_close_chart(history, title, y_label, file_path):
        """Plot the ``Close`` column of *history*, save it to *file_path* and return the path."""
        fig = plt.figure(figsize=(10, 5))
        try:
            plt.plot(history.index, history["Close"], label="Close Price", marker='o')
            plt.xlabel("Date")
            plt.ylabel(y_label)
            plt.title(title)
            plt.legend()
            plt.grid()
            plt.savefig(file_path)
        finally:
            # Always release the figure, even when plotting or saving fails.
            plt.close(fig)
        return file_path

    @staticmethod
    @tool
    def detect_ticker_from_query(query: str) -> Dict[str, Any]:
        """
        Extract company name from query and find its stock ticker.

        The company name is extracted by the Groq model, looked up through
        Google Custom Search, and the ticker is then extracted from the search
        results by the Groq model. Any failure along the way yields an empty
        ticker with confidence "none".

        Args:
            query: The search query to analyze
        Returns:
            Dictionary with the detected ticker, a confidence label and the detection method
        """
        not_found = {"ticker": "", "confidence": "none", "method": "none"}
        try:
            company_name = SmolaTools._groq_chat(
                [
                    {"role": "system", "content": "You are a financial assistant. Extract only the company or stock name from the query. Respond with ONLY the company name, nothing else."},
                    {"role": "user", "content": f"Extract the company name from this query: {query}"},
                ],
                temperature=0.1,
                max_tokens=50,
            )
            if company_name is None:
                return not_found
            company_name = company_name.strip()
            print(f"Extracted company name: {company_name}")
            if not company_name or company_name.lower() == "unknown":
                return not_found

            items = SmolaTools._google_search(
                f"{company_name} stock ticker symbol NSE BSE", 3
            )
            if items is None:
                return not_found
            combined_text = " ".join(
                text
                for item in items
                for text in (item.get("title", ""), item.get("snippet", ""))
            )
            print("Search results obtained for ticker lookup")

            ticker = SmolaTools._groq_chat(
                [
                    {"role": "system", "content": "You are a financial expert. Extract ONLY the stock ticker symbol for the company from the text. Return ONLY the ticker symbol (e.g., 'RELIANCE' for Reliance Industries). If uncertain, respond with UNKNOWN."},
                    {"role": "user", "content": f"Extract the stock ticker symbol for {company_name} from this text: {combined_text}"},
                ],
                temperature=0.1,
                max_tokens=10,
            )
            if ticker is None:
                return not_found
            ticker = ticker.strip()
            print(f"Extracted ticker: {ticker}")
            # Compare case-insensitively: the model does not always echo the
            # sentinel in upper case.
            if ticker and ticker.upper() != "UNKNOWN":
                return {"ticker": ticker, "confidence": "medium", "method": "company_search"}
        except Exception as e:
            # Best-effort tool: report the problem and fall through to "not found".
            print(f"Error in company/ticker detection: {str(e)}")
        return not_found

    @staticmethod
    @tool
    def fetch_stock_news(query: str, num_results: int = 5) -> List[Dict[str, str]]:
        """Fetch stock market news using Google Custom Search API.

        The first search is restricted to a fixed set of financial news sites
        and sorted by date. If it succeeds but returns nothing, a second,
        unrestricted search for the query plus "stock market news" is made.

        Args:
            query: The search term for stock-related news.
            num_results: The number of news articles to fetch (default 5, values above 10 are reduced to 10).
        Returns:
            A list of dictionaries containing news title, link, and snippet.
        """
        news_sites = " OR ".join(f"site:{site}" for site in SmolaTools._NEWS_SITES)
        final_query = f"{query} ({news_sites})"
        print(f"Searching with query: {final_query}")

        items = SmolaTools._google_search(final_query, num_results, sort="date")
        if items is None:
            return []
        results = [SmolaTools._as_article(item) for item in items]
        if not results:
            fallback_items = SmolaTools._google_search(
                f"{query} stock market news", num_results
            )
            if fallback_items is not None:
                results = [SmolaTools._as_article(item) for item in fallback_items]
        return results

    @staticmethod
    @tool
    def extract_web_content(urls: List[str], max_urls: int = 3) -> List[Dict[str, str]]:
        """Extract content from web pages.

        Script, style, nav, footer, header and aside elements are removed
        before the text is collected, and the text is cut to 5000 characters.
        A page that cannot be fetched produces an entry titled "Error" whose
        content holds the failure message.

        Args:
            urls: List of URLs to extract content from.
            max_urls: Maximum number of URLs to process (default 3).
        Returns:
            A list of dictionaries containing url, title, and content.
        """
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        selected = urls[:max(0, max_urls)]
        last_index = len(selected) - 1
        results = []
        for index, url in enumerate(selected):
            try:
                response = requests.get(url, headers=headers, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                title = soup.title.text.strip() if soup.title else "No title"
                # Drop page chrome and non-content elements before reading text.
                for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
                    element.extract()
                text = soup.get_text(separator='\n')
                content = '\n'.join(
                    line.strip() for line in text.splitlines() if line.strip()
                )
                if len(content) > 5000:
                    content = content[:5000] + "..."
                results.append({"url": url, "title": title, "content": content})
                # Pause between fetches, but never after the final URL.
                if index < last_index:
                    time.sleep(1)
            except Exception as e:
                results.append({
                    "url": url,
                    "title": "Error",
                    "content": f"Failed to extract content: {str(e)}",
                })
        return results

    @staticmethod
    @tool
    def fetch_stock_price(ticker: str) -> Optional[float]:
        """Fetch the latest stock price for a given ticker symbol.

        Args:
            ticker: The stock ticker symbol (e.g., "AAPL").
        Returns:
            The latest closing stock price, or None when no price could be fetched.
        """
        try:
            history = yf.Ticker(ticker).history(period="1d")
            if history.empty:
                return None
            return float(history["Close"].iloc[-1])
        except Exception:
            # Any lookup failure is reported to the caller as "no price".
            return None

    @staticmethod
    @tool
    def summarize_news(news: List[Dict[str, Any]], extracted_content: List[Dict[str, str]] = None) -> str:
        """Summarize stock-related news snippets and extracted web content using Groq API.

        Articles longer than 1500 characters are reduced to their first,
        middle and last 500 characters before being sent to the model.
        Network errors raised by the HTTP call are not caught here.

        Args:
            news: A list of news snippet dictionaries to summarize.
            extracted_content: Optional list of extracted web content.
        Returns:
            A summarized text of the news, or a short message when there is no news or the API call fails.
        """
        if not news:
            return "No news available."

        parts = ["News snippets:\n" + "\n".join(item.get("snippet", "") for item in news)]
        if extracted_content:
            parts.append("\n\nDetailed content from articles:\n")
            for i, item in enumerate(extracted_content, 1):
                content = item.get("content", "")
                total_length = len(content)
                if total_length > 1500:
                    start = content[:500]
                    middle_start = total_length // 2 - 250
                    middle = content[middle_start:middle_start + 500]
                    end = content[-500:]
                    extracted_text = f"{start}\n...[middle content]...\n{middle}\n...[more content]...\n{end}"
                else:
                    extracted_text = content
                parts.append(f"\n--- Article {i}: {item.get('title', 'Untitled')} ---\n{extracted_text}\n")
        summary_text = "".join(parts)

        reply = SmolaTools._groq_chat(
            [
                {"role": "system", "content": "You are a financial news summarizer. Provide a concise summary of the most important information from these financial news articles, focused on stock performance, company updates, market trends, and financial metrics."},
                {"role": "user", "content": f"Summarize this financial news:\n{summary_text}"},
            ],
            temperature=0.3,
            max_tokens=1000,
        )
        return reply if reply is not None else "Error generating summary."

    @staticmethod
    @tool
    def analyze_sentiment(text: str) -> Dict[str, Any]:
        """Perform sentiment analysis on the provided text.

        Args:
            text: The text to analyze.
        Returns:
            A dictionary containing sentiment label and confidence score.
        """
        # truncation=True cuts over-long input to the model's maximum length;
        # without it the pipeline raises on texts longer than the model accepts.
        result = sentiment_analyzer(text, truncation=True)[0]
        return {"label": result["label"], "score": float(result["score"])}

    @staticmethod
    @tool
    def plot_historical_trends(ticker: str, period: str = "1mo") -> str:
        """Generate a plot for historical stock trends.

        Args:
            ticker: The stock ticker symbol.
            period: The historical period to fetch (e.g., "1mo", "3mo", "1y"). Default is "1mo".
        Returns:
            File path to the saved plot image, or an explanatory message when no chart could be produced.
        """
        try:
            history = yf.Ticker(ticker).history(period=period)
            if history.empty:
                return f"No historical data available for {ticker}"
            # The ticker comes from the caller, so keep it from injecting path
            # separators into the file name.
            file_path = f"{SmolaTools._safe_file_part(ticker)}_historical_trends.png"
            return SmolaTools._save_close_chart(
                history,
                f"Historical Stock Trends for {ticker} ({period})",
                "Stock Price (₹)",
                file_path,
            )
        except Exception as e:
            return f"Error fetching historical data: {str(e)}"

    @staticmethod
    @tool
    def plot_nifty_chart(period: str = "6mo") -> str:
        """Generate a plot for Nifty 50 index.

        Args:
            period: The historical period to fetch (e.g., "1mo", "3mo", "1y"). Default is "6mo".
        Returns:
            File path to the saved plot image, or an explanatory message when no chart could be produced.
        """
        try:
            history = yf.Ticker("^NSEI").history(period=period)
            if history.empty:
                return "No historical data available for Nifty 50"
            file_path = f"nifty50_chart_{SmolaTools._safe_file_part(period)}.png"
            return SmolaTools._save_close_chart(
                history,
                f"Nifty 50 Index - {period}",
                "Index Value",
                file_path,
            )
        except Exception as e:
            return f"Error creating Nifty chart: {str(e)}"
# Single shared instance; its tool attributes are also called directly by
# stock_research().
smola_tools = SmolaTools()

# LiteLLM-backed chat model that targets the configured Groq model.
model = LiteLLMModel(model_id="groq/" + GROQ_MODEL, api_key=GROQ_API_KEY)

# Code-writing agent with every SmolaTools tool registered.
agent = CodeAgent(
    model=model,
    tools=[
        smola_tools.detect_ticker_from_query,
        smola_tools.fetch_stock_news,
        smola_tools.fetch_stock_price,
        smola_tools.summarize_news,
        smola_tools.analyze_sentiment,
        smola_tools.plot_historical_trends,
        smola_tools.plot_nifty_chart,
        smola_tools.extract_web_content,
    ],
)
def stock_research(query, ticker=None, progress=gr.Progress()):
    """Run the research pipeline for *query*, yielding UI updates after every stage.

    Stages, in order: ticker detection, price lookup, news search, article
    extraction, summarisation, sentiment analysis and chart rendering. Each
    stage has its own ``try`` block, so a failure is reported in the matching
    panel (or on stdout) and the remaining stages still run.

    Args:
        query: Free-text search query entered by the user.
        ticker: Optional ticker symbol. When blank, the ticker is detected
            from ``query``; Nifty 50 (``^NSEI``) is used if detection fails.
        progress: Gradio progress tracker, called with a fraction and a
            description at the start of every stage.

    Yields:
        A 5-tuple ``(price_md, news_md, summary_md, sentiment_md, chart)``:
        four ``gr.Markdown`` components and the chart file path, which stays
        ``None`` until a chart has been saved.
    """
    # Sentinel returned by SmolaTools.summarize_news when the Groq call fails.
    summary_api_error = "Error generating summary."
    no_summary = "No news available to summarize."

    price_md = gr.Markdown("## Current Price\nDetecting ticker...")
    news_md = gr.Markdown("## Search Results\nWaiting...")
    summary_md = gr.Markdown("## News Summary\nWaiting...")
    sentiment_md = gr.Markdown("## Sentiment Analysis\nWaiting...")
    chart = None
    print(f"Starting analysis with query: '{query}', manually entered ticker: '{ticker}'")

    # ---- Stage 1: decide which ticker to use --------------------------------
    progress(0 / 6, desc="Detecting ticker")
    if ticker and ticker.strip():
        ticker = ticker.strip().upper()
        print(f"Using manually entered ticker: {ticker}")
        price_md = gr.Markdown(f"## Current Price\nUsing manually entered ticker: {ticker}")
    else:
        try:
            print("Attempting to detect ticker using SmolaTools")
            ticker_info = smola_tools.detect_ticker_from_query(query)
            ticker = ticker_info.get("ticker", "")
            confidence = ticker_info.get("confidence", "none")
            method = ticker_info.get("method", "none")
            print(f"Ticker detection result: {ticker} (confidence: {confidence}, method: {method})")
            if ticker and confidence != "none":
                price_md = gr.Markdown(f"## Current Price\nDetected ticker: {ticker} (confidence: {confidence})")
            else:
                ticker = "^NSEI"
                print(f"No ticker detected with SmolaTools, using default: {ticker}")
                price_md = gr.Markdown("## Current Price\nNo specific ticker detected. Using Nifty 50 Index.")
        except Exception as e:
            print(f"Error in SmolaTools ticker detection: {str(e)}")
            price_md = gr.Markdown("## Current Price\nError detecting ticker. Using Nifty 50 as default.")
            ticker = "^NSEI"
    yield price_md, news_md, summary_md, sentiment_md, chart

    # ---- Stage 2: latest price ----------------------------------------------
    progress(1 / 6, desc="Fetching stock price")
    try:
        print(f"Fetching price for ticker: {ticker}")
        if ticker == "^NSEI":
            print("Getting Nifty 50 price")
            price_text = f"## Current Price\nNifty 50 Index: {_nifty_close():.2f}"
        else:
            stock_price = None
            for symbol in _exchange_symbols(ticker):
                print(f"Trying ticker: {symbol}")
                stock_price = _latest_close(symbol)
                if stock_price is not None:
                    print(f"Found price for {symbol}: {stock_price}")
                    break
                print(f"No data found for {symbol}")
            if stock_price is not None:
                price_text = f"## Current Price for {ticker}\n₹{stock_price:.2f}"
            else:
                print("Could not find price data, using Nifty as fallback")
                # Remember what was asked for before switching to the index,
                # so the message names the ticker that actually failed.
                requested_ticker = ticker
                ticker = "^NSEI"
                price_text = (
                    f"## Current Price\nCould not find data for {requested_ticker}. "
                    f"Showing Nifty 50 Index: {_nifty_close():.2f}"
                )
        price_md = gr.Markdown(price_text)
    except Exception as e:
        print(f"Error fetching price: {str(e)}")
        price_md = gr.Markdown(f"## Current Price\nError fetching price: {str(e)}")
        ticker = "^NSEI"
    yield price_md, news_md, summary_md, sentiment_md, chart

    # ---- Stage 3: news search -----------------------------------------------
    progress(2 / 6, desc="Searching for news")
    news_articles = []
    news_text = ""
    try:
        print(f"Searching news for query: '{query}'")
        news_articles = smola_tools.fetch_stock_news(query)
        news_text = f"## Search Results\nFound {len(news_articles)} news articles for: '{query}'\n\n"
        news_text += "".join(
            f"{i}. [{article['title']}]({article['link']})\n"
            for i, article in enumerate(news_articles, 1)
        )
        news_md = gr.Markdown(news_text)
        print(f"Found {len(news_articles)} news articles")
    except Exception as e:
        print(f"Error fetching news: {str(e)}")
        news_md = gr.Markdown(f"## Search Results\nError fetching news: {str(e)}")
    yield price_md, news_md, summary_md, sentiment_md, chart

    # ---- Stage 4: article bodies --------------------------------------------
    progress(3 / 6, desc="Extracting article content")
    extracted_content = []
    try:
        if news_articles:
            print("Extracting web content from news URLs")
            urls = [article["link"] for article in news_articles]
            extracted_content = smola_tools.extract_web_content(urls, max_urls=3)
            news_text += "\n\n*Extracted detailed content from top articles for deeper analysis*"
            news_md = gr.Markdown(news_text)
            print(f"Extracted content from {len(extracted_content)} articles")
    except Exception as e:
        print(f"Error extracting web content: {str(e)}")
    yield price_md, news_md, summary_md, sentiment_md, chart

    # ---- Stage 5: summary ---------------------------------------------------
    progress(4 / 6, desc="Summarizing news")
    news_summary = no_summary
    try:
        if news_articles:
            print("Summarizing news")
            # Snippet-only summary used when the API cannot produce one.
            simple_summary = "This is a summary of recent market news: " + "".join(
                article["snippet"] + " " for article in news_articles[:3]
            )
            try:
                news_summary = smola_tools.summarize_news(news_articles, extracted_content)
                if news_summary == summary_api_error:
                    # Do not display, or run sentiment analysis on, the
                    # tool's error message.
                    print("summarize_news API call failed, using simple summary")
                    news_summary = simple_summary
                else:
                    print("News summary created using API")
            except Exception as e:
                print(f"Error using summarize_news API: {str(e)}, using simple summary")
                news_summary = simple_summary
            summary_md = gr.Markdown(f"## News Summary\n{news_summary}")
        else:
            summary_md = gr.Markdown("## News Summary\nNo news available to summarize.")
    except Exception as e:
        print(f"Error creating summary: {str(e)}")
        summary_md = gr.Markdown(f"## News Summary\nError creating summary: {str(e)}")
    yield price_md, news_md, summary_md, sentiment_md, chart

    # ---- Stage 6: sentiment -------------------------------------------------
    progress(5 / 6, desc="Analyzing sentiment")
    sentiment = {"label": "UNKNOWN", "score": 0.0}
    try:
        if news_summary != no_summary:
            print("Analyzing sentiment")
            try:
                sentiment = smola_tools.analyze_sentiment(news_summary)
                print(f"Sentiment: {sentiment['label']} (score: {sentiment['score']})")
            except Exception as e:
                print(f"Error in sentiment analysis API: {str(e)}")
                # Keyword heuristic used only when the classifier fails.
                lowered = news_summary.lower()
                if "growth" in lowered or "positive" in lowered:
                    sentiment = {"label": "POSITIVE", "score": 0.75}
                elif "decline" in lowered or "negative" in lowered:
                    sentiment = {"label": "NEGATIVE", "score": 0.75}
                else:
                    sentiment = {"label": "NEUTRAL", "score": 0.5}
            sentiment_md = gr.Markdown(f"## Sentiment Analysis\n**{sentiment['label']}** (confidence: {sentiment['score']:.2f})")
        else:
            sentiment_md = gr.Markdown("## Sentiment Analysis\nNo content available for analysis")
    except Exception as e:
        print(f"Error in sentiment analysis: {str(e)}")
        sentiment_md = gr.Markdown(f"## Sentiment Analysis\nError in analysis: {str(e)}")
    yield price_md, news_md, summary_md, sentiment_md, chart

    # ---- Stage 7: chart -----------------------------------------------------
    progress(6 / 6, desc="Creating chart")
    try:
        print(f"Creating chart for {ticker}")
        chart = _build_chart(ticker)
    except Exception as e:
        print(f"Error creating chart: {str(e)}")
        try:
            print("Attempting simple Nifty chart as final fallback")
            history = yf.Ticker("^NSEI").history(period="1mo")
            chart = _render_close_chart(
                history,
                "Nifty 50 - Emergency Fallback",
                "Index Value",
                "emergency_nifty_chart.png",
            )
            print("Emergency fallback chart saved")
        except Exception as fallback_error:
            print(f"Even emergency fallback failed: {str(fallback_error)}")
    progress(1.0)
    yield price_md, news_md, summary_md, sentiment_md, chart


def _exchange_symbols(ticker):
    """Return the Yahoo Finance symbols to try for *ticker*, preferred exchange first."""
    if ticker.endswith(".NS"):
        return [ticker, f"{ticker[:-3]}.BO"]
    if ticker.endswith(".BO"):
        return [ticker, f"{ticker[:-3]}.NS"]
    return [f"{ticker}.NS", f"{ticker}.BO"]


def _latest_close(symbol):
    """Return the last close from a one-day history of *symbol*, or None when it is empty."""
    history = yf.Ticker(symbol).history(period="1d")
    if history.empty:
        return None
    return history["Close"].iloc[-1]


def _nifty_close():
    """Return the latest Nifty 50 close; raise ValueError when no data is returned."""
    price = _latest_close("^NSEI")
    if price is None:
        raise ValueError("No price data available for Nifty 50")
    return price


def _render_close_chart(history, title, y_label, file_path):
    """Plot the ``Close`` column of *history*, save it to *file_path* and return the path."""
    fig = plt.figure(figsize=(10, 5))
    try:
        plt.plot(history.index, history["Close"], label="Close Price", marker='o')
        plt.xlabel("Date")
        plt.ylabel(y_label)
        plt.title(title)
        plt.legend()
        plt.grid()
        plt.savefig(file_path)
    finally:
        # Always release the figure, even when plotting or saving fails.
        plt.close(fig)
    return file_path


def _build_chart(ticker):
    """Save a six-month chart for *ticker*, falling back to Nifty 50; return its path or None."""
    nifty_title = "Nifty 50 Index - 6mo"
    nifty_file = "nifty50_chart.png"
    if ticker != "^NSEI":
        for symbol in _exchange_symbols(ticker):
            history = yf.Ticker(symbol).history(period="6mo")
            if not history.empty:
                file_path = _render_close_chart(
                    history,
                    f"Historical Stock Trends for {ticker} (6mo)",
                    "Stock Price (₹)",
                    f"{ticker.replace('.', '_')}_chart.png",
                )
                print(f"Stock chart for {symbol} saved to {file_path}")
                return file_path
            print(f"No chart data for {symbol}")
        print("No NSE or BSE data, falling back to Nifty")
        nifty_title = "Nifty 50 Index - 6mo (Fallback)"
        nifty_file = "nifty50_fallback_chart.png"

    history = yf.Ticker("^NSEI").history(period="6mo")
    if history.empty:
        print("No history data for Nifty")
        return None
    file_path = _render_close_chart(history, nifty_title, "Index Value", nifty_file)
    print(f"Nifty chart saved to {file_path}")
    return file_path
# Gradio UI: two text inputs feed stock_research(), whose five yielded values
# map one-to-one onto the five output components below.
iface = gr.Interface(
    fn=stock_research,
    title="Indian Stock Market Research Tool",
    description="Enter a stock-related search term. The system will automatically detect the ticker symbol or use Nifty 50 if no ticker is found. You can also manually specify a ticker if desired.",
    inputs=[
        gr.Textbox(
            label="Search Query",
            placeholder="Enter search query like 'latest news on HDFC Bank'",
        ),
        gr.Textbox(
            label="Stock Ticker (Optional)",
            placeholder="e.g., HDFCBANK, RELIANCE, TCS (leave empty for automatic detection)",
        ),
    ],
    outputs=[
        gr.Markdown(label="Current Price"),
        gr.Markdown(label="Search Results"),
        gr.Markdown(label="News Summary"),
        gr.Markdown(label="Sentiment Analysis"),
        gr.Image(label="Stock Chart", type="filepath"),
    ],
)

if __name__ == "__main__":
    # Enable the queue before launching the app.
    queued_app = iface.queue()
    queued_app.launch()