Spaces:
Build error
Build error
import re | |
import openai | |
import pandas as pd | |
import pinecone | |
import spacy | |
import streamlit_scrollable_textbox as stx | |
import torch | |
from sentence_transformers import SentenceTransformer | |
from tqdm import tqdm | |
from transformers import ( | |
AutoModelForMaskedLM, | |
AutoModelForSeq2SeqLM, | |
AutoTokenizer, | |
pipeline, | |
) | |
import streamlit as st | |
def get_data(): | |
data = pd.read_csv("earnings_calls_cleaned_metadata.csv") | |
return data | |
# Initialize Spacy Model | |
def get_spacy_model(): | |
return spacy.load("en_core_web_sm") | |
# Initialize models from HuggingFace | |
def get_t5_model(): | |
return pipeline("summarization", model="t5-small", tokenizer="t5-small") | |
def get_flan_t5_model(): | |
return pipeline( | |
"summarization", | |
model="google/flan-t5-base", | |
tokenizer="google/flan-t5-base", | |
max_length=512, | |
# length_penalty = 0 | |
) | |
def get_mpnet_embedding_model(): | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
model = SentenceTransformer( | |
"sentence-transformers/all-mpnet-base-v2", device=device | |
) | |
model.max_seq_length = 512 | |
return model | |
def get_splade_sparse_embedding_model(): | |
model_sparse = "naver/splade-cocondenser-ensembledistil" | |
# check device | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
tokenizer = AutoTokenizer.from_pretrained(model_sparse) | |
model_sparse = AutoModelForMaskedLM.from_pretrained(model_sparse) | |
# move to gpu if available | |
model_sparse.to(device) | |
return model_sparse, tokenizer | |
def get_sgpt_embedding_model(): | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
model = SentenceTransformer( | |
"Muennighoff/SGPT-125M-weightedmean-nli-bitfit", device=device | |
) | |
model.max_seq_length = 512 | |
return model | |
def save_key(api_key): | |
return api_key | |
def create_dense_embeddings(query, model): | |
dense_emb = model.encode([query]).tolist() | |
return dense_emb | |
def create_sparse_embeddings(query, model, tokenizer): | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
inputs = tokenizer(query, return_tensors="pt").to(device) | |
with torch.no_grad(): | |
logits = model(**inputs).logits | |
inter = torch.log1p(torch.relu(logits[0])) | |
token_max = torch.max(inter, dim=0) # sum over input tokens | |
nz_tokens = torch.where(token_max.values > 0)[0] | |
nz_weights = token_max.values[nz_tokens] | |
order = torch.sort(nz_weights, descending=True) | |
nz_weights = nz_weights[order[1]] | |
nz_tokens = nz_tokens[order[1]] | |
return { | |
"indices": nz_tokens.cpu().numpy().tolist(), | |
"values": nz_weights.cpu().numpy().tolist(), | |
} | |
def hybrid_score_norm(dense, sparse, alpha: float): | |
"""Hybrid score using a convex combination | |
alpha * dense + (1 - alpha) * sparse | |
Args: | |
dense: Array of floats representing | |
sparse: a dict of `indices` and `values` | |
alpha: scale between 0 and 1 | |
""" | |
if alpha < 0 or alpha > 1: | |
raise ValueError("Alpha must be between 0 and 1") | |
hs = { | |
"indices": sparse["indices"], | |
"values": [v * (1 - alpha) for v in sparse["values"]], | |
} | |
return [v * alpha for v in dense], hs | |
def query_pinecone_sparse( | |
dense_vec, | |
sparse_vec, | |
top_k, | |
index, | |
year, | |
quarter, | |
ticker, | |
participant_type, | |
threshold=0.25, | |
): | |
if participant_type == "Company Speaker": | |
participant = "Answer" | |
else: | |
participant = "Question" | |
if year == "All": | |
if quarter == "All": | |
xc = index.query( | |
vector=dense_vec, | |
sparse_vector=sparse_vec, | |
top_k=top_k, | |
filter={ | |
"Year": { | |
"$in": [ | |
int("2020"), | |
int("2019"), | |
int("2018"), | |
int("2017"), | |
int("2016"), | |
] | |
}, | |
"Quarter": {"$in": ["Q1", "Q2", "Q3", "Q4"]}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
else: | |
xc = index.query( | |
vector=dense_vec, | |
sparse_vector=sparse_vec, | |
top_k=top_k, | |
filter={ | |
"Year": { | |
"$in": [ | |
int("2020"), | |
int("2019"), | |
int("2018"), | |
int("2017"), | |
int("2016"), | |
] | |
}, | |
"Quarter": {"$eq": quarter}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
else: | |
# search pinecone index for context passage with the answer | |
xc = index.query( | |
vector=dense_vec, | |
sparse_vector=sparse_vec, | |
top_k=top_k, | |
filter={ | |
"Year": int(year), | |
"Quarter": {"$eq": quarter}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
# filter the context passages based on the score threshold | |
filtered_matches = [] | |
for match in xc["matches"]: | |
if match["score"] >= threshold: | |
filtered_matches.append(match) | |
xc["matches"] = filtered_matches | |
return xc | |
def query_pinecone( | |
dense_vec, | |
top_k, | |
index, | |
year, | |
quarter, | |
ticker, | |
participant_type, | |
threshold=0.25, | |
): | |
if participant_type == "Company Speaker": | |
participant = "Answer" | |
else: | |
participant = "Question" | |
if year == "All": | |
if quarter == "All": | |
xc = index.query( | |
vector=dense_vec, | |
top_k=top_k, | |
filter={ | |
"Year": { | |
"$in": [ | |
int("2020"), | |
int("2019"), | |
int("2018"), | |
int("2017"), | |
int("2016"), | |
] | |
}, | |
"Quarter": {"$in": ["Q1", "Q2", "Q3", "Q4"]}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
else: | |
xc = index.query( | |
vector=dense_vec, | |
top_k=top_k, | |
filter={ | |
"Year": { | |
"$in": [ | |
int("2020"), | |
int("2019"), | |
int("2018"), | |
int("2017"), | |
int("2016"), | |
] | |
}, | |
"Quarter": {"$eq": quarter}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
else: | |
# search pinecone index for context passage with the answer | |
xc = index.query( | |
vector=dense_vec, | |
top_k=top_k, | |
filter={ | |
"Year": int(year), | |
"Quarter": {"$eq": quarter}, | |
"Ticker": {"$eq": ticker}, | |
"QA_Flag": {"$eq": participant}, | |
}, | |
include_metadata=True, | |
) | |
# filter the context passages based on the score threshold | |
filtered_matches = [] | |
for match in xc["matches"]: | |
if match["score"] >= threshold: | |
filtered_matches.append(match) | |
xc["matches"] = filtered_matches | |
return xc | |
def format_query(query_results): | |
# extract passage_text from Pinecone search result | |
context = [ | |
result["metadata"]["Text"] for result in query_results["matches"] | |
] | |
return context | |
def sentence_id_combine(data, query_results, lag=1): | |
# Extract sentence IDs from query results | |
ids = [ | |
result["metadata"]["Sentence_id"] | |
for result in query_results["matches"] | |
] | |
# Generate new IDs by adding a lag value to the original IDs | |
new_ids = [id + i for id in ids for i in range(-lag, lag + 1)] | |
# Remove duplicates and sort the new IDs | |
new_ids = sorted(set(new_ids)) | |
# Create a list of lookup IDs by grouping the new IDs in groups of lag*2+1 | |
lookup_ids = [ | |
new_ids[i : i + (lag * 2 + 1)] | |
for i in range(0, len(new_ids), lag * 2 + 1) | |
] | |
# Create a list of context sentences by joining the sentences corresponding to the lookup IDs | |
context_list = [ | |
" ".join( | |
data.loc[data["Sentence_id"].isin(lookup_id), "Text"].to_list() | |
) | |
for lookup_id in lookup_ids | |
] | |
return context_list | |
def text_lookup(data, sentence_ids): | |
context = ". ".join(data.iloc[sentence_ids].to_list()) | |
return context | |
def generate_gpt_prompt(query_text, context_list): | |
context = " ".join(context_list) | |
prompt = f"""Answer the question in 6 long detailed points as accurately as possible using the provided context. Include as many key details as possible. | |
Context: {context} | |
Question: {query_text} | |
Answer:""" | |
return prompt | |
def generate_gpt_prompt_2(query_text, context_list): | |
context = " ".join(context_list) | |
prompt = f""" | |
Context information is below: | |
--------------------- | |
{context} | |
--------------------- | |
Given the context information and prior knowledge, answer this question: | |
{query_text} | |
Try to include as many key details as possible and format the answer in points.""" | |
return prompt | |
def generate_flant5_prompt(query_text, context_list): | |
context = " \n".join(context_list) | |
prompt = f"""Given the context information and prior knowledge, answer this question: | |
{query_text} | |
Context information is below: | |
--------------------- | |
{context} | |
---------------------""" | |
return prompt | |
def get_context_list_prompt(prompt): | |
prompt_list = prompt.split("---------------------") | |
context = prompt_list[-2].strip() | |
context_list = context.split(" \n") | |
return context_list | |
def gpt_model(prompt): | |
response = openai.Completion.create( | |
model="text-davinci-003", | |
prompt=prompt, | |
temperature=0.1, | |
max_tokens=1024, | |
top_p=1.0, | |
frequency_penalty=0.5, | |
presence_penalty=1, | |
) | |
return response.choices[0].text | |
# Entity Extraction | |
def extract_quarter_year(string): | |
# Extract year from string | |
year_match = re.search(r"\d{4}", string) | |
if year_match: | |
year = year_match.group() | |
else: | |
return None, None | |
# Extract quarter from string | |
quarter_match = re.search(r"Q\d", string) | |
if quarter_match: | |
quarter = "Q" + quarter_match.group()[1] | |
else: | |
return None, None | |
return quarter, year | |
def extract_entities(query, model): | |
doc = model(query) | |
entities = {ent.label_: ent.text for ent in doc.ents} | |
if "ORG" in entities.keys(): | |
company = entities["ORG"].lower() | |
if "DATE" in entities.keys(): | |
quarter, year = extract_quarter_year(entities["DATE"]) | |
return company, quarter, year | |
else: | |
return company, None, None | |
else: | |
if "DATE" in entities.keys(): | |
quarter, year = extract_quarter_year(entities["DATE"]) | |
return None, quarter, year | |
else: | |
return None, None, None | |
def clean_entities(company, quarter, year): | |
company_ticker_map = { | |
"apple": "AAPL", | |
"amd": "AMD", | |
"amazon": "AMZN", | |
"cisco": "CSCO", | |
"google": "GOOGL", | |
"microsoft": "MSFT", | |
"nvidia": "NVDA", | |
"asml": "ASML", | |
"intel": "INTC", | |
"micron": "MU", | |
} | |
ticker_choice = [ | |
"AAPL", | |
"CSCO", | |
"MSFT", | |
"ASML", | |
"NVDA", | |
"GOOGL", | |
"MU", | |
"INTC", | |
"AMZN", | |
"AMD", | |
] | |
year_choice = ["2020", "2019", "2018", "2017", "2016", "All"] | |
quarter_choice = ["Q1", "Q2", "Q3", "Q4", "All"] | |
if company is not None: | |
if company in company_ticker_map.keys(): | |
ticker = company_ticker_map[company] | |
ticker_index = ticker_choice.index(ticker) | |
else: | |
ticker_index = 0 | |
else: | |
ticker_index = 0 | |
if quarter is not None: | |
if quarter in quarter_choice: | |
quarter_index = quarter_choice.index(quarter) | |
else: | |
quarter_index = len(quarter_choice) - 1 | |
else: | |
quarter_index = len(quarter_choice) - 1 | |
if year is not None: | |
if year in year_choice: | |
year_index = year_choice.index(year) | |
else: | |
year_index = len(year_choice) - 1 | |
else: | |
year_index = len(year_choice) - 1 | |
return ticker_index, quarter_index, year_index | |
# Transcript Retrieval | |
def retrieve_transcript(data, year, quarter, ticker): | |
if year == "All" or quarter == "All": | |
row = ( | |
data.loc[ | |
(data.Ticker == ticker), | |
["File_Name"], | |
] | |
.drop_duplicates() | |
.iloc[0, 0] | |
) | |
else: | |
row = ( | |
data.loc[ | |
(data.Year == int(year)) | |
& (data.Quarter == quarter) | |
& (data.Ticker == ticker), | |
["File_Name"], | |
] | |
.drop_duplicates() | |
.iloc[0, 0] | |
) | |
# convert row to a string and join values with "-" | |
# row_str = "-".join(row.astype(str)) + ".txt" | |
open_file = open( | |
f"Transcripts/{ticker}/{row}", | |
"r", | |
) | |
file_text = open_file.read() | |
return file_text | |