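# ---------------------------------------------------------------------------
# Previous version (CNN + MRI image pipeline), kept commented out for
# reference. The active, text-only implementation starts further below.
# ---------------------------------------------------------------------------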
# import requests
# import numpy as np
# import tensorflow as tf
# import tensorflow_hub as hub
# import gradio as gr
# from tensorflow import keras  # needed for keras.models.load_model below
# from PIL import Image
# # Load models
# #model_initial = keras.models.load_model(
# #    "models/initial_model.h5", custom_objects={'KerasLayer': hub.KerasLayer}
# #)
# #model_tumor = keras.models.load_model(
# #    "models/model_tumor.h5", custom_objects={'KerasLayer': hub.KerasLayer}
# #)
# #model_stroke = keras.models.load_model(
# #    "models/model_stroke.h5", custom_objects={'KerasLayer': hub.KerasLayer}
# #)
# #model_alzheimer = keras.models.load_model(
# #    "models/model_alzheimer.h5", custom_objects={'KerasLayer': hub.KerasLayer}
# #)
# # API key and user ID for on-demand
# api_key = 'KGSjxB1uptfSk8I8A7ciCuNT9Xa3qWC3'
# external_user_id = 'plugin-1717464304'
# # Step 1: Create a chat session with the API
# def create_chat_session():
#     create_session_url = 'https://api.on-demand.io/chat/v1/sessions'
#     create_session_headers = {
#         'apikey': api_key
#     }
#     create_session_body = {
#         "pluginIds": [],
#         "externalUserId": external_user_id
#     }
#     response = requests.post(create_session_url, headers=create_session_headers, json=create_session_body)
#     response_data = response.json()
#     session_id = response_data['data']['id']
#     return session_id
# # Step 2: Submit query to the API
# def submit_query(session_id, query):
#     submit_query_url = f'https://api.on-demand.io/chat/v1/sessions/{session_id}/query'
#     submit_query_headers = {
#         'apikey': api_key
#     }
#     submit_query_body = {
#         "endpointId": "predefined-openai-gpt4o",
#         "query": query,
#         "pluginIds": ["plugin-1712327325", "plugin-1713962163"],
#         "responseMode": "sync"
#     }
#     response = requests.post(submit_query_url, headers=submit_query_headers, json=submit_query_body)
#     return response.json()
# # Combined disease model (placeholder)
# class CombinedDiseaseModel(tf.keras.Model):
#     def __init__(self, model_initial, model_alzheimer, model_tumor, model_stroke):
#         super(CombinedDiseaseModel, self).__init__()
#         self.model_initial = model_initial
#         self.model_alzheimer = model_alzheimer
#         self.model_tumor = model_tumor
#         self.model_stroke = model_stroke
#         self.disease_labels = ["Alzheimer's", 'No Disease', 'Stroke', 'Tumor']
#         self.sub_models = {
#             "Alzheimer's": model_alzheimer,
#             'Tumor': model_tumor,
#             'Stroke': model_stroke
#         }
#
#     def call(self, inputs):
#         initial_probs = self.model_initial(inputs, training=False)
#         main_disease_idx = tf.argmax(initial_probs, axis=1)
#         main_disease = self.disease_labels[main_disease_idx[0].numpy()]
#         main_disease_prob = initial_probs[0, main_disease_idx[0]].numpy()
#         if main_disease == 'No Disease':
#             sub_category = "No Disease"
#             sub_category_prob = main_disease_prob
#         else:
#             sub_model = self.sub_models[main_disease]
#             sub_category_pred = sub_model(inputs, training=False)
#             sub_category = tf.argmax(sub_category_pred, axis=1).numpy()[0]
#             sub_category_prob = sub_category_pred[0, sub_category].numpy()
#             if main_disease == "Alzheimer's":
#                 sub_category_label = ['Very Mild', 'Mild', 'Moderate']
#             elif main_disease == 'Tumor':
#                 sub_category_label = ['Glioma', 'Meningioma', 'Pituitary']
#             elif main_disease == 'Stroke':
#                 sub_category_label = ['Ischemic', 'Hemorrhagic']
#             sub_category = sub_category_label[sub_category]
#         return f"The MRI image shows {main_disease} with a probability of {main_disease_prob*100:.2f}%.\n" \
#                f"The subcategory of {main_disease} is {sub_category} with a probability of {sub_category_prob*100:.2f}%."
# # Placeholder function to process images
# def process_image(image):
#     image = image.convert("RGB")  # ensure 3 channels before converting to an array
#     image = image.resize((256, 256))
#     image_array = np.array(image) / 255.0
#     image_array = np.expand_dims(image_array, axis=0)
#     # Prediction logic here
#     # predictions = cnn_model(image_array)
#     return "Mock prediction: Disease identified with a probability of 85%."
# # Function to handle patient info, query, and image processing
# def gradio_interface(patient_info, query_type, image):
#     if image is not None:
#         image_response = process_image(image)
#
#         # Call LLM with patient info and query
#         session_id = create_chat_session()
#         query = f"Patient Info: {patient_info}\nQuery Type: {query_type}"
#         llm_response = submit_query(session_id, query)
#
#         # Debug: print the full response to inspect it
#         print("LLM Response:", llm_response)
#
#         # Safely handle 'message' if it exists
#         message = llm_response.get('data', {}).get('message', 'No message returned from LLM')
#
#         # If the message is empty, print the complete response for inspection
#         if message == 'No message returned from LLM':
#             print("Full LLM Response Data:", llm_response)
#
#         response = f"Patient Info: {patient_info}\nQuery Type: {query_type}\n\n{image_response}\n\nLLM Response:\n{message}"
#         return response
#     else:
#         return "Please upload an image."
# # Gradio interface
# iface = gr.Interface(
#     fn=gradio_interface,
#     inputs=[
#         gr.Textbox(
#             label="Patient Information",
#             placeholder="Enter patient details here...",
#             lines=5,
#             max_lines=10
#         ),
#         gr.Textbox(
#             label="Query Type",
#             placeholder="Describe the type of diagnosis or information needed..."
#         ),
#         gr.Image(
#             type="pil",
#             label="Upload an MRI Image"
#         )
#     ],
#     outputs=gr.Textbox(label="Response", placeholder="The response will appear here..."),
#     title="Medical Diagnosis with MRI and LLM",
#     description="Upload MRI images and provide patient information for a combined CNN model and LLM analysis."
# )
#
# iface.launch()
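
# ---------------------------------------------------------------------------
# Active implementation: text-only LLM analysis via the on-demand.io chat API.
# The CNN + MRI pipeline above is retained for reference only.
# ---------------------------------------------------------------------------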
import requests
import gradio as gr
import logging
import json

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# API key and user ID for on-demand
api_key = 'KGSjxB1uptfSk8I8A7ciCuNT9Xa3qWC3'
external_user_id = 'plugin-1717464304'
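
# Hedged alternative: read the credentials from the environment (e.g. a Space
# secret) rather than hard-coding them. The variable names below are
# assumptions, not part of the original app; the hard-coded values above are
# kept as fallbacks.
import os
api_key = os.environ.get('ONDEMAND_API_KEY', api_key)
external_user_id = os.environ.get('ONDEMAND_EXTERNAL_USER_ID', external_user_id)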


def create_chat_session():
    """Create a chat session with the API and return its session id."""
    try:
        create_session_url = 'https://api.on-demand.io/chat/v1/sessions'
        create_session_headers = {
            'apikey': api_key,
            'Content-Type': 'application/json'
        }
        create_session_body = {
            "pluginIds": [],
            "externalUserId": external_user_id
        }
        response = requests.post(create_session_url, headers=create_session_headers,
                                 json=create_session_body, timeout=30)
        response.raise_for_status()
        return response.json()['data']['id']
    except requests.exceptions.RequestException as e:
        logger.error(f"Error creating chat session: {str(e)}")
        raise


def submit_query(session_id, query):
    """Submit a structured query to the session and return the raw JSON response."""
    try:
        submit_query_url = f'https://api.on-demand.io/chat/v1/sessions/{session_id}/query'
        submit_query_headers = {
            'apikey': api_key,
            'Content-Type': 'application/json'
        }
        structured_query = f"""
Based on the following patient information, provide a detailed medical analysis in JSON format:

{query}

Return only valid JSON with these fields:
- diagnosis_details
- probable_diagnoses (array)
- treatment_plans (array)
- lifestyle_modifications (array)
- medications (array of objects with name and dosage)
- additional_tests (array)
- precautions (array)
- follow_up (string)
"""
        submit_query_body = {
            "endpointId": "predefined-openai-gpt4o",
            "query": structured_query,
            "pluginIds": ["plugin-1712327325", "plugin-1713962163"],
            "responseMode": "sync"
        }
        response = requests.post(submit_query_url, headers=submit_query_headers,
                                 json=submit_query_body, timeout=60)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error submitting query: {str(e)}")
        raise
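
# The prompt above asks the model for a fixed set of JSON fields; a well-formed
# answer would look roughly like this (illustrative values only):
# {
#   "diagnosis_details": "...",
#   "probable_diagnoses": ["..."],
#   "treatment_plans": ["..."],
#   "lifestyle_modifications": ["..."],
#   "medications": [{"name": "...", "dosage": "..."}],
#   "additional_tests": ["..."],
#   "precautions": ["..."],
#   "follow_up": "..."
# }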


def extract_json_from_answer(answer):
    """Extract and clean JSON from the LLM response."""
    try:
        # First try to parse the answer directly
        return json.loads(answer)
    except json.JSONDecodeError:
        try:
            # If that fails, try to find an embedded JSON object and parse it
            start_idx = answer.find('{')
            end_idx = answer.rfind('}') + 1
            if start_idx != -1 and end_idx > start_idx:
                return json.loads(answer[start_idx:end_idx])
            raise ValueError("No JSON object found in response")
        except (json.JSONDecodeError, ValueError):
            logger.error("Failed to parse JSON from response")
            raise
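
# Example: the helper tolerates prose or Markdown fences around the object, e.g.
#   extract_json_from_answer('Sure:\n```json\n{"follow_up": "2 weeks"}\n```')
# returns {'follow_up': '2 weeks'}, since it parses from the first '{' to the
# last '}'.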


def gradio_interface(patient_info):
    try:
        session_id = create_chat_session()
        llm_response = submit_query(session_id, patient_info)

        if not llm_response or 'data' not in llm_response or 'answer' not in llm_response['data']:
            raise ValueError("Invalid response structure")

        # Extract and clean JSON from the response
        json_data = extract_json_from_answer(llm_response['data']['answer'])

        # Return a clean JSON string without extra formatting
        return json.dumps(json_data)
    except Exception as e:
        logger.error(f"Error in gradio_interface: {str(e)}")
        return json.dumps({"error": str(e)})
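
# Quick smoke test (sketch, assumes valid API credentials): call the handler
# directly without launching the UI.
# print(gradio_interface("55-year-old male, hypertension, chronic headaches"))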

# Gradio interface
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(
            label="Patient Information",
            placeholder="Enter patient details including: symptoms, medical history, current medications, age, gender, and any relevant test results...",
            lines=5,
            max_lines=10
        )
    ],
    outputs=gr.Textbox(
        label="Medical Analysis",
        placeholder="JSON analysis will appear here...",
        lines=15
    ),
    title="Medical Diagnosis Assistant",
    description="Enter detailed patient information to receive a structured medical analysis in JSON format."
)

if __name__ == "__main__":
    iface.launch()