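"""Gradio app that fills in missing categories in a target Excel workbook.

Descriptions from the target sheet are embedded with a sentence-transformer model
and matched, via cosine similarity, against diagnostic statements from a source
workbook; the categories of the closest matches are written back into the target
file, which is then offered for download."""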
import gradio as gr
import openpyxl
import csv
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import tempfile
import os
# Load the sentence transformer model
model = SentenceTransformer('BAAI/bge-small-en-v1.5')
def filter_excel1(excel_path, min_row, max_row):
    try:
        excel = openpyxl.load_workbook(excel_path)
        sheet_0 = excel.worksheets[0]
        data = [["category", "diagnostic_statement"]]
        prev_category = ""
        for row in sheet_0.iter_rows(min_row=min_row, max_row=max_row):
            category = row[1].value
            diagnostic_statement = row[5].value
            if prev_category == "":
                prev_category = category
            # Carry the last seen category forward for rows whose category cell
            # is empty (e.g. merged cells in the source sheet).
            if not category:
                category = prev_category
            else:
                prev_category = category
            data.append([category, diagnostic_statement])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 1: {str(e)}")

def filter_excel2(excel_path, min_row, max_row, sheetname):
    try:
        excel = openpyxl.load_workbook(excel_path)
        sheet_0 = excel[sheetname]
        data = [["description", "category"]]
        for row in sheet_0.iter_rows(min_row=min_row, max_row=max_row):
            description = row[0].value
            category = row[6].value
            # Keep only valid string categories; anything else (numbers, blanks,
            # error values) is normalised to "#N/A" so it gets filled in later.
            if not isinstance(category, str) or category == "#N/A":
                category = "#N/A"
            if description:
                data.append([description, category])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 2: {str(e)}")

def get_embeddings(texts):
    return model.encode(texts)

def get_top_n_categories(query_embedding, statement_embeddings, categories, n=3):
    similarities = cosine_similarity([query_embedding], statement_embeddings)[0]
    # Indices of the n most similar statements, most similar first.
    top_indices = np.argsort(similarities)[-n:][::-1]
    return [categories[i] for i in top_indices]

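# Illustrative usage, kept as a comment so it does not run at app start-up; the
# statements and categories below are made-up placeholders:
#   stmt_embs = get_embeddings(["chest pain on exertion", "fracture of the left wrist"])
#   query_emb = get_embeddings(["patient reports chest tightness"])[0]
#   get_top_n_categories(query_emb, stmt_embs, ["Cardiac", "Orthopaedic"], n=2)
#   # -> ["Cardiac", "Orthopaedic"], ordered by decreasing cosine similarity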
def process_data(csv1_data, csv2_data):
    try:
        # Embed every diagnostic statement from the source workbook once.
        diagnostic_statements = [row[1] for row in csv1_data[1:]]
        statement_embeddings = get_embeddings(diagnostic_statements)
        categories = [row[0] for row in csv1_data[1:]]
        processed_descriptions = []
        processed_categories = []
        for row in csv2_data[1:]:
            description = row[0]
            # Reuse the result for descriptions that were already processed.
            if description in processed_descriptions:
                row[1] = processed_categories[processed_descriptions.index(description)]
                continue
            # Rows that already have a category are kept as-is.
            if row[1] != "#N/A":
                processed_categories.append(row[1])
                processed_descriptions.append(description)
                continue
            # Otherwise fill the row with the top-3 most similar categories.
            description_embedding = get_embeddings([description])[0]
            top_categories = get_top_n_categories(description_embedding, statement_embeddings, categories)
            row[1] = ', '.join(top_categories)
            processed_descriptions.append(description)
            processed_categories.append(', '.join(top_categories))
        return csv2_data
    except Exception as e:
        raise gr.Error(f"Error processing data: {str(e)}")

def update_excel(excel_path, processed_data, sheetname, min_row, max_row):
    try:
        excel = openpyxl.load_workbook(excel_path)
        # Write back to the same sheet and row range that filter_excel2 read from,
        # so the processed rows stay aligned with the original rows.
        sheet_0 = excel[sheetname]
        idx = 0
        for row in sheet_0.iter_rows(min_row=min_row, max_row=max_row):
            description = row[0]
            category = row[6]
            # Rows without a description were never added to the processed data.
            if not description.value:
                continue
            try:
                sheet_0.cell(row=category.row, column=category.col_idx, value=processed_data[idx][1])
                idx += 1
            except IndexError:
                print(f"Warning: Not enough processed data for row {category.row}")
        return excel
    except Exception as e:
        raise gr.Error(f"Error updating Excel: {str(e)}")

def process_files(excel1, excel2, min_row1, max_row1, min_row2, max_row2, sheetname):
    try:
        gr.Info("Starting processing...")
        # gr.Number inputs arrive as floats; openpyxl expects integer row indices.
        min_row1, max_row1 = int(min_row1), int(max_row1)
        min_row2, max_row2 = int(min_row2), int(max_row2)
        # Process Excel 1
        gr.Info("Processing Excel 1...")
        csv1_data = filter_excel1(excel1, min_row1, max_row1)
        # Process Excel 2
        gr.Info("Processing Excel 2...")
        csv2_data = filter_excel2(excel2, min_row2, max_row2, sheetname)
        # Run the similarity search and fill in the missing categories
        gr.Info("Running similarity search...")
        processed_data = process_data(csv1_data, csv2_data)
        # Update Excel 2 (skip the header row of the processed data)
        gr.Info("Updating Excel file...")
        updated_excel = update_excel(excel2, processed_data[1:], sheetname, min_row2, max_row2)
        # Save the updated Excel file
        gr.Info("Saving updated Excel file...")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
            updated_excel.save(tmp.name)
        gr.Info("Processing complete!")
        return tmp.name
    except gr.Error as e:
        # Re-raise Gradio errors to display them in the interface
        raise e
    except Exception as e:
        # Catch any other unexpected errors
        raise gr.Error(f"An unexpected error occurred: {str(e)}")

# Gradio interface
iface = gr.Interface(
    fn=process_files,
    inputs=[
        gr.File(label="Upload Source Excel (Excel 1)"),
        gr.File(label="Upload Excel to be Filled (Excel 2)"),
        gr.Number(label="Min Row for Excel 1", value=2),
        gr.Number(label="Max Row for Excel 1", value=1000),
        gr.Number(label="Min Row for Excel 2", value=2),
        gr.Number(label="Max Row for Excel 2", value=3009),
        gr.Textbox(label="Sheet Name for Excel 2")
    ],
    outputs=gr.File(label="Download Updated Excel"),
    title="Excel Processor",
    description="Upload two Excel files, specify row ranges, and download the processed Excel file."
)

iface.launch()
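# Optional: on large workbooks the embedding step can take long enough for requests
# to time out; enabling Gradio's request queue is one way to handle that (behaviour
# depends on the installed Gradio version):
#   iface.queue().launch()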