Spaces:
Sleeping
Sleeping
########################################################################## | |
# app.py - Pennwick PDF Chat | |
# | |
# HuggingFace Spaces application to anlayze uploaded PDF files | |
# with open-source models ( hkunlp/instructor-xl ) | |
# | |
# Mike Pastor February 17, 2024 | |
import streamlit as st | |
from streamlit.components.v1 import html | |
from dotenv import load_dotenv | |
from PyPDF2 import PdfReader | |
from PIL import Image | |
# Local file | |
from htmlTemplates import css, bot_template, user_template | |
# from langchain.embeddings import HuggingFaceInstructEmbeddings | |
from langchain_community.embeddings import HuggingFaceInstructEmbeddings | |
# from langchain.vectorstores import FAISS | |
from langchain_community.vectorstores import FAISS | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.memory import ConversationBufferMemory | |
from langchain.chains import ConversationalRetrievalChain | |
# from langchain.llms import HuggingFaceHub | |
from langchain_community.llms import HuggingFaceHub | |
################################################################################## | |
# Admin flags | |
DISPLAY_DIALOG_LINES=6 | |
SESSION_STARTED = False | |
################################################################################## | |
def extract_pdf_text(pdf_docs): | |
text = "" | |
for pdf in pdf_docs: | |
pdf_reader = PdfReader(pdf) | |
for page in pdf_reader.pages: | |
text += page.extract_text() | |
return text | |
################################################################################## | |
# Chunk size and overlap must not exceed the models capacity! | |
# | |
def extract_bitesize_pieces(text): | |
text_splitter = CharacterTextSplitter( | |
separator="\n", | |
chunk_size=800, # 1000 | |
chunk_overlap=200, | |
length_function=len | |
) | |
chunks = text_splitter.split_text(text) | |
return chunks | |
################################################################################## | |
def prepare_embedding_vectors(text_chunks): | |
st.write('Here in vector store....', unsafe_allow_html=True) | |
# embeddings = OpenAIEmbeddings() | |
# pip install InstructorEmbedding | |
# pip install sentence-transformers==2.2.2 | |
embeddings = HuggingFaceInstructEmbeddings(model_name="hkunlp/instructor-xl") | |
st.write('Here in vector store - got embeddings ', unsafe_allow_html=True) | |
# from InstructorEmbedding import INSTRUCTOR | |
# model = INSTRUCTOR('hkunlp/instructor-xl') | |
# sentence = "3D ActionSLAM: wearable person tracking in multi-floor environments" | |
# instruction = "Represent the Science title:" | |
# embeddings = model.encode([[instruction, sentence]]) | |
# embeddings = model.encode(text_chunks) | |
print('have Embeddings: ') | |
# text_chunks="this is a test" | |
# FAISS, Chroma and other vector databases | |
# | |
vectorstore = FAISS.from_texts(texts=text_chunks, embedding=embeddings) | |
st.write('FAISS succeeds: ') | |
return vectorstore | |
################################################################################## | |
def prepare_conversation(vectorstore): | |
# llm = ChatOpenAI() | |
# llm = HuggingFaceHub(repo_id="google/flan-t5-xxl", model_kwargs={"temperature":0.5, "max_length":512}) | |
# google/bigbird-roberta-base facebook/bart-large | |
llm = HuggingFaceHub(repo_id="google/flan-t5-xxl", model_kwargs={"temperature": 0.7, "max_length": 512}) | |
memory = ConversationBufferMemory( | |
memory_key='chat_history', return_messages=True) | |
conversation_chain = ConversationalRetrievalChain.from_llm( | |
llm=llm, | |
retriever=vectorstore.as_retriever(), | |
memory=memory, | |
) | |
return conversation_chain | |
################################################################################## | |
def process_user_question(user_question): | |
print('process_user_question called: \n') | |
# if not SESSION_STARTED: | |
# print('No Session') | |
# st.write( 'Please upload and analyze your PDF files first!') | |
# return | |
if user_question == None : | |
print('question is null') | |
return | |
if user_question == '' : | |
print('question is blank') | |
return | |
if st == None : | |
print('session is null') | |
return | |
if st.session_state == None : | |
print('session STATE is null') | |
return | |
print('question is: ', user_question) | |
print('\nsession is: ', st ) | |
# try: | |
# response = st.session_state.conversation({'question': user_question}) | |
# # response = st.session_state.conversation({'summarization': user_question}) | |
# st.session_state.chat_history = response['chat_history'] | |
# Exception: | |
# st.write( 'Please upload and analyze your PDF files first!') | |
# return | |
# st.empty() | |
# try: | |
# st.session_state.conversation({'question': "Summarize the document"}) | |
# # if "key" not in st.session_state: | |
# # st.write('Good') | |
# except: | |
# st.error("Please upload and analyze your PDF files first!") | |
# return | |
if st.session_state.conversation == None: | |
st.error("Please upload and analyze your PDF files first!") | |
return | |
response = st.session_state.conversation({'question': user_question}) | |
st.session_state.chat_history = response['chat_history'] | |
results_size = len(response['chat_history']) | |
results_string = "" | |
print('results_size is: ', results_size ) | |
for i, message in enumerate(st.session_state.chat_history): | |
# Scrolling does not display the last printed line, | |
# so only print the last 6 lines | |
# | |
print('results_size on msg: ', results_size, i, ( results_size - DISPLAY_DIALOG_LINES ) ) | |
if results_size > DISPLAY_DIALOG_LINES: | |
if i < ( results_size - DISPLAY_DIALOG_LINES ): | |
continue | |
if i % 2 == 0: | |
# st.write(user_template.replace( | |
# "{{MSG}}", message.content), unsafe_allow_html=True) | |
results_string += ( "<p>" + message.content + "</p>" ) | |
else: | |
# st.write(bot_template.replace( | |
# "{{MSG}}", message.content), unsafe_allow_html=True) | |
results_string += ( "<p>" + "-- " + message.content + "</p>" ) | |
html(results_string, height=300, scrolling=True) | |
################################################################################### | |
def main(): | |
print( 'Pennwick Starting up...\n') | |
# Load the environment variables - if any | |
load_dotenv() | |
################################################################################## | |
# st.set_page_config(page_title="Pennwick PDF Analyzer", page_icon=":books:") | |
# im = Image.open("robot_icon.ico") | |
# st.set_page_config(page_title="Pennwick PDF Analyzer", page_icon=im ) | |
# st.set_page_config(page_title="Pennwick PDF Analyzer") | |
# import base64 | |
# from PIL import Image | |
# # Open your image | |
# image = Image.open("robot_icon.ico") | |
# # Convert image to base64 string | |
# with open("robot_icon.ico", "rb") as f: | |
# encoded_string = base64.b64encode(f.read()).decode() | |
# # Set page config with base64 string | |
# st.set_page_config(page_title="Pennwick File Analyzer 2", page_icon=f"data:image/ico;base64,{encoded_string}") | |
st.set_page_config(page_title="Pennwick File Analyzer", page_icon="./robot_icon.ico") | |
print( 'prepared page...\n') | |
################### | |
st.write(css, unsafe_allow_html=True) | |
if "conversation" not in st.session_state: | |
st.session_state.conversation = None | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = None | |
# st.header("Pennwick File Analyzer :shark:") | |
# st.header("Pennwick File Analyzer 2") | |
st.image("robot_icon.png", width=96 ) | |
st.header(f"Pennwick File Analyzer") | |
user_question = None | |
user_question = st.text_input("Ask the Open Source - Flan-T5 Model a question about your uploaded documents:") | |
if user_question != None: | |
print( 'calling process question', user_question) | |
process_user_question(user_question) | |
# st.write( user_template, unsafe_allow_html=True) | |
# st.write(user_template.replace( "{{MSG}}", "Hello robot!"), unsafe_allow_html=True) | |
# st.write(bot_template.replace( "{{MSG}}", "Hello human!"), unsafe_allow_html=True) | |
with st.sidebar: | |
st.subheader("Which documents would you like to analyze?") | |
st.subheader("(no data is saved beyond the session)") | |
pdf_docs = st.file_uploader( | |
"Upload your PDF documents here and click on 'Analyze'", accept_multiple_files=True) | |
# Upon button press | |
if st.button("Analyze these files"): | |
with st.spinner("Processing..."): | |
################################################################# | |
# Track the overall time for file processing into Vectors | |
# # | |
from datetime import datetime | |
global_now = datetime.now() | |
global_current_time = global_now.strftime("%H:%M:%S") | |
st.write("Vectorizing Files - Current Time =", global_current_time) | |
# get pdf text | |
raw_text = extract_pdf_text(pdf_docs) | |
# st.write(raw_text) | |
# # get the text chunks | |
text_chunks = extract_bitesize_pieces(raw_text) | |
# st.write(text_chunks) | |
# # create vector store | |
vectorstore = prepare_embedding_vectors(text_chunks) | |
# # create conversation chain | |
st.session_state.conversation = prepare_conversation(vectorstore) | |
SESSION_STARTED = True | |
# Mission Complete! | |
global_later = datetime.now() | |
st.write("Files Vectorized - Total EXECUTION Time =", | |
(global_later - global_now), global_later) | |
if __name__ == '__main__': | |
main() | |