import os import streamlit as st from st_aggrid import AgGrid import pandas as pd from transformers import pipeline, T5ForConditionalGeneration, T5Tokenizer # Set the page layout for Streamlit st.set_page_config(layout="wide") # CSS styling style = ''' ''' st.markdown(style, unsafe_allow_html=True) st.markdown('

HertogAI Table Q&A using TAPAS and Model Language

', unsafe_allow_html=True) st.markdown('

This code is based on Jordan Skinner. I enhanced his work using Language Model T5

', unsafe_allow_html=True) st.markdown("

Pre-trained TAPAS model runs on max 64 rows and 32 columns data. Make sure the file data doesn't exceed these dimensions.

", unsafe_allow_html=True) # Initialize TAPAS pipeline tqa = pipeline(task="table-question-answering", model="google/tapas-large-finetuned-wtq", device="cpu") # Initialize T5 tokenizer and model for text generation t5_tokenizer = T5Tokenizer.from_pretrained("t5-small") t5_model = T5ForConditionalGeneration.from_pretrained("t5-small") # File uploader in the sidebar file_name = st.sidebar.file_uploader("Upload file:", type=['csv', 'xlsx']) # File processing and question answering if file_name is None: st.markdown('

Please click left side bar to upload an excel or csv file

', unsafe_allow_html=True) else: try: # Check file type and handle reading accordingly if file_name.name.endswith('.csv'): df = pd.read_csv(file_name, sep=';', encoding='ISO-8859-1') # Adjust encoding if needed elif file_name.name.endswith('.xlsx'): df = pd.read_excel(file_name, engine='openpyxl') # Use openpyxl to read .xlsx files else: st.error("Unsupported file type") df = None # Continue with further processing if df is loaded if df is not None: numeric_columns = df.select_dtypes(include=['object']).columns for col in numeric_columns: df[col] = pd.to_numeric(df[col], errors='ignore') st.write("Original Data:") st.write(df) # Create a copy for numerical operations df_numeric = df.copy() df = df.astype(str) # Display the first 5 rows of the dataframe in an editable grid grid_response = AgGrid( df.head(5), columns_auto_size_mode='FIT_CONTENTS', editable=True, height=300, width='100%', ) except Exception as e: st.error(f"Error reading file: {str(e)}") # User input for the question question = st.text_input('Type your question') # Process the answer using TAPAS and T5 with st.spinner(): if st.button('Answer'): try: # Get the raw answer from TAPAS raw_answer = tqa(table=df, query=question, truncation=True) st.markdown("

Raw Result From TAPAS:

", unsafe_allow_html=True) st.success(raw_answer) # Extract relevant information from the TAPAS result answer = raw_answer['answer'] aggregator = raw_answer.get('aggregator', '') coordinates = raw_answer.get('coordinates', []) cells = raw_answer.get('cells', []) # Construct a base sentence replacing 'SUM' with the query term base_sentence = f"The {question.lower()} of the selected data is {answer}." if coordinates and cells: rows_info = [f"Row {coordinate[0] + 1}, Column '{df.columns[coordinate[1]]}' with value {cell}" for coordinate, cell in zip(coordinates, cells)] rows_description = " and ".join(rows_info) base_sentence += f" This includes the following data: {rows_description}." # Generate a fluent response using the T5 model, rephrasing the base sentence input_text = f"Given the question: '{question}', generate a more human-readable response: {base_sentence}" # Tokenize the input and generate a fluent response using T5 inputs = t5_tokenizer.encode(input_text, return_tensors="pt", max_length=512, truncation=True) summary_ids = t5_model.generate(inputs, max_length=150, num_beams=4, early_stopping=True) # Decode the generated text generated_text = t5_tokenizer.decode(summary_ids[0], skip_special_tokens=True) # Display the final generated response st.markdown("

Final Generated Response with LLM:

", unsafe_allow_html=True) st.success(generated_text) except Exception as e: st.warning("Please retype your question and make sure to use the column name and cell value correctly.") try: # Get raw answer again from TAPAS raw_answer = tqa(table=df, query=question, truncation=True) # Display raw result for debugging purposes st.markdown("

Raw Result:

", unsafe_allow_html=True) st.success(raw_answer) # Processing the raw_answer processed_answer = raw_answer['answer'].replace(';', ' ') # Clean the answer text row_idx = raw_answer['coordinates'][0][0] # Row index from TAPAS col_idx = raw_answer['coordinates'][0][1] # Column index from TAPAS column_name = df.columns[col_idx] # Column name from the DataFrame row_data = df.iloc[row_idx].to_dict() # Row data corresponding to the row index # Handle different types of answers (e.g., 'SUM', 'MAX', 'MIN', 'AVG', etc.) if 'SUM' in processed_answer: summary_type = 'sum' numeric_value = df_numeric[column_name].sum() elif 'MAX' in processed_answer: summary_type = 'maximum' numeric_value = df_numeric[column_name].max() elif 'MIN' in processed_answer: summary_type = 'minimum' numeric_value = df_numeric[column_name].min() elif 'AVG' in processed_answer or 'AVERAGE' in processed_answer: summary_type = 'average' numeric_value = df_numeric[column_name].mean() elif 'COUNT' in processed_answer: summary_type = 'count' numeric_value = df_numeric[column_name].count() elif 'MEDIAN' in processed_answer: summary_type = 'median' numeric_value = df_numeric[column_name].median() elif 'STD' in processed_answer or 'STANDARD DEVIATION' in processed_answer: summary_type = 'std_dev' numeric_value = df_numeric[column_name].std() else: summary_type = 'value' numeric_value = processed_answer # In case of a general answer # Build a natural language response based on the aggregation type if summary_type == 'sum': natural_language_answer = f"The total {column_name} is {numeric_value}." elif summary_type == 'maximum': natural_language_answer = f"The highest {column_name} is {numeric_value}, recorded for '{row_data.get('Name', 'Unknown')}'." elif summary_type == 'minimum': natural_language_answer = f"The lowest {column_name} is {numeric_value}, recorded for '{row_data.get('Name', 'Unknown')}'." elif summary_type == 'average': natural_language_answer = f"The average {column_name} is {numeric_value}." elif summary_type == 'count': natural_language_answer = f"The number of entries in {column_name} is {numeric_value}." elif summary_type == 'median': natural_language_answer = f"The median {column_name} is {numeric_value}." elif summary_type == 'std_dev': natural_language_answer = f"The standard deviation of {column_name} is {numeric_value}." else: natural_language_answer = f"The {column_name} value is {numeric_value} for '{row_data.get('Name', 'Unknown')}'." # Display the final natural language answer st.markdown("

Analysis Results:

", unsafe_allow_html=True) st.success(f""" • Answer: {natural_language_answer} Data Location: • Row: {row_idx + 1} • Column: {column_name} Additional Context: • Full Row Data: {row_data} • Query Asked: "{question}" """) except Exception as e: st.warning("Please retype your question and make sure to use the column name and cell value correctly.") # Initialize TAPAS pipeline tqa = pipeline(task="table-question-answering", model="google/tapas-large-finetuned-wtq", device="cpu") # Initialize T5 tokenizer and model for text generation t5_tokenizer = T5Tokenizer.from_pretrained("t5-small") t5_model = T5ForConditionalGeneration.from_pretrained("t5-small") # File uploader in the sidebar file_name = st.sidebar.file_uploader("Upload file:", type=['csv', 'xlsx']) # File processing and question answering if file_name is None: st.markdown('

Please upload an excel or csv file

', unsafe_allow_html=True) else: try: # Check file type and handle reading accordingly if file_name.name.endswith('.csv'): df = pd.read_csv(file_name, sep=';', encoding='ISO-8859-1') # Adjust encoding if needed elif file_name.name.endswith('.xlsx'): df = pd.read_excel(file_name, engine='openpyxl') # Use openpyxl to read .xlsx files else: st.error("Unsupported file type") df = None # Continue with further processing if df is loaded if df is not None: numeric_columns = df.select_dtypes(include=['object']).columns for col in numeric_columns: df[col] = pd.to_numeric(df[col], errors='ignore') st.write("Original Data:") st.write(df) # Create a copy for numerical operations df_numeric = df.copy() df = df.astype(str) # Display the first 5 rows of the dataframe in an editable grid grid_response = AgGrid( df.head(5), columns_auto_size_mode='FIT_CONTENTS', editable=True, height=300, width='100%', ) except Exception as e: st.error(f"Error reading file: {str(e)}") # User input for the question question = st.text_input('Type your question') # Process the answer using TAPAS and T5 with st.spinner(): if st.button('Answer'): try: # Get the raw answer from TAPAS raw_answer = tqa(table=df, query=question, truncation=True) st.markdown("

Raw Result From TAPAS:

", unsafe_allow_html=True) st.success(raw_answer) # Extract relevant information from the TAPAS result answer = raw_answer['answer'] aggregator = raw_answer.get('aggregator', '') coordinates = raw_answer.get('coordinates', []) cells = raw_answer.get('cells', []) # Construct a base sentence replacing 'SUM' with the query term base_sentence = f"The {question.lower()} of the selected data is {answer}." if coordinates and cells: rows_info = [f"Row {coordinate[0] + 1}, Column '{df.columns[coordinate[1]]}' with value {cell}" for coordinate, cell in zip(coordinates, cells)] rows_description = " and ".join(rows_info) base_sentence += f" This includes the following data: {rows_description}." # Generate a fluent response using the T5 model, rephrasing the base sentence input_text = f"Given the question: '{question}', generate a more human-readable response: {base_sentence}" # Tokenize the input and generate a fluent response using T5 inputs = t5_tokenizer.encode(input_text, return_tensors="pt", max_length=512, truncation=True) summary_ids = t5_model.generate(inputs, max_length=150, num_beams=4, early_stopping=True) # Decode the generated text generated_text = t5_tokenizer.decode(summary_ids[0], skip_special_tokens=True) # Display the final generated response st.markdown("

Final Generated Response with LLM:

", unsafe_allow_html=True) st.success(generated_text) except Exception as e: st.warning("Please retype your question and make sure to use the column name and cell value correctly.")