|
import os |
|
import sys |
|
import requests |
|
import yfinance as yf |
|
import matplotlib.pyplot as plt |
|
import pandas as pd |
|
from transformers import pipeline |
|
from dotenv import load_dotenv |
|
import gradio as gr |
|
from smolagents import CodeAgent, tool |
|
from smolagents.models import LiteLLMModel |
|
from typing import List, Dict, Any, Optional, Union |
|
import time |
|
import re |
|
from bs4 import BeautifulSoup |
|
|
|
|
|
load_dotenv(override=True) |
|
|
|
|
|
sentiment_analyzer = pipeline( |
|
"sentiment-analysis", |
|
model="distilbert/distilbert-base-uncased-finetuned-sst-2-english" |
|
) |
|
|
|
|
|
GROQ_API_KEY = os.getenv("GROQ_API_KEY") |
|
GROQ_MODEL = "llama3-70b-8192" |
|
|
|
|
|
class SmolaTools: |
|
@staticmethod |
|
@tool |
|
def detect_ticker_from_query(query: str) -> Dict[str, Any]: |
|
""" |
|
Extract company name from query and find its stock ticker. |
|
|
|
Args: |
|
query: The search query to analyze |
|
|
|
Returns: |
|
Dictionary with detected ticker and confidence |
|
""" |
|
try: |
|
|
|
headers = { |
|
"Content-Type": "application/json", |
|
"Authorization": f"Bearer {GROQ_API_KEY}" |
|
} |
|
|
|
|
|
name_data = { |
|
"model": GROQ_MODEL, |
|
"messages": [ |
|
{"role": "system", "content": "You are a financial assistant. Extract only the company or stock name from the query. Respond with ONLY the company name, nothing else."}, |
|
{"role": "user", "content": f"Extract the company name from this query: {query}"} |
|
], |
|
"temperature": 0.1, |
|
"max_tokens": 50 |
|
} |
|
|
|
name_response = requests.post( |
|
"https://api.groq.com/openai/v1/chat/completions", |
|
headers=headers, |
|
json=name_data |
|
) |
|
|
|
if name_response.status_code == 200: |
|
company_name = name_response.json()["choices"][0]["message"]["content"].strip() |
|
print(f"Extracted company name: {company_name}") |
|
|
|
if not company_name or company_name.lower() == "unknown": |
|
return {"ticker": "", "confidence": "none", "method": "none"} |
|
|
|
|
|
API_KEY = os.getenv("API_KEY") |
|
CSE_ID = os.getenv("CSE_ID") |
|
|
|
ticker_search_query = f"{company_name} stock ticker symbol NSE BSE" |
|
|
|
search_url = f"https://www.googleapis.com/customsearch/v1?q={ticker_search_query}&cx={CSE_ID}&key={API_KEY}&num=3" |
|
search_response = requests.get(search_url) |
|
|
|
if search_response.status_code == 200: |
|
search_data = search_response.json() |
|
|
|
texts = [] |
|
for item in search_data.get("items", []): |
|
texts.append(item.get("title", "")) |
|
texts.append(item.get("snippet", "")) |
|
|
|
combined_text = " ".join(texts) |
|
print(f"Search results obtained for ticker lookup") |
|
|
|
|
|
ticker_data = { |
|
"model": GROQ_MODEL, |
|
"messages": [ |
|
{"role": "system", "content": "You are a financial expert. Extract ONLY the stock ticker symbol for the company from the text. Return ONLY the ticker symbol (e.g., 'RELIANCE' for Reliance Industries). If uncertain, respond with UNKNOWN."}, |
|
{"role": "user", "content": f"Extract the stock ticker symbol for {company_name} from this text: {combined_text}"} |
|
], |
|
"temperature": 0.1, |
|
"max_tokens": 10 |
|
} |
|
|
|
ticker_response = requests.post( |
|
"https://api.groq.com/openai/v1/chat/completions", |
|
headers=headers, |
|
json=ticker_data |
|
) |
|
|
|
if ticker_response.status_code == 200: |
|
ticker = ticker_response.json()["choices"][0]["message"]["content"].strip() |
|
print(f"Extracted ticker: {ticker}") |
|
|
|
if ticker and ticker != "UNKNOWN": |
|
return {"ticker": ticker, "confidence": "medium", "method": "company_search"} |
|
|
|
except Exception as e: |
|
print(f"Error in company/ticker detection: {str(e)}") |
|
|
|
|
|
return {"ticker": "", "confidence": "none", "method": "none"} |
|
|
|
@staticmethod |
|
@tool |
|
def fetch_stock_news(query: str, num_results: int = 5) -> List[Dict[str, str]]: |
|
"""Fetch stock market news using Google Custom Search API. |
|
|
|
Args: |
|
query: The search term for stock-related news. |
|
num_results: The number of news articles to fetch (default: 10). |
|
|
|
Returns: |
|
A list of dictionaries containing news title, link, and snippet. |
|
""" |
|
API_KEY = os.getenv("API_KEY") |
|
CSE_ID = os.getenv("CSE_ID") |
|
|
|
|
|
stock_terms = query.split() |
|
stock_name = stock_terms[-1] if len(stock_terms) > 0 else "" |
|
|
|
|
|
expanded_query = f"{query}" |
|
|
|
|
|
site_exclusions = () |
|
|
|
|
|
news_sites = ( |
|
"site:economictimes.indiatimes.com OR " |
|
"site:moneycontrol.com OR " |
|
"site:livemint.com OR " |
|
"site:ndtv.com/business OR " |
|
"site:cnbctv18.com OR " |
|
"site:business-standard.com OR " |
|
"site:financialexpress.com OR " |
|
"site:reuters.com OR " |
|
"site:bloomberg.com OR " |
|
"site:investing.com OR " |
|
"site:marketwatch.com OR " |
|
"site:thehindu.com/business OR " |
|
"site:business-today.in OR " |
|
"site:fool.com OR " |
|
"site:zeebiz.com OR " |
|
"site:screener.in" |
|
) |
|
|
|
|
|
final_query = f"{expanded_query} {site_exclusions} ({news_sites})" |
|
|
|
url = f"https://www.googleapis.com/customsearch/v1?q={final_query}&cx={CSE_ID}&key={API_KEY}&num={num_results}&sort=date" |
|
|
|
print(f"Searching with query: {final_query}") |
|
|
|
response = requests.get(url) |
|
if response.status_code == 200: |
|
data = response.json() |
|
results = [{"title": item["title"], "link": item["link"], "snippet": item.get("snippet", "No description available")} for item in data.get("items", [])] |
|
|
|
if not results: |
|
|
|
basic_query = f"{query} stock market news" |
|
fallback_url = f"https://www.googleapis.com/customsearch/v1?q={basic_query}&cx={CSE_ID}&key={API_KEY}&num={num_results}" |
|
fallback_response = requests.get(fallback_url) |
|
|
|
if fallback_response.status_code == 200: |
|
fallback_data = fallback_response.json() |
|
results = [{"title": item["title"], "link": item["link"], "snippet": item.get("snippet", "No description available")} |
|
for item in fallback_data.get("items", [])] |
|
|
|
return results |
|
|
|
return [] |
|
|
|
@staticmethod |
|
@tool |
|
def extract_web_content(urls: List[str], max_urls: int = 3) -> List[Dict[str, str]]: |
|
"""Extract content from web pages. |
|
|
|
Args: |
|
urls: List of URLs to extract content from. |
|
max_urls: Maximum number of URLs to process (default: 3). |
|
|
|
Returns: |
|
A list of dictionaries containing url, title, and content. |
|
""" |
|
results = [] |
|
processed = 0 |
|
|
|
for url in urls[:max_urls]: |
|
try: |
|
headers = { |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" |
|
} |
|
response = requests.get(url, headers=headers, timeout=10) |
|
response.raise_for_status() |
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
|
|
|
title = soup.title.text.strip() if soup.title else "No title" |
|
|
|
|
|
|
|
for script in soup(["script", "style", "nav", "footer", "header", "aside"]): |
|
script.extract() |
|
|
|
|
|
text = soup.get_text(separator='\n') |
|
lines = [line.strip() for line in text.splitlines() if line.strip()] |
|
content = '\n'.join(lines) |
|
|
|
|
|
if len(content) > 5000: |
|
content = content[:5000] + "..." |
|
|
|
results.append({ |
|
"url": url, |
|
"title": title, |
|
"content": content |
|
}) |
|
|
|
processed += 1 |
|
|
|
|
|
if processed < max_urls: |
|
time.sleep(1) |
|
|
|
except Exception as e: |
|
results.append({ |
|
"url": url, |
|
"title": "Error", |
|
"content": f"Failed to extract content: {str(e)}" |
|
}) |
|
|
|
return results |
|
|
|
@staticmethod |
|
@tool |
|
def fetch_stock_price(ticker: str) -> Optional[float]: |
|
"""Fetch the latest stock price for a given ticker symbol. |
|
|
|
Args: |
|
ticker: The stock ticker symbol (e.g., "AAPL"). |
|
|
|
Returns: |
|
The latest closing stock price. |
|
""" |
|
try: |
|
stock = yf.Ticker(ticker) |
|
return stock.history(period="1d")["Close"].iloc[-1] |
|
except Exception: |
|
return None |
|
|
|
@staticmethod |
|
@tool |
|
def summarize_news(news: List[Dict[str, Any]], extracted_content: List[Dict[str, str]] = None) -> str: |
|
"""Summarize stock-related news snippets and extracted web content using Groq API. |
|
|
|
Args: |
|
news: A list of news snippet dictionaries to summarize. |
|
extracted_content: Optional list of extracted web content. |
|
|
|
Returns: |
|
A summarized text of the news. |
|
""" |
|
if not news: |
|
return "No news available." |
|
|
|
headers = { |
|
"Content-Type": "application/json", |
|
"Authorization": f"Bearer {GROQ_API_KEY}" |
|
} |
|
|
|
|
|
summary_text = "News snippets:\n" + "\n".join([item["snippet"] for item in news]) |
|
|
|
if extracted_content and len(extracted_content) > 0: |
|
summary_text += "\n\nDetailed content from articles:\n" |
|
for i, item in enumerate(extracted_content, 1): |
|
|
|
content = item["content"] |
|
total_length = len(content) |
|
|
|
if total_length > 1500: |
|
start = content[:500] |
|
middle_start = total_length // 2 - 250 |
|
middle = content[middle_start:middle_start + 500] |
|
end = content[-500:] |
|
extracted_text = f"{start}\n...[middle content]...\n{middle}\n...[more content]...\n{end}" |
|
else: |
|
extracted_text = content |
|
|
|
summary_text += f"\n--- Article {i}: {item['title']} ---\n{extracted_text}\n" |
|
|
|
|
|
data = { |
|
"model": GROQ_MODEL, |
|
"messages": [ |
|
{"role": "system", "content": "You are a financial news summarizer. Provide a concise summary of the most important information from these financial news articles, focused on stock performance, company updates, market trends, and financial metrics."}, |
|
{"role": "user", "content": f"Summarize this financial news:\n{summary_text}"} |
|
], |
|
"temperature": 0.3, |
|
"max_tokens": 1000 |
|
} |
|
|
|
response = requests.post( |
|
"https://api.groq.com/openai/v1/chat/completions", |
|
headers=headers, |
|
json=data |
|
) |
|
|
|
return response.json()["choices"][0]["message"]["content"] if response.status_code == 200 else "Error generating summary." |
|
|
|
@staticmethod |
|
@tool |
|
def analyze_sentiment(text: str) -> Dict[str, Any]: |
|
"""Perform sentiment analysis on the provided text. |
|
|
|
Args: |
|
text: The text to analyze. |
|
|
|
Returns: |
|
A dictionary containing sentiment label and confidence score. |
|
""" |
|
result = sentiment_analyzer(text)[0] |
|
return {"label": result["label"], "score": result["score"]} |
|
|
|
@staticmethod |
|
@tool |
|
def plot_historical_trends(ticker: str, period: str = "1mo") -> str: |
|
"""Generate a plot for historical stock trends. |
|
|
|
Args: |
|
ticker: The stock ticker symbol. |
|
period: The historical period to fetch (e.g., "1mo", "3mo", "1y"). Default is "1mo". |
|
|
|
Returns: |
|
File path to the saved plot image. |
|
""" |
|
try: |
|
stock = yf.Ticker(ticker) |
|
history = stock.history(period=period) |
|
if history.empty: |
|
return f"No historical data available for {ticker}" |
|
|
|
plt.figure(figsize=(10, 5)) |
|
plt.plot(history.index, history["Close"], label="Close Price", marker='o') |
|
plt.xlabel("Date") |
|
plt.ylabel("Stock Price (₹)") |
|
plt.title(f"Historical Stock Trends for {ticker} ({period})") |
|
plt.legend() |
|
plt.grid() |
|
file_path = f"{ticker.replace('.', '_')}_historical_trends.png" |
|
plt.savefig(file_path) |
|
plt.close() |
|
return file_path |
|
except Exception as e: |
|
return f"Error fetching historical data: {str(e)}" |
|
|
|
@staticmethod |
|
@tool |
|
def plot_nifty_chart(period: str = "6mo") -> str: |
|
"""Generate a plot for Nifty 50 index. |
|
|
|
Args: |
|
period: The historical period to fetch (e.g., "1mo", "3mo", "1y"). Default is "6mo". |
|
|
|
Returns: |
|
File path to the saved plot image. |
|
""" |
|
try: |
|
nifty = yf.Ticker("^NSEI") |
|
history = nifty.history(period=period) |
|
|
|
if history.empty: |
|
return "No historical data available for Nifty 50" |
|
|
|
plt.figure(figsize=(10, 5)) |
|
plt.plot(history.index, history["Close"], label="Close Price", marker='o') |
|
plt.xlabel("Date") |
|
plt.ylabel("Index Value") |
|
plt.title(f"Nifty 50 Index - {period}") |
|
plt.legend() |
|
plt.grid() |
|
file_path = f"nifty50_chart_{period}.png" |
|
plt.savefig(file_path) |
|
plt.close() |
|
return file_path |
|
except Exception as e: |
|
return f"Error creating Nifty chart: {str(e)}" |
|
|
|
|
|
smola_tools = SmolaTools() |
|
|
|
|
|
model = LiteLLMModel( |
|
model_id=f"groq/{GROQ_MODEL}", |
|
api_key=GROQ_API_KEY, |
|
) |
|
|
|
|
|
agent = CodeAgent( |
|
tools=[ |
|
smola_tools.detect_ticker_from_query, |
|
smola_tools.fetch_stock_news, |
|
smola_tools.fetch_stock_price, |
|
smola_tools.summarize_news, |
|
smola_tools.analyze_sentiment, |
|
smola_tools.plot_historical_trends, |
|
smola_tools.plot_nifty_chart, |
|
smola_tools.extract_web_content |
|
], |
|
model=model |
|
) |
|
|
|
def stock_research(query, ticker=None, progress=gr.Progress()): |
|
price_md = gr.Markdown("## Current Price\nDetecting ticker...") |
|
news_md = gr.Markdown("## Search Results\nWaiting...") |
|
summary_md = gr.Markdown("## News Summary\nWaiting...") |
|
sentiment_md = gr.Markdown("## Sentiment Analysis\nWaiting...") |
|
chart = None |
|
|
|
|
|
print(f"Starting analysis with query: '{query}', manually entered ticker: '{ticker}'") |
|
|
|
|
|
progress(0/6, desc="Detecting ticker") |
|
if ticker and ticker.strip(): |
|
|
|
ticker = ticker.strip().upper() |
|
print(f"Using manually entered ticker: {ticker}") |
|
price_md = gr.Markdown(f"## Current Price\nUsing manually entered ticker: {ticker}") |
|
else: |
|
|
|
try: |
|
|
|
print("Attempting to detect ticker using SmolaTools") |
|
ticker_info = smola_tools.detect_ticker_from_query(query) |
|
ticker = ticker_info.get("ticker", "") |
|
confidence = ticker_info.get("confidence", "none") |
|
method = ticker_info.get("method", "none") |
|
|
|
print(f"Ticker detection result: {ticker} (confidence: {confidence}, method: {method})") |
|
|
|
if ticker and confidence != "none": |
|
price_md = gr.Markdown(f"## Current Price\nDetected ticker: {ticker} (confidence: {confidence})") |
|
else: |
|
|
|
ticker = "^NSEI" |
|
print(f"No ticker detected with SmolaTools, using default: {ticker}") |
|
price_md = gr.Markdown(f"## Current Price\nNo specific ticker detected. Using Nifty 50 Index.") |
|
except Exception as e: |
|
print(f"Error in SmolaTools ticker detection: {str(e)}") |
|
price_md = gr.Markdown(f"## Current Price\nError detecting ticker. Using Nifty 50 as default.") |
|
ticker = "^NSEI" |
|
|
|
yield price_md, news_md, summary_md, sentiment_md, chart |
|
|
|
|
|
progress(1/6, desc="Fetching stock price") |
|
try: |
|
print(f"Fetching price for ticker: {ticker}") |
|
if ticker == "^NSEI": |
|
|
|
print("Getting Nifty 50 price") |
|
nifty = yf.Ticker("^NSEI") |
|
stock_price = nifty.history(period="1d")["Close"].iloc[-1] |
|
price_text = f"## Current Price\nNifty 50 Index: {stock_price:.2f}" |
|
else: |
|
|
|
ticker_with_suffix = f"{ticker}.NS" if not ticker.endswith(('.NS', '.BO')) else ticker |
|
print(f"Trying NSE ticker: {ticker_with_suffix}") |
|
stock = yf.Ticker(ticker_with_suffix) |
|
history = stock.history(period="1d") |
|
if not history.empty: |
|
stock_price = history["Close"].iloc[-1] |
|
print(f"Found NSE price: {stock_price}") |
|
else: |
|
stock_price = None |
|
print("No NSE data found") |
|
|
|
if stock_price is None: |
|
|
|
bse_ticker = ticker_with_suffix.replace(".NS", ".BO") if ".NS" in ticker_with_suffix else f"{ticker}.BO" |
|
print(f"Trying BSE ticker: {bse_ticker}") |
|
stock = yf.Ticker(bse_ticker) |
|
history = stock.history(period="1d") |
|
if not history.empty: |
|
stock_price = history["Close"].iloc[-1] |
|
ticker_with_suffix = bse_ticker |
|
print(f"Found BSE price: {stock_price}") |
|
else: |
|
print("No BSE data found") |
|
|
|
if stock_price is not None: |
|
price_text = f"## Current Price for {ticker}\n₹{stock_price:.2f}" |
|
else: |
|
print("Could not find price data, using Nifty as fallback") |
|
|
|
ticker = "^NSEI" |
|
nifty = yf.Ticker("^NSEI") |
|
stock_price = nifty.history(period="1d")["Close"].iloc[-1] |
|
price_text = f"## Current Price\nCould not find data for {ticker}. Showing Nifty 50 Index: {stock_price:.2f}" |
|
|
|
price_md = gr.Markdown(price_text) |
|
except Exception as e: |
|
print(f"Error fetching price: {str(e)}") |
|
price_md = gr.Markdown(f"## Current Price\nError fetching price: {str(e)}") |
|
|
|
ticker = "^NSEI" |
|
|
|
yield price_md, news_md, summary_md, sentiment_md, chart |
|
|
|
|
|
progress(2/6, desc="Searching for news") |
|
news_articles = [] |
|
try: |
|
print(f"Searching news for query: '{query}'") |
|
|
|
news_articles = smola_tools.fetch_stock_news(query) |
|
|
|
|
|
news_text = f"## Search Results\nFound {len(news_articles)} news articles for: '{query}'\n\n" |
|
for i, article in enumerate(news_articles, 1): |
|
news_text += f"{i}. [{article['title']}]({article['link']})\n" |
|
|
|
news_md = gr.Markdown(news_text) |
|
print(f"Found {len(news_articles)} news articles") |
|
except Exception as e: |
|
print(f"Error fetching news: {str(e)}") |
|
news_md = gr.Markdown(f"## Search Results\nError fetching news: {str(e)}") |
|
|
|
yield price_md, news_md, summary_md, sentiment_md, chart |
|
|
|
|
|
progress(3/6, desc="Extracting article content") |
|
extracted_content = [] |
|
try: |
|
if news_articles: |
|
print("Extracting web content from news URLs") |
|
|
|
urls = [article["link"] for article in news_articles] |
|
|
|
|
|
if not hasattr(smola_tools, 'extract_web_content'): |
|
print("extract_web_content not available, creating simulated content") |
|
extracted_content = [] |
|
for i, article in enumerate(news_articles[:3]): |
|
extracted_content.append({ |
|
"url": article["link"], |
|
"title": article["title"], |
|
"content": article["snippet"] * 5 |
|
}) |
|
else: |
|
|
|
extracted_content = smola_tools.extract_web_content(urls, max_urls=3) |
|
|
|
|
|
news_text += "\n\n*Extracted detailed content from top articles for deeper analysis*" |
|
news_md = gr.Markdown(news_text) |
|
print(f"Extracted content from {len(extracted_content)} articles") |
|
except Exception as e: |
|
print(f"Error extracting web content: {str(e)}") |
|
|
|
yield price_md, news_md, summary_md, sentiment_md, chart |
|
|
|
|
|
progress(4/6, desc="Summarizing news") |
|
news_summary = "No news available to summarize." |
|
try: |
|
if news_articles: |
|
print("Summarizing news") |
|
|
|
simple_summary = "This is a summary of recent market news: " |
|
for article in news_articles[:3]: |
|
simple_summary += article["snippet"] + " " |
|
|
|
try: |
|
|
|
news_summary = smola_tools.summarize_news(news_articles, extracted_content) |
|
print("News summary created using API") |
|
except Exception as e: |
|
print(f"Error using summarize_news API: {str(e)}, using simple summary") |
|
news_summary = simple_summary |
|
|
|
summary_md = gr.Markdown(f"## News Summary\n{news_summary}") |
|
else: |
|
summary_md = gr.Markdown("## News Summary\nNo news available to summarize.") |
|
except Exception as e: |
|
print(f"Error creating summary: {str(e)}") |
|
summary_md = gr.Markdown(f"## News Summary\nError creating summary: {str(e)}") |
|
|
|
yield price_md, news_md, summary_md, sentiment_md, chart |
|
|
|
|
|
progress(5/6, desc="Analyzing sentiment") |
|
sentiment = {"label": "UNKNOWN", "score": 0.0} |
|
try: |
|
if news_summary != "No news available to summarize.": |
|
print("Analyzing sentiment") |
|
try: |
|
sentiment = smola_tools.analyze_sentiment(news_summary) |
|
print(f"Sentiment: {sentiment['label']} (score: {sentiment['score']})") |
|
except Exception as e: |
|
print(f"Error in sentiment analysis API: {str(e)}") |
|
|
|
if "growth" in news_summary.lower() or "positive" in news_summary.lower(): |
|
sentiment = {"label": "POSITIVE", "score": 0.75} |
|
elif "decline" in news_summary.lower() or "negative" in news_summary.lower(): |
|
sentiment = {"label": "NEGATIVE", "score": 0.75} |
|
else: |
|
sentiment = {"label": "NEUTRAL", "score": 0.5} |
|
|
|
sentiment_md = gr.Markdown(f"## Sentiment Analysis\n**{sentiment['label']}** (confidence: {sentiment['score']:.2f})") |
|
else: |
|
sentiment_md = gr.Markdown("## Sentiment Analysis\nNo content available for analysis") |
|
except Exception as e: |
|
print(f"Error in sentiment analysis: {str(e)}") |
|
sentiment_md = gr.Markdown(f"## Sentiment Analysis\nError in analysis: {str(e)}") |
|
|
|
yield price_md, news_md, summary_md, sentiment_md, chart |
|
|
|
|
|
progress(6/6, desc="Creating chart") |
|
try: |
|
print(f"Creating chart for {ticker}") |
|
if ticker == "^NSEI": |
|
|
|
print("Creating Nifty chart directly") |
|
nifty = yf.Ticker("^NSEI") |
|
history = nifty.history(period="6mo") |
|
|
|
if not history.empty: |
|
plt.figure(figsize=(10, 5)) |
|
plt.plot(history.index, history["Close"], label="Close Price", marker='o') |
|
plt.xlabel("Date") |
|
plt.ylabel("Index Value") |
|
plt.title(f"Nifty 50 Index - 6mo") |
|
plt.legend() |
|
plt.grid() |
|
file_path = f"nifty50_chart.png" |
|
plt.savefig(file_path) |
|
plt.close() |
|
chart = file_path |
|
print(f"Nifty chart saved to {file_path}") |
|
else: |
|
print("No history data for Nifty") |
|
else: |
|
|
|
ticker_with_suffix = f"{ticker}.NS" if not ticker.endswith(('.NS', '.BO')) else ticker |
|
stock = yf.Ticker(ticker_with_suffix) |
|
history = stock.history(period="6mo") |
|
|
|
if not history.empty: |
|
plt.figure(figsize=(10, 5)) |
|
plt.plot(history.index, history["Close"], label="Close Price", marker='o') |
|
plt.xlabel("Date") |
|
plt.ylabel("Stock Price (₹)") |
|
plt.title(f"Historical Stock Trends for {ticker} (6mo)") |
|
plt.legend() |
|
plt.grid() |
|
file_path = f"{ticker.replace('.', '_')}_chart.png" |
|
plt.savefig(file_path) |
|
plt.close() |
|
chart = file_path |
|
print(f"Stock chart saved to {file_path}") |
|
else: |
|
|
|
print("No NSE data, trying BSE") |
|
bse_ticker = ticker_with_suffix.replace(".NS", ".BO") if ".NS" in ticker_with_suffix else f"{ticker}.BO" |
|
stock = yf.Ticker(bse_ticker) |
|
history = stock.history(period="6mo") |
|
|
|
if not history.empty: |
|
plt.figure(figsize=(10, 5)) |
|
plt.plot(history.index, history["Close"], label="Close Price", marker='o') |
|
plt.xlabel("Date") |
|
plt.ylabel("Stock Price (₹)") |
|
plt.title(f"Historical Stock Trends for {ticker} (6mo)") |
|
plt.legend() |
|
plt.grid() |
|
file_path = f"{ticker.replace('.', '_')}_chart.png" |
|
plt.savefig(file_path) |
|
plt.close() |
|
chart = file_path |
|
print(f"BSE Stock chart saved to {file_path}") |
|
else: |
|
|
|
print("No BSE data either, falling back to Nifty") |
|
nifty = yf.Ticker("^NSEI") |
|
history = nifty.history(period="6mo") |
|
|
|
if not history.empty: |
|
plt.figure(figsize=(10, 5)) |
|
plt.plot(history.index, history["Close"], label="Close Price", marker='o') |
|
plt.xlabel("Date") |
|
plt.ylabel("Index Value") |
|
plt.title(f"Nifty 50 Index - 6mo (Fallback)") |
|
plt.legend() |
|
plt.grid() |
|
file_path = f"nifty50_fallback_chart.png" |
|
plt.savefig(file_path) |
|
plt.close() |
|
chart = file_path |
|
print(f"Fallback Nifty chart saved to {file_path}") |
|
except Exception as e: |
|
print(f"Error creating chart: {str(e)}") |
|
|
|
try: |
|
print("Attempting simple Nifty chart as final fallback") |
|
nifty = yf.Ticker("^NSEI") |
|
history = nifty.history(period="1mo") |
|
plt.figure(figsize=(8, 4)) |
|
plt.plot(history.index, history["Close"]) |
|
plt.title("Nifty 50 - Emergency Fallback") |
|
plt.grid(True) |
|
file_path = "emergency_nifty_chart.png" |
|
plt.savefig(file_path) |
|
plt.close() |
|
chart = file_path |
|
print(f"Emergency fallback chart saved") |
|
except: |
|
print("Even emergency fallback failed") |
|
|
|
|
|
progress(1.0) |
|
yield price_md, news_md, summary_md, sentiment_md, chart |
|
|
|
|
|
iface = gr.Interface( |
|
fn=stock_research, |
|
inputs=[ |
|
gr.Textbox(label="Search Query", placeholder="Enter search query like 'latest news on HDFC Bank'"), |
|
gr.Textbox(label="Stock Ticker (Optional)", placeholder="e.g., HDFCBANK, RELIANCE, TCS (leave empty for automatic detection)") |
|
], |
|
outputs=[ |
|
gr.Markdown(label="Current Price"), |
|
gr.Markdown(label="Search Results"), |
|
gr.Markdown(label="News Summary"), |
|
gr.Markdown(label="Sentiment Analysis"), |
|
gr.Image(label="Stock Chart", type="filepath") |
|
], |
|
title="Indian Stock Market Research Tool", |
|
description="Enter a stock-related search term. The system will automatically detect the ticker symbol or use Nifty 50 if no ticker is found. You can also manually specify a ticker if desired." |
|
) |
|
|
|
if __name__ == "__main__": |
|
|
|
iface.queue().launch() |