# import gradio as gr # import numpy as np # import cv2 as cv # import requests # import time # import os # host = os.environ.get("host") # code = os.environ.get("code") # model_llm = os.environ.get("model") # content = os.environ.get("content") # state = os.environ.get("state") # system = os.environ.get("system") # auth = os.environ.get("auth") # data = None # model = None # image = None # prediction = None # labels = None # print('START') # np.set_printoptions(suppress=True) # data = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32) # with open("labels.txt", "r") as file: # labels = file.read().splitlines() # messages = [ # {"role": "system", "content": system} # ] # def classify(UserInput, Image, Textbox2, Textbox3): # if Textbox3 == code: # print("Image: ", Image) # if Image is not None: # output = [] # image_data = np.array(Image) # image_data = cv.resize(image_data, (224, 224)) # image_array = np.asarray(image_data) # normalized_image_array = (image_array.astype(np.float32) / 127.0) - 1 # data[0] = normalized_image_array # import tensorflow as tf # model = tf.keras.models.load_model('keras_model.h5') # prediction = model.predict(data) # max_label_index = None # max_prediction_value = -1 # print('Prediction') # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # messages.append({"role": "user", "content": UserInput}) # for i, label in enumerate(labels): # prediction_value = float(prediction[0][i]) # rounded_value = round(prediction_value, 2) # print(f'{label}: {rounded_value}') # if prediction_value > max_prediction_value: # max_label_index = i # max_prediction_value = prediction_value # if max_label_index is not None: # max_label = labels[max_label_index].split(' ', 1)[1] # max_rounded_prediction = round(max_prediction_value, 2) # print(f'Maximum Prediction: {max_label} with a value of {max_rounded_prediction}') # time.sleep(1) # if max_rounded_prediction > 0.5: # print("\nWays to dispose of this waste: " + max_label) # messages.append({"role": "user", "content": content + " " + max_label}) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # reply = response["choices"][0]["message"]["content"] # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Image", "type": max_label, "prediction_value": max_rounded_prediction, "content": reply}) # elif max_rounded_prediction < 0.5: # output.append({"Mode": "Image", "type": "Not predictable", "prediction_value": max_rounded_prediction, "content": "Seems like the prediction rate is too low due to that won't be able to predict the type of material. Try again with a cropped image or different one."}) # return output # else: # output = [] # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # for i in Textbox2_edited: # messages.append( # {"role": "user", "content": i} # ) # print("messages after appending:", messages) # time.sleep(1) # messages.append({"role": "user", "content": UserInput}) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # reply = response["choices"][0]["message"]["content"] # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Chat", "content": reply}) # return output # else: # return "Unauthorized" # user_inputs = [ # gr.Textbox(label="User Input", type="text"), # gr.Image(), # gr.Textbox(label="Textbox2", type="text"), # gr.Textbox(label="Textbox3", type="password") # ] # iface = gr.Interface( # fn=classify, # inputs=user_inputs, # outputs=gr.outputs.JSON(), # title="Classifier", # ) # iface.launch() # import gradio as gr # import numpy as np # import cv2 as cv # import requests # import time # import os # host = os.environ.get("host") # code = os.environ.get("code") # model_llm = os.environ.get("model") # content = os.environ.get("content") # state = os.environ.get("state") # system = os.environ.get("system") # auth = os.environ.get("auth") # auth2 = os.environ.get("auth2") # data = None # model = None # image = None # prediction = None # labels = None # print('START') # np.set_printoptions(suppress=True) # data = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32) # with open("labels.txt", "r") as file: # labels = file.read().splitlines() # messages = [ # {"role": "system", "content": system} # ] # def classify(platform,UserInput, Image, Textbox2, Textbox3): # if Textbox3 == code: # if Image is not None: # output = [] # headers = { # "Authorization": f"Bearer {auth2}" # } # if platform == "wh": # get_image = requests.get(Image, headers=headers) # print(get_image.content) # elif platform == "web": # print("WEB") # else: # pass # image_data = np.array(get_image) # image_data = cv.resize(image_data, (224, 224)) # image_array = np.asarray(image_data) # normalized_image_array = (image_array.astype(np.float32) / 127.0) - 1 # data[0] = normalized_image_array # import tensorflow as tf # model = tf.keras.models.load_model('keras_model.h5') # prediction = model.predict(data) # max_label_index = None # max_prediction_value = -1 # print('Prediction') # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # messages.append({"role": "user", "content": UserInput}) # for i, label in enumerate(labels): # prediction_value = float(prediction[0][i]) # rounded_value = round(prediction_value, 2) # print(f'{label}: {rounded_value}') # if prediction_value > max_prediction_value: # max_label_index = i # max_prediction_value = prediction_value # if max_label_index is not None: # max_label = labels[max_label_index].split(' ', 1)[1] # max_rounded_prediction = round(max_prediction_value, 2) # print(f'Maximum Prediction: {max_label} with a value of {max_rounded_prediction}') # time.sleep(1) # if max_rounded_prediction > 0.5: # print("\nWays to dispose of this waste: " + max_label) # messages.append({"role": "user", "content": content + " " + max_label}) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # reply = response["choices"][0]["message"]["content"] # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Image", "type": max_label, "prediction_value": max_rounded_prediction, "content": reply}) # elif max_rounded_prediction < 0.5: # output.append({"Mode": "Image", "type": "Not predictable", "prediction_value": max_rounded_prediction, "content": "Seems like the prediction rate is too low due to that won't be able to predict the type of material. Try again with a cropped image or different one."}) # return output # else: # output = [] # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # for i in Textbox2_edited: # messages.append( # {"role": "user", "content": i} # ) # print("messages after appending:", messages) # time.sleep(1) # messages.append({"role": "user", "content": UserInput}) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # reply = response["choices"][0]["message"]["content"] # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Chat", "content": reply}) # return output # else: # return "Unauthorized" # user_inputs = [ # gr.Textbox(label="Platform", type="text"), # gr.Textbox(label="User Input", type="text"), # gr.Textbox(label="Image", type="text"), # gr.Textbox(label="Textbox2", type="text"), # gr.Textbox(label="Textbox3", type="password") # ] # iface = gr.Interface( # fn=classify, # inputs=user_inputs, # outputs=gr.outputs.JSON(), # title="Classifier", # ) # iface.launch() ############################### MOST WORKING # import gradio as gr # import numpy as np # import cv2 as cv # import requests # import io # from PIL import Image # import os # import tensorflow as tf # import random # host = os.environ.get("host") # code = os.environ.get("code") # model_llm = os.environ.get("model") # content = os.environ.get("content") # state = os.environ.get("state") # system = os.environ.get("system") # auth = os.environ.get("auth") # auth2 = os.environ.get("auth2") # data = None # np.set_printoptions(suppress=True) # model = tf.keras.models.load_model('keras_model.h5') # data = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32) # with open("labels.txt", "r") as file: # labels = file.read().splitlines() # messages = [ # {"role": "system", "content": system} # ] # def classify(platform, UserInput, Images, Textbox2, Textbox3): # if Textbox3 == code: # imageData = None # if Images != "None": # output = [] # headers = { # "Authorization": f"Bearer {auth2}" # } # if platform == "wh": # get_image = requests.get(Images, headers=headers) # if get_image.status_code == 200: # image_data = get_image.content # elif platform == "web": # print("WEB") # else: # pass # image = cv.imdecode(np.frombuffer(image_data, np.uint8), cv.IMREAD_COLOR) # image = cv.resize(image, (224, 224)) # image_array = np.asarray(image) # normalized_image_array = (image_array.astype(np.float32) / 127.0) - 1 # data[0] = normalized_image_array # prediction = model.predict(data) # max_label_index = None # max_prediction_value = -1 # print('Prediction') # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # print(UserInput) # print("appending") # messages.append({"role": "user", "content": UserInput}) # for i, label in enumerate(labels): # prediction_value = float(prediction[0][i]) # rounded_value = round(prediction_value, 2) # print(f'{label}: {rounded_value}') # if prediction_value > max_prediction_value: # max_label_index = i # max_prediction_value = prediction_value # if max_label_index is not None: # max_label = labels[max_label_index].split(' ', 1)[1] # max_rounded_prediction = round(max_prediction_value, 2) # print(f'Maximum Prediction: {max_label} with a value of {max_rounded_prediction}') # if max_rounded_prediction > 0.5: # print("\nWays to dispose of this waste: " + max_label) # messages.append({"role": "user", "content": content + " " + max_label}) # # messages.append({"role": "user", "content": max_label}) # print("IMAGE messages after appending:", messages) # header = { # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # try: # response = requests.post(host, headers=header, json={ # "messages": messages, # "model": model_llm # }).json() # print("RESPONSE TRY",response) # reply = response["choices"][0]["message"]["content"] # # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Image", "type": max_label, "prediction_value": max_rounded_prediction, "content": reply}) # except: # print("DOESN'T WORK") # elif max_rounded_prediction < 0.5: # output.append({"Mode": "Image", "type": "Not predictable", "prediction_value": max_rounded_prediction, "content": "Seems like the prediction rate is too low due to that won't be able to predict the type of material. Try again with a cropped image or different one"}) # return output # elif Images == "None": # output = [] # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # for i in Textbox2_edited: # messages.append({"role": "user", "content": i}) # print("messages after appending:", messages) # messages.append({"role": "user", "content": UserInput}) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # reply = response["choices"][0]["message"]["content"] # # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Chat", "content": reply}) # return output # else: # return "Unauthorized" # user_inputs = [ # gr.Textbox(label="Platform", type="text"), # gr.Textbox(label="User Input", type="text"), # gr.Textbox(label="Image", type="text"), # gr.Textbox(label="Textbox2", type="text"), # gr.Textbox(label="Textbox3", type="password") # ] # iface = gr.Interface( # fn=classify, # inputs=user_inputs, # outputs=gr.outputs.JSON(), # title="Classifier", # ) # iface.launch() ############## WORKING AS OF THIS MONTH ############## # import gradio as gr # import numpy as np # import cv2 as cv # import requests # import io # import time # from PIL import Image # import os # import tensorflow as tf # import random # import openai # host = os.environ.get("host") # code = os.environ.get("code") # model_llm = os.environ.get("model") # content = os.environ.get("content") # state = os.environ.get("state") # system = os.environ.get("system") # auth = os.environ.get("auth") # auth2 = os.environ.get("auth2") # openai.api_key = os.environ.get("auth") # openai.api_base = os.environ.get("host") # data = None # np.set_printoptions(suppress=True) # model = tf.keras.models.load_model('keras_model.h5') # data = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32) # with open("labels.txt", "r") as file: # labels = file.read().splitlines() # messages = [ # {"role": "system", "content": system} # ] # def classify(platform, UserInput, Images, Textbox2, Textbox3): # if UserInput.lower() == "clear history": # messages.clear() # messages.append( # {"role": "system", "content": system} # ) # if Textbox3 == code: # imageData = None # if Images != "None": # output = [] # headers = { # "Authorization": f"Bearer {auth2}" # } # if platform == "wh": # get_image = requests.get(Images, headers=headers) # if get_image.status_code == 200: # image_data = get_image.content # elif platform == "web": # # print("WEB") # url = requests.get(Images) # image_data = url.content # else: # pass # image = cv.imdecode(np.frombuffer(image_data, np.uint8), cv.IMREAD_COLOR) # image = cv.resize(image, (224, 224)) # image_array = np.asarray(image) # normalized_image_array = (image_array.astype(np.float32) / 127.0) - 1 # data[0] = normalized_image_array # prediction = model.predict(data) # max_label_index = None # max_prediction_value = -1 # print('Prediction') # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # print(UserInput) # print("appending") # # messages.append({"role": "user", "content": UserInput}) # # Pop earlier messages if there are more than 10 # # if UserInput.lower() == "clear history": # # while len(messages) > 10: # # messages.pop(0) # for i, label in enumerate(labels): # prediction_value = float(prediction[0][i]) # rounded_value = round(prediction_value, 2) # print(f'{label}: {rounded_value}') # if prediction_value > max_prediction_value: # max_label_index = i # max_prediction_value = prediction_value # if max_label_index is not None: # max_label = labels[max_label_index].split(' ', 1)[1] # max_rounded_prediction = round(max_prediction_value, 2) # print(f'Maximum Prediction: {max_label} with a value of {max_rounded_prediction}') # if max_rounded_prediction > 0.5: # print("\nWays to dispose of this waste: " + max_label) # messages.append({"role": "user", "content": content + " " + max_label}) # print("IMAGE messages after appending:", messages) # print("Message list of image:", messages) # header = { # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # try: # # response = requests.post(host, headers=header, json={ # # "messages": messages, # # "model": model_llm # # }).json() # completion = openai.ChatCompletion.create( # model="gpt-3.5-turbo", # messages=messages # ) # # reply = response["choices"][0]["message"]["content"] # reply = completion.choices[0].message['content'] # # # reply = response["choices"][0]["message"]["content"] # # reply = response.choices[0].message['content'] # print("RESPONSE TRY", completion) # output.append({"Mode": "Image", "type": max_label, "prediction_value": max_rounded_prediction, "content": reply}) # except: # print("DOESN'T WORK") # elif max_rounded_prediction < 0.5: # output.append({"Mode": "Image", "type": "Not predictable", "prediction_value": max_rounded_prediction, "content": "Seems like the prediction rate is too low due to that won't be able to predict the type of material. Try again with a cropped image or different one"}) # return output # elif Images == "None": # output = [] # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # for i in Textbox2_edited: # messages.append({"role": "user", "content": i}) # print("messages after appending:", messages) # messages.append({"role": "user", "content": UserInput}) # # Pop earlier messages if there are more than 10 # # if UserInput.lower() == "clear history": # # while len(messages) > 10: # # messages.pop(0) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # try: # # response = requests.post(host, headers=headers, json={ # # "messages": messages, # # "model": model_llm # # }).json() # completion = openai.ChatCompletion.create( # model="gpt-3.5-turbo", # messages=messages # ) # # reply = response["choices"][0]["message"]["content"] # reply = completion.choices[0].message['content'] # print("RESPONSE TRY (NO IMAGE)", completion, reply) # except: # reply = "Maximum messages: 15. Please clear your history and Try Again! (No Image)" # output.append({"Mode": "Chat", "content": reply}) # return output # else: # return "Unauthorized" # user_inputs = [ # gr.Textbox(label="Platform", type="text"), # gr.Textbox(label="User Input", type="text"), # gr.Textbox(label="Image", type="text"), # gr.Textbox(label="Textbox2", type="text"), # gr.Textbox(label="Textbox3", type="password") # ] # iface = gr.Interface( # fn=classify, # inputs=user_inputs, # outputs=gr.outputs.JSON(), # title="Classifier", # ) # iface.launch() ############## NEW VERSION ############## import gradio as gr import numpy as np import cv2 as cv import requests import io import time from PIL import Image import base64 import os import tensorflow as tf import random import openai import json host = os.environ.get("host") host2 = os.environ.get("host2") code = os.environ.get("code") model_llm = os.environ.get("model") content = os.environ.get("content") state = os.environ.get("state") os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0' system = os.environ.get("system") auth = os.environ.get("auth") g_auth = os.environ.get("g_auth") auth2 = os.environ.get("auth2") auth4 = os.environ.get("auth4") openai.api_key = auth openai.api_base = host vis_url = os.environ.get("vis_url") vis_auth = os.environ.get("vis_auth") endpoint = os.environ.get("endpoint") data = None np.set_printoptions(suppress=True) model = tf.keras.models.load_model('keras_model.h5') with open("labels.txt", "r") as file: labels = file.read().splitlines() messages = [ {"role": "system", "content": system} ] messages.clear() def classify(platform, UserInput, Images, Textbox2, Textbox3): print("MODELLL:", model_llm) print(auth) messages.append( {"role": "system", "content": system} ) print(UserInput) if UserInput == "clear history": messages.clear() messages.append( {"role": "system", "content": system} ) if Textbox3 == code: imageData = None global image_url if Images != "None": output = [] headers = { "Authorization": f"Bearer {auth2}" } if platform == "wh": get_image = requests.get(Images, headers=headers) if get_image.status_code == 200: image_data = get_image.content folder_name = "upload" file_name = "saved_image.jpg" os.makedirs(folder_name, exist_ok=True) file_path = os.path.join(folder_name, file_name) with open(file_path, 'wb') as file: file.write(image_data) print(f"Image saved successfully to {file_path}") elif platform == "web": # print("WEB") url = requests.get(Images) image_data = url.content else: pass def encode_image(image: Image.Image, format: str = "PNG") -> str: with io.BytesIO() as buffer: image.save(buffer, format=format) encoded_image = buffer.getvalue().decode("latin-1") return encoded_image def url_to_base64(image_url): try: response = requests.get(image_url) response.raise_for_status() base64_data = base64.b64encode(response.content).decode('utf-8') return base64_data except Exception as e: print(f"Error: {e}") return None def vision(caption): print("VISSS") # with open("image.png", "wb") as file1_write: # file1_write.write(image_data) # Example usage # image_url = 'https://example.com/path/to/image.jpg' # base64_data = url_to_base64(image_url) # image = Image.open(io.BytesIO(file_content)) # base64_image_str = encode_image(image) if image_data: try: # Open the image directly from the image data # image = Image.open(io.BytesIO(image_data)) # base64_image_str = encode_image(image) # payload = { # "content": [ # { # "prompt": "What's this image about? or What does this image contains?", # "image": base64_image_str, # } # ], # "token": vis_auth, # } # url = vis_url # headers = {"Content-Type": "application/json"} # response = requests.post(url, headers=headers, data=json.dumps(payload)) # results = response.json() # results = results["result"] # answer_index = results.find("Answer:") # if answer_index != -1: # try: # result_text = results[answer_index + len("Answer:"):].strip() # print(result_text) # return result_text # except: # pass # else: # return "Answer: not found in the string." payload = json.dumps( { "messages": [{ "role": "user", "content": [ { "type": "text", "text": caption }, { "type": "image_url", "image_url": image_url } ] }], "model": "gemini-pro-vision" } ) headers = { 'Authorization': f'Bearer {auth2}', 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Content-Type': 'application/json' } print("IMAGGGGEEE;",Images) # response = requests.request("POST", "https://api.openai.com/v1/chat/completions", headers=headers, data=payload) # print("GGGGG:",response.content) response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages ) # response = response.json() # res = response["choices"][0]["message"]["content"] return response.content except: return "ERRRRRRR" else: print("Error: Image data is not available.") return None if UserInput is not None: caption = UserInput.lower() return vision(caption) else: caption = None image = cv.imdecode(np.frombuffer(image_data, np.uint8), cv.IMREAD_COLOR) image = cv.resize(image, (224, 224)) image_array = np.asarray(image) normalized_image_array = (image_array.astype(np.float32) / 127.0) - 1 data = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32) data[0] = normalized_image_array prediction = model.predict(data) max_label_index = None max_prediction_value = -1 print('Prediction') Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") Textbox2 = Textbox2.split(",") Textbox2_edited = [x.strip() for x in Textbox2] Textbox2_edited = list(Textbox2_edited) Textbox2_edited.append(UserInput) print(UserInput) print("appending") # messages.append({"role": "user", "content": UserInput}) # Pop earlier messages if there are more than 10 # if UserInput.lower() == "clear history": # while len(messages) > 10: # messages.pop(0) for i, label in enumerate(labels): prediction_value = float(prediction[0][i]) rounded_value = round(prediction_value, 2) print(f'{label}: {rounded_value}') if prediction_value > max_prediction_value: max_label_index = i max_prediction_value = prediction_value if max_label_index is not None: max_label = labels[max_label_index].split(' ', 1)[1] max_rounded_prediction = round(max_prediction_value, 2) print(f'Maximum Prediction: {max_label} with a value of {max_rounded_prediction}') if max_rounded_prediction > 0.5: print("\nWays to dispose of this waste: " + max_label) if caption == None: messages.append({"role": "user", "content": content + " " + max_label}) else: messages.append({"role": "user", "content": caption}) print("IMAGE messages after appending:", messages) print("Message list of image:", messages) header = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" } try: # response = requests.post(host, headers=header, json={ # "messages": messages, # "model": model_llm # }).json() completion = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages ) # reply = response["choices"][0]["message"]["content"] reply = completion.choices[0].message['content'] # # reply = response["choices"][0]["message"]["content"] # reply = response.choices[0].message['content'] print("RESPONSE TRY", completion) if caption == None: output.append({"Mode": "Image", "type": max_label, "prediction_value": max_rounded_prediction, "content": reply}) else: output.append({"Mode": "Image with Caption", "content": reply}) except: print("DOESN'T WORK") elif max_rounded_prediction < 0.5: if caption == None: output.append({"Mode": "Image", "type": "Not predictable", "prediction_value": max_rounded_prediction, "content": "Seems like the prediction rate is too low due to that won't be able to predict the type of material. Try again with a cropped image or different one"}) else: pass return output elif Images == "None": output = [] Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") Textbox2 = Textbox2.split(",") Textbox2_edited = [x.strip() for x in Textbox2] Textbox2_edited = list(Textbox2_edited) Textbox2_edited.append(UserInput) for i in Textbox2_edited: messages.append({"role": "user", "content": i}) print("messages after appending:", messages) messages.append({"role": "user", "content": UserInput}) # Pop earlier messages if there are more than 10 # if UserInput.lower() == "clear history": # while len(messages) > 10: # messages.pop(0) headers = { "Content-Type": "application/json", "Authorization": f"Bearer {auth}" } try: # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # def get_current_weather(location, unit): # params = { # 'appid': '334f89b7998e8df818503b0f33085621', # 'q': location, # 'units': unit # } # response = requests.get('https://api.openweathermap.org/data/2.5/weather', params=params) # if response.status_code == 200: # return response.json() # else: # return None # @openai_func # def testing_this(name: str, number: str): # """ # This function ask user for their name and a random integer and returns it. # @param name: The name of the user # @param number: Random number entered by the user # """ # if name is None: # return "Your name must be provided, inorder to run the function call" # elif number is None: # return "A random number must be provided, inorder to run the function call" # elif name is None and number is None : # return "You must provide your name and a random number to continue." # else: # return "Function call successfull" def book_enquiry(full_name: str, contact_number: str, street_address: str, landmark: str, zip_code: str, emirate: str, reason: str): """ This function asks the user for their details and books an enquiry for waste disposal. @param full_name: The full name of the user @param contact_number: The contact number of the user @param street_address: The street address of the user @param landmark: The landmark near the user's location @param zip_code: The zip code of the user's location @param emirate: The emirate of the user's location @param reason: The reason for waste disposal enquiry """ if not all([full_name, contact_number, street_address, landmark, zip_code, emirate, reason]): return "You must provide your details to book an enquiry." else: enquiry = requests.post(endpoint, json={ "mode": "WhatsApp", "full_name": full_name, "contact_number": contact_number, "street_address": street_address, "landmark": landmark, "zip_code": zip_code, "emirate": emirate, "reason_for_visit": reason, }) if enquiry.status_code == 200: print(f"Booking enquiry for {full_name} with contact number {contact_number}.") return f"Booking enquiry for {full_name} with contact number {contact_number} successful!. We'll get back to you shortly!" else: print(f"Booking enquiry for {full_name} with contact number {contact_number}.") return f"Booking enquiry for {full_name} with contact number {contact_number} declined!. Please try again or Contact our support team" def clear_chat_history(command: str): """ This function allow the user to clear the chat history. @param command: This should be always clear chat history or something like that """ messages.clear() messages.append( {"role": "system", "content": system} ) return "Chat History cleared successfuly!" def get_total_points(name: str, email: str): """ This function allow the user to view their total points they got by Waste Diposal @param name: This should the registered name of the user @param email: This should be the registered email of the user """ points = requests.post("https://sustainai.tech/user/points", json={ "name": name, "email": email }) print("NAME:", name, "|", "EMAIL:", email) print("RESPONSE FROM POINTS:", points.content) if points.status_code == 200: point_data = points.json() try: userPoints = point_data["points"] print("POINTSSSSS:", points) return f"User Found. Successful. Your Total points is: {userPoints} (if the point is 0 then say 'You got this or smth like that....Dipose more waste to earn more points.')'" except: return "Error Occured while retrieving user points. Please try again. " else: return "The request has been declined. Please ensure that the provided details match the records registered in our system during the registration process" # def get_current_weather(location: str, unit: str): # """ # This function get's the current weather in a given location. # @param location: The name of the user # @param unit: Random number entered by the user # """ # if location is None: # return "Your name must be provided, inorder to run the function call" # else: # params = { # 'appid': '334f89b7998e8df818503b0f33085621', # 'q': location, # 'units': unit # } # response = requests.get('https://api.openweathermap.org/data/2.5/weather', params=params) # print(response.status_code) # if response.status_code == 200: # temperature = weather_data.get('main', {}).get('temp', 'TEMP_FROM_THE_JSON') # forecast = [item.get('description', '') for item in weather_data.get('weather', [])] # new_json = { # "location": location, # "temperature": temperature, # "unit": unit, # "forecast": forecast # } # return new_json # else: # return None # functions = [ # { # "name": "get_current_weather", # "description": "Get the current weather in a given location", # "parameters": { # "type": "object", # "properties": { # "location": { # "type": "string", # "description": "The city and state, e.g. San Francisco, CA", # }, # "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, # }, # "required": ["location"], # }, # } # ] functions = [ { "name": "book_enquiry", "description": "Book an equiry with the user provided details to the support team", "parameters": { "type": "object", "properties": { "full_name": { "type": "string", "description": "The user's full_name provided by him/her, e.g. Alex Simon, Sarah Mary", }, "contact_number": { "type": "string", "description": "The user's contact number provided by him/her, e.g. 1234567890", }, "street_address": { "type": "string", "description": "The user's street address provided by him/her, e.g. Omando Street 123", }, "landmark": { "type": "string", "description": "The user's landmark provided by him/her near his location, e.g. Near 123 Hotel", }, "zip_code": { "type": "string", "description": "The user's zipcode provided by him/her based on the location, e.g. 367774, 0000", }, "emirate": { "type": "string", "description": "The user's emirate provided by him/her, e.g. Sharjah, Abudhabi", }, "reason": { "type": "string", "description": "The user's reason provided by him/her for the booking enquiry", }, }, "required": ["full_name", "contact_number", "street_address", "landmark", "zip_code", "emirate", "reason"], }, }, { "name": "clear_chat_history", "description": "This function allow the user to clear the chat history", "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "This should be always clear chat history or something like that", }, }, "required": ["command"], }, }, { "name": "get_total_points", "description": "This function allow the user to view their total points they got by Waste Diposal", "parameters": { "type": "object", "properties": { "name": { "type": "string", "description": "This should the registered name of the user", }, "email": { "type": "string", "description": "This should be the registered email of the user", }, }, "required": ["name", "email"], }, }, ] # functions = [ # { # "name": "testing_this", # "description": "This function ask user for their name and a random integer and returns it", # "parameters": { # "type": "object", # "properties": { # "name": { # "type": "string", # "description": "Name of the user", # }, # "number": {"type": "string", "description": "A random number"}, # }, # "required": ["name", "number"], # }, # } # ] completion = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, functions = functions, function_call = "auto", ) comp = completion.choices[0].message print("\nCOMPPP MESSAGE:", comp) # if comp.get("function_call"): # function_name = comp["function_call"]["name"] # # Convert JSON string to Python dictionary # data = json.loads(comp) # # Extract values # function_call = data["function_call"] # arguments_str = function_call["arguments"] # arguments_dict = json.loads(arguments_str) # function_response = testing_this( # name=arguments_dict["name"], # number=arguments_dict["number"] # ) # print("FUNCTION_RESPONSE:", function_response) # print("ARGUMENTS VALUES:", arguments_dict["number"], arguments_dict["number"]) # messages.append(comp) # messages.append({ # "role": "function", # "name": function_name, # "content": function_response # }) # second_response = openai.ChatCompletion.create( # model="gpt-3.5-turbo", # messages=messages # ) # print("YES_FUNCTION_CALL RESPONSE TRY (NO IMAGE)") # return second_response # else: # reply = comp['content'] # print("RESPONSE TRY (NO IMAGE, FUNCTION_CALL)") if comp.get("function_call"): function_name = comp["function_call"]["name"] arguments_str = comp["function_call"]["arguments"] arguments_dict = json.loads(arguments_str) if function_name == "clear_chat_history": function_response = clear_chat_history( command=arguments_dict["command"] ) reply = [{"Mode": "Chat", "content": function_response}] return reply elif function_name == "get_total_points": function_response = get_total_points( name=arguments_dict["name"], email=arguments_dict["email"], ) elif function_name == "book_enquiry": function_response = book_enquiry( full_name=arguments_dict["full_name"], contact_number=arguments_dict["contact_number"], street_address=arguments_dict["street_address"], landmark=arguments_dict["landmark"], zip_code=arguments_dict["zip_code"], emirate=arguments_dict["emirate"], reason=arguments_dict["reason"] ) print("FUNCTION_RESPONSE:", function_response) # print("ARGUMENTS VALUES:", arguments_dict["location"], arguments_dict["unit"]) if function_name == "clear_chat_history": print(f"ARGUMENTS VALUES: Command: {arguments_dict['command']}") elif function_name == "show_my_points": print(f"ARGUMENTS VALUES: Name: {arguments_dict['name']}, Email: {arguments_dict['email']}") elif function_name == "book_enquiry": print(f"ARGUMENTS VALUES: Full_name: {arguments_dict['full_name']}, Contact Number: {arguments_dict['contact_number']}, Street Address: {arguments_dict['street_address']}, Landmark: {arguments_dict['landmark']}, Zip Code: {arguments_dict['zip_code']}, Emirate: {arguments_dict['emirate']}, Reason: {arguments_dict['reason']}") # messages.append(comp) # messages.append({ # "role": "function", # "name": function_name, # "content": function_response # }) second_response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role":"user","content": UserInput}, comp, { "role": "function", "name": function_name, "content": function_response } ] ) print("YES_FUNCTION_CALL RESPONSE TRY (NO IMAGE)") if second_response is not None: reply = second_response json_res = json.dumps(reply, indent=2) data = json.loads(json_res) reply = data["choices"][0]["message"]["content"] print("REPLYYYYYYY OPENAI: ", reply) else: return "Error" else: reply = comp['content'] print("RESPONSE TRY (NO IMAGE, FUNCTION_CALL)") # reply = comp['content'] # print("RESPONSE TRY (NO IMAGE)", completion, reply) except Exception as e: print("Error",e) reply = "Our system is currently under high load. Please clear your history and Try Again!" output.append({"Mode": "Chat", "content": reply}) return output else: return "Unauthorized" user_inputs = [ gr.Textbox(label="Platform", type="text"), gr.Textbox(label="User Input", type="text"), gr.Textbox(label="Image", type="text"), gr.Textbox(label="Textbox2", type="text"), gr.Textbox(label="Textbox3", type="password") ] iface = gr.Interface( fn=classify, inputs=user_inputs, outputs=gr.outputs.JSON(), title="Classifier", ) iface.launch() # import gradio as gr # import numpy as np # import cv2 as cv # import requests # import random # import os # import tensorflow as tf # import base64 # host = os.environ.get("host") # code = os.environ.get("code") # model_llm = os.environ.get("model") # content = os.environ.get("content") # state = os.environ.get("state") # system = os.environ.get("system") # auth = os.environ.get("auth") # auth2 = os.environ.get("auth2") # data = None # np.set_printoptions(suppress=True) # # Load the model outside of the function # model = tf.keras.models.load_model('keras_model.h5') # # Load labels from a file # with open("labels.txt", "r") as file: # labels = file.read().splitlines() # messages = [{"role": "system", "content": system}] # def classify(platform, UserInput, Images, Textbox2, Textbox3): # if Textbox3 == code: # imageData = None # image_data_url = None # Initialize image_data_url # if Images is not None: # output = [] # headers = { # "Authorization": f"Bearer {auth2}" # } # if platform == "wh": # get_image = requests.get(Images, headers=headers) # if get_image.status_code == 200: # # Convert the image data to base64 # image_base64 = base64.b64encode(get_image.content).decode("utf-8") # # Create a data URL # image_data_url = f"data:image/png;base64,{image_base64}" # elif platform == "web": # print("WEB") # # Handle web case if needed # else: # pass # if image_data_url is not None: # # Load the image from image_data_url # image_data = base64.b64decode(image_base64) # nparr = np.frombuffer(image_data, np.uint8) # image = cv.imdecode(nparr, cv.IMREAD_COLOR) # image = cv.resize(image, (224, 224)) # image_array = np.asarray(image) # normalized_image_array = (image_array.astype(np.float32) / 127.0) - 1 # data[0] = normalized_image_array # prediction = model.predict(data) # max_label_index = None # max_prediction_value = -1 # print('Prediction') # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # messages.append({"role": "user", "content": UserInput}) # for i, label in enumerate(labels): # prediction_value = float(prediction[0][i]) # rounded_value = round(prediction_value, 2) # print(f'{label}: {rounded_value}') # if prediction_value > max_prediction_value: # max_label_index = i # max_prediction_value = prediction_value # if max_label_index is not None: # max_label = labels[max_label_index].split(' ', 1)[1] # max_rounded_prediction = round(max_prediction_value, 2) # print(f'Maximum Prediction: {max_label} with a value of {max_rounded_prediction}') # if max_rounded_prediction > 0.5: # print("\nWays to dispose of this waste: " + max_label) # messages.append({"role": "user", "content": content + " " + max_label}) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # reply = response["choices"][0]["message"]["content"] # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Image", "type": max_label, "prediction_value": max_rounded_prediction, "content": reply}) # elif max_rounded_prediction < 0.5: # output.append({"Mode": "Image", "type": "Not predictable", "prediction_value": max_rounded_prediction, "content": "Seems like the prediction rate is too low due to that won't be able to predict the type of material. Try again with a cropped image or different one"}) # output.append({"Mode": "Image", "type": "Data URL", "data_url": image_data_url}) # return output # else: # output = [] # Textbox2 = Textbox2.replace("[", "").replace("]", "").replace("'", "") # Textbox2 = Textbox2.split(",") # Textbox2_edited = [x.strip() for x in Textbox2] # Textbox2_edited = list(Textbox2_edited) # Textbox2_edited.append(UserInput) # for i in Textbox2_edited: # messages.append({"role": "user", "content": i}) # print("messages after appending:", messages) # messages.append({"role": "user", "content": UserInput}) # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {auth}" # } # response = requests.post(host, headers=headers, json={ # "messages": messages, # "model": model_llm # }).json() # reply = response["choices"][0]["message"]["content"] # messages.append({"role": "assistant", "content": reply}) # output.append({"Mode": "Chat", "content": reply}) # return output # else: # return "Unauthorized" # user_inputs = [ # gr.Textbox(label="Platform", type="text"), # gr.Textbox(label="User Input", type="text"), # gr.Textbox(label="Images", type="text"), # gr.Textbox(label="Textbox2", type="text"), # gr.Textbox(label="Textbox3", type="password") # ] # iface = gr.Interface( # fn=classify, # inputs=user_inputs, # outputs=gr.outputs.JSON(), # title="Classifier", # ) # iface.launch()