import gradio as gr import openpyxl import csv from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity import numpy as np import tempfile import os import pandas as pd import re # Load the sentence transformer model model = SentenceTransformer('BAAI/bge-small-en-v1.5') def filter_excel1(excel_path, min_row, max_row): try: excel = openpyxl.load_workbook(excel_path) sheet_0 = excel.worksheets[0] data = [["category", "diagnostic_statement"]] prev_category = "" for row in sheet_0.iter_rows(min_row=min_row, max_row=max_row): category = row[1].value diagnostic_statement = row[5].value if prev_category == "": prev_category = category if not category: category = prev_category else: prev_category = category data.append([category, diagnostic_statement]) return data except Exception as e: raise gr.Error(f"Error processing Excel 1: {str(e)}") def filter_excel2(excel_path, min_row, max_row, sheetname): try: excel = openpyxl.load_workbook(excel_path) sheet_0 = excel[sheetname] data = [["description", "category"]] for row in sheet_0.iter_rows(min_row=min_row, max_row=max_row): description = row[0].value category = row[6].value # filtering out the categories if isinstance(category, str) and category!="#N/A": pass elif isinstance(category, int): category="#N/A" else: category="#N/A" if description: data.append([description, category]) return data except Exception as e: raise gr.Error(f"Error processing Excel 2: {str(e)}") def sheet_lookup(current_sheet_name, excel_file_path): # Read the Excel file xl = pd.ExcelFile(excel_file_path) # Determine the previous quarter sheet name match = re.match(r'(\d)Q(\d{4})', current_sheet_name) if match: quarter, year = map(int, match.groups()) prev_quarter = 4 if quarter == 1 else quarter - 1 prev_year = year - 1 if quarter == 1 else year prev_sheet_name = f"{prev_quarter}Q{prev_year}" else: raise ValueError("Invalid sheet name format") # Read the current sheet current_df = xl.parse(current_sheet_name) # Check if previous sheet exists if prev_sheet_name in xl.sheet_names: # Read the previous quarter sheet prev_df = xl.parse(prev_sheet_name) # Perform the lookup lookup_col = 'Monitoring Tool Instance ID-AU' current_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True) prev_df.drop_duplicates(subset=[lookup_col], keep='first', inplace=True) value_col = f"{prev_quarter}q CRI Profile Mapping" result_col = f"{quarter}q CRI Profile Mapping" # Create a dictionary for faster lookup lookup_dict = dict(zip(prev_df[lookup_col], prev_df[value_col])) # Perform the lookup and fill the result column current_df[result_col] = current_df[lookup_col].map(lookup_dict).fillna('#N/A') else: # If previous sheet doesn't exist, fill the result column with '#N/A' result_col = f"{quarter}q CRI Profile Mapping" current_df[result_col] = '#N/A' print(f"Warning: Previous sheet {prev_sheet_name} not found. Filling {result_col} with '#N/A'") # Save the results back to the Excel file with pd.ExcelWriter(excel_file_path, mode='a', if_sheet_exists='replace') as writer: current_df.to_excel(writer, sheet_name=current_sheet_name, index=False) print(f"Processing complete for sheet {current_sheet_name}") def get_embeddings(texts): return model.encode(texts) def get_top_n_categories(query_embedding, statement_embeddings, categories, n=3): similarities = cosine_similarity([query_embedding], statement_embeddings)[0] top_indices = np.argsort(similarities)[-n:][::-1] return [categories[i] for i in top_indices] def process_data(csv1_data, csv2_data): try: diagnostic_statements = [row[1] for row in csv1_data[1:]] statement_embeddings = get_embeddings(diagnostic_statements) categories = [row[0] for row in csv1_data[1:]] processed_descriptions = [] processed_categories = [] for row in csv2_data[1:]: description = row[0] if description in processed_descriptions: row[1] = processed_categories[processed_descriptions.index(description)] continue if row[1] != "#N/A": processed_categories.append(row[1]) processed_descriptions.append(description) continue description_embedding = get_embeddings([description])[0] top_categories = get_top_n_categories(description_embedding, statement_embeddings, categories) row[1] = ', '.join(top_categories) processed_descriptions.append(description) processed_categories.append(', '.join(top_categories)) return csv2_data except Exception as e: raise gr.Error(f"Error processing data: {str(e)}") def update_excel(excel_path, processed_data, sheetname): try: excel = openpyxl.load_workbook(excel_path) sheet_0 = excel[sheetname] idx = 0 for row in sheet_0.iter_rows(min_row=2): description = row[0] category = row[6] if not description.value: continue try: sheet_0.cell(row=category.row, column=category.col_idx, value=processed_data[idx][1]) idx += 1 except IndexError: print(f"Warning: Not enough processed data for row {category.row}") return excel except Exception as e: raise gr.Error(f"Error updating Excel: {str(e)}") def process_files(excel1, excel2, min_row1, max_row1, min_row2, max_row2, sheetname): try: gr.Info("Starting processing...") gr.Info("Doing lookup...") sheet_lookup(sheetname, excel2) # Process Excel 1 gr.Info("Processing Excel 1...") csv1_data = filter_excel1(excel1, min_row1, max_row1) # Process Excel 2 gr.Info("Processing Excel 2...") csv2_data = filter_excel2(excel2, min_row2, max_row2, sheetname) # Process data gr.Info("Running similarity search...") processed_data = process_data(csv1_data, csv2_data) # Update Excel 2 gr.Info("Updating Excel file...") updated_excel = update_excel(excel2, processed_data[1:], sheetname) # Save the updated Excel file gr.Info("Saving updated Excel file...") with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp: updated_excel.save(tmp.name) gr.Info("Processing complete!") return tmp.name except gr.Error as e: # Re-raise Gradio errors to display them in the interface raise e except Exception as e: # Catch any other unexpected errors raise gr.Error(f"An unexpected error occurred: {str(e)}") # Gradio interface iface = gr.Interface( fn=process_files, inputs=[ gr.File(label="Upload Source Excel (Excel 1)"), gr.File(label="Upload Excel to be Filled (Excel 2)"), gr.Number(label="Min Row for Excel 1", value=2), gr.Number(label="Max Row for Excel 1", value=1000), gr.Number(label="Min Row for Excel 2", value=2), gr.Number(label="Max Row for Excel 2", value=3009), gr.Textbox(label="Sheet Name for Excel 2") ], outputs=gr.File(label="Download Updated Excel"), title="Excel Processor", description="Upload two Excel files, specify row ranges, and download the processed Excel file." ) iface.launch()