"""Gradio app that fills missing categories in one Excel sheet.

Excel 1 supplies (category, diagnostic statement) reference pairs.  Excel 2
supplies descriptions whose category column may be missing ("#N/A").  Missing
categories are filled with the categories of the most similar diagnostic
statements, ranked by cosine similarity of sentence embeddings.
"""

import csv
import os
import tempfile

import gradio as gr
import numpy as np
import openpyxl
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Load the sentence transformer model
model = SentenceTransformer('BAAI/bge-small-en-v1.5')

# Marker used throughout for "this row has no usable category yet".
MISSING_CATEGORY = "#N/A"

# Sheet that update_excel historically wrote to; kept as the default so
# existing two-argument calls behave as before.
DEFAULT_TARGET_SHEET = "1Q2024"


def filter_excel1(excel_path, min_row, max_row):
    """Read (category, diagnostic statement) pairs from the first worksheet.

    The category is taken from the second column and the diagnostic statement
    from the sixth column.  An empty category cell inherits the most recent
    non-empty category above it (forward fill); rows before the first
    non-empty category get ``None``.

    Args:
        excel_path: Path (or file-like object) accepted by
            ``openpyxl.load_workbook``.
        min_row: First worksheet row to read (1-based, inclusive).
        max_row: Last worksheet row to read (1-based, inclusive).

    Returns:
        A list of two-element lists whose first entry is the header
        ``["category", "diagnostic_statement"]``.

    Raises:
        gr.Error: If the workbook cannot be read or a row is too short.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook.worksheets[0]

        data = [["category", "diagnostic_statement"]]
        last_category = None
        for row in sheet.iter_rows(min_row=min_row, max_row=max_row):
            category = row[1].value
            if category:
                last_category = category
            else:
                # Blank cell: reuse the category from the rows above.
                category = last_category
            data.append([category, row[5].value])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 1: {str(e)}") from e


def filter_excel2(excel_path, min_row, max_row, sheetname):
    """Read (description, category) pairs from the named worksheet.

    The description is taken from the first column and the category from the
    seventh column.  Any category that is not a string is normalised to
    ``"#N/A"``.  Rows with an empty description are skipped.

    Args:
        excel_path: Path (or file-like object) accepted by
            ``openpyxl.load_workbook``.
        min_row: First worksheet row to read (1-based, inclusive).
        max_row: Last worksheet row to read (1-based, inclusive).
        sheetname: Name of the worksheet to read.

    Returns:
        A list of two-element lists whose first entry is the header
        ``["description", "category"]``.

    Raises:
        gr.Error: If the workbook/sheet cannot be read or a row is too short.
    """
    try:
        # NOTE(review): the workbook is loaded without data_only=True, so a
        # formula cell yields its formula text rather than its cached result.
        # Confirm the category column holds plain values.
        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook[sheetname]

        data = [["description", "category"]]
        for row in sheet.iter_rows(min_row=min_row, max_row=max_row):
            description = row[0].value
            category = row[6].value
            # Only string categories are trusted; numbers, None, etc. are
            # treated as missing.
            if not isinstance(category, str):
                category = MISSING_CATEGORY
            if description:
                data.append([description, category])
        return data
    except Exception as e:
        raise gr.Error(f"Error processing Excel 2: {str(e)}") from e


def get_embeddings(texts):
    """Return ``model.encode(texts)`` for the module-level model."""
    return model.encode(texts)


def get_top_n_categories(query_embedding, statement_embeddings, categories, n=3):
    """Return the categories of the ``n`` statements most similar to a query.

    Args:
        query_embedding: Embedding vector of the query text.
        statement_embeddings: Embeddings of the reference statements, one per
            entry in ``categories``.
        categories: Category for each reference statement.
        n: Number of categories to return.  Fewer are returned when there are
            fewer than ``n`` statements; ``n <= 0`` returns an empty list.

    Returns:
        Categories ordered from most to least similar.  Duplicates are kept.
    """
    if n <= 0:
        # A slice of [-0:] would select everything, so handle this explicitly.
        return []
    similarities = cosine_similarity([query_embedding], statement_embeddings)[0]
    top_indices = np.argsort(similarities)[-n:][::-1]
    return [categories[i] for i in top_indices]


def process_data(csv1_data, csv2_data):
    """Fill missing categories in ``csv2_data`` using ``csv1_data`` as reference.

    Every distinct description ends up with a single category: the first
    category seen for it if that is not ``"#N/A"``, otherwise a
    comma-separated list of the top matching reference categories.  Later
    rows with the same description receive that same category.

    Args:
        csv1_data: Output of ``filter_excel1`` (header row first).
        csv2_data: Output of ``filter_excel2`` (header row first).  It is
            modified in place.

    Returns:
        ``csv2_data`` with the category column filled in.

    Raises:
        gr.Error: If categories are missing but no reference statements are
            available, or if embedding/similarity computation fails.
    """
    try:
        # Reference rows lacking a statement or a category cannot be embedded
        # or joined into a result string, so they are left out.
        reference_rows = [row for row in csv1_data[1:] if row[0] and row[1]]
        categories = [str(row[0]) for row in reference_rows]
        statements = [str(row[1]) for row in reference_rows]

        # First pass: decide, per distinct description, whether its category
        # is already known or has to be computed.
        resolved = {}
        pending = []
        for row in csv2_data[1:]:
            description, category = row[0], row[1]
            if description in resolved:
                continue
            if category != MISSING_CATEGORY:
                resolved[description] = category
            else:
                resolved[description] = None
                pending.append(description)

        if pending:
            if not statements:
                raise gr.Error(
                    "No diagnostic statements with a category were found in "
                    "Excel 1 for the given row range."
                )
            statement_embeddings = get_embeddings(statements)
            # Encode all outstanding descriptions in one batch rather than
            # one model call per row.
            description_embeddings = get_embeddings([str(d) for d in pending])
            for description, embedding in zip(pending, description_embeddings):
                top_categories = get_top_n_categories(
                    embedding, statement_embeddings, categories
                )
                resolved[description] = ', '.join(top_categories)

        # Second pass: write the resolved category back to every row.
        for row in csv2_data[1:]:
            row[1] = resolved[row[0]]
        return csv2_data
    except gr.Error:
        raise
    except Exception as e:
        raise gr.Error(f"Error processing data: {str(e)}") from e


def update_excel(excel_path, processed_data, sheetname=DEFAULT_TARGET_SHEET,
                 min_row=2, max_row=None):
    """Write processed categories back into the workbook's seventh column.

    Rows are matched to ``processed_data`` by position: each worksheet row in
    ``[min_row, max_row]`` with a non-empty first column consumes the next
    entry.  The sheet and row range must therefore match the ones used when
    the data was read.

    Args:
        excel_path: Path (or file-like object) accepted by
            ``openpyxl.load_workbook``.
        processed_data: ``[description, category]`` rows without the header.
        sheetname: Worksheet to update.
        min_row: First worksheet row to update (1-based, inclusive).
        max_row: Last worksheet row to update, or ``None`` for the last row
            of the sheet.

    Returns:
        The modified, not yet saved, workbook object.

    Raises:
        gr.Error: If the workbook/sheet cannot be read or a row is too short.
    """
    try:
        workbook = openpyxl.load_workbook(excel_path)
        sheet = workbook[sheetname]

        entries = iter(processed_data)
        for row in sheet.iter_rows(min_row=min_row, max_row=max_row):
            description_cell = row[0]
            category_cell = row[6]
            if not description_cell.value:
                continue
            entry = next(entries, None)
            if entry is None:
                # Every later row would hit the same condition, so stop here.
                print(f"Warning: Not enough processed data for row {category_cell.row}")
                break
            sheet.cell(
                row=category_cell.row,
                column=category_cell.col_idx,
                value=entry[1],
            )
        return workbook
    except Exception as e:
        raise gr.Error(f"Error updating Excel: {str(e)}") from e


def process_files(excel1, excel2, min_row1, max_row1, min_row2, max_row2, sheetname):
    """Run the full pipeline and return the path of the updated workbook.

    Args:
        excel1: Uploaded reference workbook (Excel 1).
        excel2: Uploaded workbook to be filled (Excel 2).
        min_row1: First row to read from Excel 1.
        max_row1: Last row to read from Excel 1.
        min_row2: First row to read from and write to Excel 2.
        max_row2: Last row to read from and write to Excel 2.
        sheetname: Worksheet of Excel 2 to read and update.

    Returns:
        Path of a temporary ``.xlsx`` file holding the updated Excel 2.

    Raises:
        gr.Error: On any failure, so the message is shown in the interface.
    """
    try:
        gr.Info("Starting processing...")

        # Number inputs may arrive as floats; openpyxl needs integer rows.
        min_row1, max_row1 = int(min_row1), int(max_row1)
        min_row2, max_row2 = int(min_row2), int(max_row2)

        # Process Excel 1
        gr.Info("Processing Excel 1...")
        csv1_data = filter_excel1(excel1, min_row1, max_row1)

        # Process Excel 2
        gr.Info("Processing Excel 2...")
        csv2_data = filter_excel2(excel2, min_row2, max_row2, sheetname)

        # Process data
        gr.Info("Running similarity search...")
        processed_data = process_data(csv1_data, csv2_data)

        # Update Excel 2 on the same sheet and row range it was read from so
        # the positional matching in update_excel stays aligned.
        gr.Info("Updating Excel file...")
        updated_excel = update_excel(
            excel2,
            processed_data[1:],
            sheetname=sheetname,
            min_row=min_row2,
            max_row=max_row2,
        )

        # Save the updated Excel file
        gr.Info("Saving updated Excel file...")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
            output_path = tmp.name
        # Save once the handle is closed; the file persists (delete=False).
        updated_excel.save(output_path)

        gr.Info("Processing complete!")
        return output_path
    except gr.Error:
        # Re-raise Gradio errors to display them in the interface
        raise
    except Exception as e:
        # Catch any other unexpected errors
        raise gr.Error(f"An unexpected error occurred: {str(e)}") from e


# Gradio interface
iface = gr.Interface(
    fn=process_files,
    inputs=[
        gr.File(label="Upload Source Excel (Excel 1)"),
        gr.File(label="Upload Excel to be Filled (Excel 2)"),
        # precision=0 makes the components deliver whole numbers.
        gr.Number(label="Min Row for Excel 1", value=2, precision=0),
        gr.Number(label="Max Row for Excel 1", value=1000, precision=0),
        gr.Number(label="Min Row for Excel 2", value=2, precision=0),
        gr.Number(label="Max Row for Excel 2", value=3009, precision=0),
        gr.Textbox(label="Sheet Name for Excel 2"),
    ],
    outputs=gr.File(label="Download Updated Excel"),
    title="Excel Processor",
    description="Upload two Excel files, specify row ranges, and download the processed Excel file.",
)

# Launched at module level, as in the original script.
iface.launch()