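# Spaces: Running
# (The line above appears to be page-header residue from where this file was
# copied; it is kept as a comment because it is not part of the program.)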
import json
import logging
import mimetypes
import os
import re
import tempfile
from typing import List, Optional, Tuple

import gradio as gr
import openai
import pandas as pd
import pdfplumber
import plotly.express as px
import requests
import validators
from bs4 import BeautifulSoup
from GoogleNews import GoogleNews
from langchain.chains import create_extraction_chain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain.chains.mapreduce import MapReduceChain
from langchain.chains.summarize import load_summarize_chain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import UnstructuredFileLoader
from langchain.document_loaders import WebBaseLoader
from langchain.prompts import PromptTemplate
from langchain.text_splitter import CharacterTextSplitter
from transformers import pipeline
class KeyValueExtractor:
    """Summarise news found for a keyword and derive key/value pairs and sentiment.

    ``main`` chains the steps together: search Google News, clean the result
    links, summarise every reachable article, merge the summaries into one
    refined summary and ask OpenAI for key/value pairs.  ``gradio_interface``
    exposes that pipeline, plus a sentiment bar chart, as a Gradio app.
    """

    # Matches the first http(s) URL embedded in a piece of text.
    _URL_PATTERN = re.compile(r'(https?://[^\s]+)')

    # Browser-like user agent sent with the reachability check.
    _REQUEST_HEADERS = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',}

    # Seconds to wait for a server before treating the URL as unreachable;
    # without a timeout a single unresponsive host would block the run forever.
    _REQUEST_TIMEOUT = 15

    # Separator placed between per-article summaries.  A blank line keeps the
    # summaries from running into each other and gives the character splitter
    # used later a boundary to split on.
    _SUMMARY_SEPARATOR = "\n\n"

    # First-pass prompt used when summarising a single article.
    _CONCISE_QUESTION_TEMPLATE = (
        "Write a concise summary of the following:\n"
        "{text}\n"
        "CONCISE SUMMARY:"
    )

    # First-pass prompt used when producing the final combined summary.
    _DETAILED_QUESTION_TEMPLATE = (
        "Write a detalied broad abractive summary of the following:\n"
        "{text}\n"
        "CONCISE SUMMARY:"
    )

    # Follow-up prompt shared by both summarisation passes.
    _REFINE_TEMPLATE = (
        "Your job is to produce a final summary\n"
        "We have provided an existing summary up to a certain point: {existing_answer}\n"
        "We have the opportunity to refine the existing summary"
        "(only if needed) with some more context below.\n"
        "------------\n"
        "{text}\n"
        "------------\n"
        "Given the new context, refine the original summary"
        "If the context isn't useful, return the original summary."
    )

    def __init__(self):
        """Store the name of the zero-shot model used for sentiment analysis."""
        self.model = "facebook/bart-large-mnli"
        # Created on first use by analyze_sentiment_for_graph and then reused.
        self._sentiment_pipeline = None

    def get_news(self, keyword):
        """Search Google News for ``keyword`` and return the result links.

        Parameters:
            keyword (str): Search term passed to GoogleNews.

        Returns:
            List[str]: The ``link`` field of every result, as strings.  Empty
            when the search produced no results.
        """
        googlenews = GoogleNews(lang='en', region='US', period='1d', encode='utf-8')
        googlenews.clear()
        googlenews.search(keyword)
        googlenews.get_page(2)
        news_result = googlenews.result(sort=True)
        news_data_df = pd.DataFrame.from_dict(news_result)
        # An empty result set yields a frame without a 'link' column.
        if 'link' not in news_data_df.columns:
            return []
        return news_data_df['link'].astype(str).tolist()

    def url_format(self, urls):
        """Extract the first http(s) URL from each entry of ``urls``.

        Parameters:
            urls (List[str]): Texts that may each contain a URL.

        Returns:
            List[str]: One extracted URL per entry that contained one, in the
            original order.  Entries without a URL are skipped with a warning.
        """
        tot_url_links = []
        for url_text in urls:
            match = self._URL_PATTERN.search(url_text)
            if match:
                tot_url_links.append(match.group(1))
            else:
                logging.warning("No URL found in the given text.")
        return tot_url_links

    def clear_error_ulr(self, urls):
        """Return only the URLs that are well formed and answer with HTTP 200.

        Parameters:
            urls (List[str]): Candidate URLs.

        Returns:
            List[str]: The URLs that passed validation and whose GET request
            returned status 200, in the original order.  URLs that are
            malformed, time out, fail to connect or return another status are
            dropped and logged.
        """
        reachable_urls = []
        for url in urls:
            if not validators.url(url):
                logging.warning("Skipping invalid URL: %s", url)
                continue
            try:
                response = requests.get(
                    url,
                    headers=self._REQUEST_HEADERS,
                    timeout=self._REQUEST_TIMEOUT,
                )
            except requests.RequestException as exc:
                logging.warning("Error fetching %s: %s", url, exc)
                continue
            if response.status_code != 200:
                logging.warning(
                    "Error fetching %s: status code %s", url, response.status_code
                )
                continue
            reachable_urls.append(url)
        return reachable_urls

    def _build_refine_chain(self, question_template):
        """Build a "refine" summarisation chain using ``question_template`` as first prompt."""
        return load_summarize_chain(
            llm=ChatOpenAI(temperature=0),
            chain_type="refine",
            question_prompt=PromptTemplate.from_template(question_template),
            refine_prompt=PromptTemplate.from_template(self._REFINE_TEMPLATE),
            return_intermediate_steps=True,
            input_key="input_documents",
            output_key="output_text",
        )

    def get_each_link_summary(self, urls):
        """Summarise the page behind every URL and combine the summaries.

        Parameters:
            urls (List[str]): URLs of the pages to summarise.

        Returns:
            str: The per-page summaries separated by a blank line, or an empty
            string when ``urls`` is empty or no page could be summarised.
        """
        if not urls:
            # Nothing to do; avoid creating an LLM client for no work.
            return ""

        # The chain does not depend on the URL, so build it once.
        chain = self._build_refine_chain(self._CONCISE_QUESTION_TEMPLATE)
        summaries = []
        for url in urls:
            try:
                docs = WebBaseLoader(url).load()
            except requests.RequestException as exc:
                # One unreachable page should not discard the other summaries.
                logging.warning("Skipping %s: could not load page (%s)", url, exc)
                continue
            split_docs = self.document_text_spilliter(docs)
            if not split_docs:
                logging.warning("Skipping %s: no text to summarise", url)
                continue
            result = chain({"input_documents": split_docs}, return_only_outputs=True)
            logging.info("Summary for %s: %s", url, result["output_text"])
            summaries.append(result["output_text"])
        return self._SUMMARY_SEPARATOR.join(summaries)

    def save_text_to_file(self, each_link_summary) -> str:
        """Write the combined summary text to ``extracted_text.txt``.

        Parameters:
            each_link_summary (str): Text to store.

        Returns:
            str: Path of the file that was written.

        Raises:
            IOError: If the file cannot be written.  The error is logged
            before it is re-raised.
        """
        file_path = "extracted_text.txt"
        try:
            # News text regularly contains non-ASCII characters, so do not
            # depend on the platform's default encoding.
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(each_link_summary)
        except IOError as e:
            logging.error(f"Error while saving text to file: {e}")
            raise
        return file_path

    def document_loader(self, file_path) -> List:
        """Load the saved text file with ``UnstructuredFileLoader``.

        Parameters:
            file_path (str): Path of the file to load.

        Returns:
            List: The documents returned by the loader.
        """
        loader = UnstructuredFileLoader(file_path, strategy="fast")
        return loader.load()

    def document_text_spilliter(self, docs) -> List:
        """Split documents into chunks of about 3000 tokens with 200 overlap.

        Parameters:
            docs (List): Documents as returned by a LangChain loader.

        Returns:
            List: The chunked documents.
        """
        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=3000, chunk_overlap=200
        )
        return text_splitter.split_documents(docs)

    def extract_key_value_pair(self, content) -> Optional[str]:
        """Ask the OpenAI completion API for key/value pairs found in ``content``.

        Parameters:
            content (str): Text to analyse.

        Returns:
            Optional[str]: The stripped completion text, or ``None`` when the
            request failed (the error is logged).
        """
        try:
            response = openai.Completion.create(
                engine="text-davinci-003",
                temperature=0,
                prompt=f"Get maximum count meaningfull key value pairs. content in backticks.```{content}```.",
                max_tokens=1000,
            )
            return response['choices'][0]['text'].strip()
        except Exception as e:
            # Best effort: the summary is still useful without the pairs.
            logging.error(f"Error while extracting key-value pairs: {e}")
            return None

    def refine_summary(self, split_docs) -> Tuple[str, Optional[str]]:
        """Produce the final summary and its key/value pairs.

        Parameters:
            split_docs (List): Document chunks to summarise.

        Returns:
            Tuple[str, Optional[str]]: The refined summary and the key/value
            pairs extracted from it (``None`` if that extraction failed).
        """
        chain = self._build_refine_chain(self._DETAILED_QUESTION_TEMPLATE)
        result = chain({"input_documents": split_docs}, return_only_outputs=True)
        key_value_pair = self.extract_key_value_pair(result["output_text"])
        return result["output_text"], key_value_pair

    def analyze_sentiment_for_graph(self, text):
        """Score ``text`` against Positive / Negative / Neutral.

        Parameters:
            text (str): Text to classify.

        Returns:
            dict: Maps each label to its score, in the order returned by the
            zero-shot classification pipeline.
        """
        # Loading the model is expensive; create the pipeline once per instance.
        if getattr(self, "_sentiment_pipeline", None) is None:
            self._sentiment_pipeline = pipeline(
                "zero-shot-classification", model=self.model
            )
        label = ["Positive", "Negative", "Neutral"]
        result = self._sentiment_pipeline(text, label)
        return dict(zip(result['labels'], result['scores']))

    def display_graph(self, text):
        """Build a horizontal bar chart of the sentiment scores of ``text``.

        Parameters:
            text (str): Text to classify.

        Returns:
            The Plotly figure with one bar per sentiment label.
        """
        sentiment_scores = self.analyze_sentiment_for_graph(text)
        labels = list(sentiment_scores.keys())
        scores = list(sentiment_scores.values())
        fig = px.bar(x=scores, y=labels, orientation='h', color=labels, color_discrete_map={"Negative": "red", "Positive": "green", "Neutral": "gray"})
        # Scores are fractions in [0, 1]; the '%' format type scales them to a
        # percentage instead of appending a bare percent sign to the fraction.
        fig.update_traces(texttemplate='%{x:.2%}', textposition='outside')
        fig.update_layout(title="Sentiment Analysis", width=800)
        return fig

    def main(self, keyword):
        """Run the whole pipeline for ``keyword``.

        Parameters:
            keyword (str): News search term.

        Returns:
            The ``(summary, key_value_pairs)`` tuple from ``refine_summary``.
        """
        urls = self.get_news(keyword)
        tot_urls = self.url_format(urls)
        clean_url = self.clear_error_ulr(tot_urls)
        each_link_summary = self.get_each_link_summary(clean_url)
        file_path = self.save_text_to_file(each_link_summary)
        docs = self.document_loader(file_path)
        split_docs = self.document_text_spilliter(docs)
        result = self.refine_summary(split_docs)
        return result

    def gradio_interface(self):
        """Build the Gradio app and launch it in debug mode.

        The first button runs ``main`` on the entered keyword and fills the
        summary and key/value boxes; the second button plots the sentiment of
        the text in the summary box.
        """
        with gr.Blocks(css="style.css", theme='karthikeyan-adople/hudsonhayes-gray') as app:
            gr.HTML("""<center class="darkblue" style='background-color:rgb(0,1,36); text-align:center;padding:25px;'><center><h1 class ="center">
<img src="file=logo.png" height="110px" width="280px"></h1></center>
<br><h1 style="color:#fff">summarizer</h1></center>""")
            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150, ):
                    input_news = gr.Textbox(label="NEWS")
            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    analyse = gr.Button("Analyse")
            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=0.50, min_width=150):
                    result_summary = gr.Textbox(label="Summary")
                with gr.Column(scale=0.50, min_width=150):
                    key_value_pair_result = gr.Textbox(label="Key Value Pair")
            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=0.70, min_width=0):
                    plot = gr.Plot(label="Customer", size=(500, 600))
            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    analyse_sentiment = gr.Button("Analyse")
            # Event handlers must be registered inside the Blocks context.
            analyse.click(self.main, input_news, [result_summary, key_value_pair_result])
            analyse_sentiment.click(self.display_graph, result_summary, [plot])
        app.launch(debug=True)
if __name__ == "__main__":
    # Script entry point: create the extractor and start its Gradio UI.
    KeyValueExtractor().gradio_interface()