Spaces:
Running
Running
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
""" | |
""" | |
import argparse | |
import httpx | |
import importlib | |
import json | |
import logging | |
import os | |
import platform | |
import shutil | |
import time | |
from typing import List, Tuple | |
logging.basicConfig( | |
level=logging.INFO if platform.system() == "Windows" else logging.DEBUG, | |
format="%(asctime)s %(levelname)s %(message)s", | |
datefmt="%Y-%m-%d %H:%M:%S", | |
) | |
import gradio as gr | |
import openai | |
from openai import OpenAI | |
import project_settings as settings | |
from project_settings import project_path | |
logger = logging.getLogger(__name__) | |
def get_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"--examples_json_file", | |
default="examples.json", | |
type=str | |
) | |
parser.add_argument( | |
"--description_md_file", | |
default="description.md", | |
type=str | |
) | |
parser.add_argument( | |
"--openai_api_key", | |
default=settings.environment.get("openai_api_key", default=None, dtype=str), | |
type=str | |
) | |
args = parser.parse_args() | |
return args | |
def dynamic_import_function(package_name: str, function_name: str): | |
try: | |
lib = importlib.import_module("functions.{}".format(package_name)) | |
except ModuleNotFoundError as e: | |
raise e | |
function = getattr(lib, function_name) | |
# del lib | |
return function | |
def click_create_assistant(openai_api_key: str, | |
name: str, | |
instructions: str, | |
description: str, | |
tools: str, | |
files: List[str], | |
file_ids: str, | |
model: str, | |
): | |
logger.info("click create assistant, name: {}".format(name)) | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
# tools | |
tools = str(tools).strip() | |
if tools is not None and len(tools) != 0: | |
tools = tools.split("\n") | |
tools = [json.loads(tool) for tool in tools if len(tool.strip()) != 0] | |
else: | |
tools = list() | |
# files | |
if files is not None and len(files) != 0: | |
files = [ | |
client.files.create( | |
file=open(file, "rb"), | |
purpose='assistants' | |
) for file in files | |
] | |
else: | |
files = list() | |
# file_ids | |
file_ids = str(file_ids).strip() | |
if file_ids is not None and len(file_ids) != 0: | |
file_ids = file_ids.split("\n") | |
file_ids = [file_id.strip() for file_id in file_ids if len(file_id.strip()) != 0] | |
else: | |
file_ids = list() | |
# assistant | |
assistant = client.beta.assistants.create( | |
name=name, | |
instructions=instructions, | |
description=description, | |
tools=tools, | |
file_ids=file_ids + [file.id for file in files], | |
model=model, | |
) | |
assistant_id = assistant.id | |
return assistant_id, None | |
def click_list_assistant(openai_api_key: str) -> str: | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
assistant_list = client.beta.assistants.list() | |
assistant_list = assistant_list.model_dump(mode="json") | |
result = "" | |
for a in assistant_list["data"]: | |
assis = "id: \n{}\nname: \n{}\ndescription: \n{}\n\n".format(a["id"], a["name"], a["description"]) | |
result += assis | |
return result | |
def click_delete_assistant(openai_api_key: str, | |
assistant_id: str) -> str: | |
assistant_id = assistant_id.strip() | |
logger.info("click delete assistant, assistant_id: {}".format(assistant_id)) | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
try: | |
assistant_deleted = client.beta.assistants.delete(assistant_id=assistant_id) | |
result = "success" if assistant_deleted.deleted else "failed" | |
except openai.NotFoundError as e: | |
result = e.message | |
return result | |
def click_delete_all_assistant(openai_api_key: str): | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
assistant_list = client.beta.assistants.list() | |
for a in assistant_list.data: | |
client.beta.assistants.delete(a.id) | |
return None | |
def click_list_file(openai_api_key: str): | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
file_list = client.files.list() | |
file_list = file_list.model_dump(mode="json") | |
result = "" | |
for f in file_list["data"]: | |
file = "id: \n{}\nfilename: \n{}\nbytes: \n{}\nstatus: \n{}\n\n".format( | |
f["id"], f["filename"], f["bytes"], f["status"] | |
) | |
result += file | |
return result | |
def click_delete_file(openai_api_key: str, | |
file_id: str) -> str: | |
file_id = file_id.strip() | |
logger.info("click delete file, file_id: {}".format(file_id)) | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
try: | |
assistant_deleted = client.files.delete(file_id=file_id) | |
result = "success" if assistant_deleted.deleted else "failed" | |
except openai.NotFoundError as e: | |
result = e.message | |
except httpx.InvalidURL as e: | |
result = str(e) | |
return result | |
def click_upload_files(openai_api_key: str, | |
files: List[str], | |
): | |
logger.info("click upload files, files: {}".format(files)) | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
result = list() | |
if files is not None and len(files) != 0: | |
files = [ | |
client.files.create( | |
file=open(file, "rb"), | |
purpose='assistants' | |
) for file in files | |
] | |
file_ids = [file.id for file in files] | |
result.extend(file_ids) | |
return result | |
def click_list_function_python_script(): | |
function_script_dir = project_path / "functions" | |
result = "" | |
for script in function_script_dir.glob("*.py"): | |
if script.name == "__init__.py": | |
continue | |
result += script.name | |
result += "\n" | |
return result | |
def click_upload_function_python_script(files: List[str]): | |
tgt = project_path / "functions" | |
if files is None: | |
return None | |
for file in files: | |
shutil.copy(file, tgt.as_posix()) | |
return None | |
def click_delete_function_python_script(filename: str): | |
function_script_dir = project_path / "functions" | |
filename = function_script_dir / filename.strip() | |
filename = filename.as_posix() | |
try: | |
os.remove(filename) | |
result = "success" | |
except FileNotFoundError as e: | |
result = str(e) | |
except Exception as e: | |
result = str(e) | |
return result | |
def click_download_function_python_script(name: str): | |
function_script_dir = project_path / "functions" | |
filename = function_script_dir / name.strip() | |
if not filename.exists(): | |
files = None | |
flag = "File Not Found: {}".format(name.strip()) | |
else: | |
files = [filename.as_posix()] | |
flag = "You can download it on `upload_python_script_files` now." | |
return files, flag | |
def convert_message_list_to_conversation(message_list: List[dict]) -> List[Tuple[str, str]]: | |
conversation = list() | |
for message in message_list: | |
role = message["role"] | |
content = message["content"] | |
for c in content: | |
c_type = c["type"] | |
if c_type != "text": | |
continue | |
text: dict = c["text"] | |
if c_type == "text": | |
text_value = text["value"] | |
text_annotations = text["annotations"] | |
msg = text_value | |
for text_annotation in text_annotations: | |
a_type = text_annotation["type"] | |
if a_type == "file_citation": | |
msg += "\n\n" | |
msg += "\nquote: \n{}\nfile_id: \n{}".format( | |
text_annotation["file_citation"]["quote"], | |
text_annotation["file_citation"]["file_id"], | |
) | |
else: | |
raise NotImplementedError | |
if role == "assistant": | |
msg = [None, msg] | |
else: | |
msg = [msg, None] | |
conversation.append(msg) | |
return conversation | |
def refresh(openai_api_key: str, | |
thread_id: str, | |
): | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
message_list = client.beta.threads.messages.list( | |
thread_id=thread_id | |
) | |
message_list = message_list.model_dump(mode="json") | |
message_list = message_list["data"] | |
message_list = list(sorted(message_list, key=lambda x: x["created_at"])) | |
logger.debug("message_list: {}".format(message_list)) | |
conversation = convert_message_list_to_conversation(message_list) | |
return conversation | |
def add_and_run(openai_api_key: str, | |
assistant_id: str, | |
thread_id: str, | |
name: str, | |
instructions: str, | |
description: str, | |
tools: str, | |
files: List[str], | |
file_ids: str, | |
model: str, | |
query: str, | |
): | |
client = OpenAI( | |
api_key=openai_api_key, | |
) | |
if assistant_id is None or len(assistant_id.strip()) == 0: | |
assistant_id, _ = click_create_assistant( | |
openai_api_key, | |
name, instructions, description, tools, files, file_ids, model | |
) | |
if thread_id is None or len(thread_id.strip()) == 0: | |
thread = client.beta.threads.create() | |
thread_id = thread.id | |
logger.info(f"assistant_id: {assistant_id}, thread_id: {thread_id}") | |
message = client.beta.threads.messages.create( | |
thread_id=thread_id, | |
role="user", | |
content=query | |
) | |
run = client.beta.threads.runs.create( | |
thread_id=thread_id, | |
assistant_id=assistant_id, | |
) | |
delta_time = 0.1 | |
last_conversation = None | |
no_updates_count = 0 | |
max_no_updates_count = 5 | |
while True: | |
time.sleep(delta_time) | |
run = client.beta.threads.runs.retrieve( | |
thread_id=thread_id, | |
run_id=run.id, | |
) | |
# required action | |
if run.required_action is not None: | |
if run.required_action.type == "submit_tool_outputs": | |
tool_outputs = list() | |
for tool_call in run.required_action.submit_tool_outputs.tool_calls: | |
function_name = tool_call.function.name | |
function_to_call = dynamic_import_function(function_name, function_name) | |
kwargs_required: List[str] = dynamic_import_function(function_name, "kwargs")() | |
function_args = json.loads(tool_call.function.arguments) | |
kwargs = {k: function_args.get(k) for k in kwargs_required} | |
function_response = function_to_call(**kwargs) | |
tool_outputs.append({ | |
"tool_call_id": tool_call.id, | |
"output": function_response, | |
}) | |
run = client.beta.threads.runs.submit_tool_outputs( | |
thread_id=thread_id, | |
run_id=run.id, | |
tool_outputs=tool_outputs | |
) | |
# get message | |
conversation = refresh(openai_api_key, thread_id) | |
if conversation == last_conversation: | |
if any([run.completed_at is not None, | |
run.cancelled_at is not None, | |
run.failed_at is not None, | |
run.expires_at is not None]): | |
no_updates_count += 1 | |
if no_updates_count >= max_no_updates_count: | |
break | |
last_conversation = conversation | |
result = [ | |
assistant_id, thread_id, | |
conversation, | |
None | |
] | |
yield result | |
def main(): | |
args = get_args() | |
brief_description = """ | |
## محامون كويتيون | |
""" | |
with open(args.description_md_file, "r", encoding="utf-8") as f: | |
md_description = f.read() | |
# example json | |
with open(args.examples_json_file, "r", encoding="utf-8") as f: | |
examples = json.load(f) | |
for example in examples: | |
files: List[str] = example[4] | |
if files is None: | |
continue | |
files = [(project_path / file).as_posix() for file in files] | |
example[4] = files | |
# ui | |
with gr.Blocks() as blocks: | |
gr.Markdown(value=brief_description) | |
with gr.Row(): | |
# settings | |
with gr.Column(scale=3): | |
with gr.Tabs(): | |
with gr.TabItem("create assistant"): | |
openai_api_key = gr.Text( | |
value=args.openai_api_key, | |
label="openai_api_key", | |
placeholder="Fill with your `openai_api_key`" | |
) | |
name = gr.Textbox(label="name") | |
instructions = gr.Textbox(label="instructions") | |
description = gr.Textbox(label="description") | |
model = gr.Dropdown(["gpt-4-1106-preview"], value="gpt-4-1106-preview", label="model") | |
# functions | |
tools = gr.TextArea(label="functions",visible=False) | |
# upload files | |
retrieval_files = gr.Files(label="retrieval_files") | |
retrieval_file_ids = gr.TextArea(label="retrieval_file_ids") | |
# create assistant | |
create_assistant_button = gr.Button("create assistant", variant="secondary") | |
with gr.TabItem("assistants"): | |
list_assistant_button = gr.Button("list assistant") | |
assistant_list = gr.TextArea(label="assistant_list") | |
delete_assistant_id = gr.Textbox(max_lines=1, label="delete_assistant_id",visible=False) | |
delete_assistant_button = gr.Button("delete assistant",visible=False) | |
delete_all_assistant_button = gr.Button("delete all assistant",visible=False) | |
# this is for files . | |
list_file_button = gr.Button("list file",visible=False) | |
file_list = gr.TextArea(label="file_list",visible=False) | |
upload_files = gr.Files(label="upload_files",visible=False) | |
upload_files_button = gr.Button("upload file",visible=False) | |
delete_file_id = gr.Textbox(max_lines=1, label="delete_file_id",visible=False) | |
delete_file_button = gr.Button("delete file",visible=False) | |
# this is for function script. | |
list_function_python_script_button = gr.Button("list python script",visible=False) | |
list_function_python_script_list = gr.TextArea(label="python_script_list",visible=False) | |
upload_function_python_script_files = gr.Files(label="upload_python_script_files",visible=False) | |
upload_function_python_script_button = gr.Button("upload python script",visible=False) | |
function_python_script_file = gr.Textbox(max_lines=1, label="python_script_file",visible=False) | |
delete_function_python_script_button = gr.Button("delete python script",visible=False) | |
download_function_python_script_button = gr.Button("download python script",visible=False) | |
# with gr.TabItem("files"): | |
# with gr.TabItem("function script"): | |
# chat | |
with gr.Column(scale=5): | |
chat_bot = gr.Chatbot(label="conversation", height=600) | |
query = gr.Textbox(lines=1, label="query") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
add_and_run_button = gr.Button("Add and run", variant="primary") | |
with gr.Column(scale=1): | |
refresh_button = gr.Button("Refresh") | |
# states | |
with gr.Column(scale=2): | |
assistant_id = gr.Textbox(value=None, label="assistant_id") | |
thread_id = gr.Textbox(value=None, label="thread_id") | |
tips = gr.TextArea(value=None, label="tips") | |
# examples | |
with gr.Row(): | |
gr.Examples( | |
examples=examples, | |
inputs=[ | |
name, instructions, description, tools, retrieval_files, model, | |
query, tips | |
], | |
examples_per_page=5 | |
) | |
gr.Markdown(value=md_description) | |
# create assistant | |
create_assistant_button.click( | |
click_create_assistant, | |
inputs=[ | |
openai_api_key, | |
name, instructions, description, tools, retrieval_files, retrieval_file_ids, model, | |
], | |
outputs=[ | |
assistant_id, thread_id | |
] | |
) | |
# list assistant | |
list_assistant_button.click( | |
click_list_assistant, | |
inputs=[ | |
openai_api_key | |
], | |
outputs=[ | |
assistant_list | |
] | |
) | |
# delete assistant button | |
delete_assistant_button.click( | |
click_delete_assistant, | |
inputs=[ | |
openai_api_key, | |
delete_assistant_id | |
], | |
outputs=[ | |
delete_assistant_id | |
] | |
) | |
# delete all assistant | |
delete_all_assistant_button.click( | |
click_delete_all_assistant, | |
inputs=[ | |
openai_api_key | |
], | |
outputs=[ | |
file_list | |
] | |
) | |
# list file | |
list_file_button.click( | |
click_list_file, | |
inputs=[ | |
openai_api_key | |
], | |
outputs=[ | |
file_list | |
], | |
) | |
# delete file | |
delete_file_button.click( | |
click_delete_file, | |
inputs=[ | |
openai_api_key, | |
delete_file_id | |
], | |
outputs=[ | |
delete_file_id | |
] | |
) | |
# upload files | |
upload_files_button.click( | |
click_upload_files, | |
inputs=[ | |
openai_api_key, | |
upload_files | |
], | |
outputs=[ | |
] | |
) | |
# list python script | |
list_function_python_script_button.click( | |
click_list_function_python_script, | |
inputs=[], | |
outputs=[ | |
list_function_python_script_list | |
] | |
) | |
# upload function python script | |
upload_function_python_script_button.click( | |
click_upload_function_python_script, | |
inputs=[ | |
upload_function_python_script_files | |
], | |
outputs=[ | |
upload_function_python_script_files | |
] | |
) | |
# delete function python script | |
delete_function_python_script_button.click( | |
click_delete_function_python_script, | |
inputs=[ | |
function_python_script_file | |
], | |
outputs=[ | |
function_python_script_file | |
] | |
) | |
# download function python script | |
download_function_python_script_button.click( | |
click_download_function_python_script, | |
inputs=[ | |
function_python_script_file | |
], | |
outputs=[ | |
upload_function_python_script_files, function_python_script_file | |
] | |
) | |
# query submit | |
query.submit( | |
add_and_run, | |
inputs=[ | |
openai_api_key, | |
assistant_id, thread_id, | |
name, instructions, description, tools, retrieval_files, retrieval_file_ids, model, | |
query, | |
], | |
outputs=[ | |
assistant_id, thread_id, | |
chat_bot, | |
query | |
], | |
api_name="query_submit", | |
) | |
# add and run | |
add_and_run_button.click( | |
add_and_run, | |
inputs=[ | |
openai_api_key, | |
assistant_id, thread_id, | |
name, instructions, description, tools, retrieval_files, retrieval_file_ids, model, | |
query, | |
], | |
outputs=[ | |
assistant_id, thread_id, | |
chat_bot, | |
query | |
], | |
) | |
# refresh | |
refresh_button.click( | |
refresh, | |
inputs=[ | |
openai_api_key, | |
thread_id, | |
], | |
outputs=[ | |
chat_bot | |
] | |
) | |
blocks.queue().launch() | |
return | |
if __name__ == '__main__': | |
main() | |