import base64
import io
import json
import os
import random
import uuid
from functools import wraps
from io import BytesIO

import requests
from diffusers.utils import load_image
from huggingface_hub.inference_api import InferenceApi
from huggingface_hub.utils._errors import RepositoryNotFoundError
from PIL import Image, ImageDraw
from pydub import AudioSegment

from ..tool import Tool
DIRPATH = os.path.dirname(os.path.abspath(__file__))

CONFIG = {
    "debug": False,
    "log_file": None,
    "huggingface": {"token": os.environ.get("HUGGINGFACE_API_KEY")},
    "proxy": None,
    "inference_mode": "huggingface",
    "local_inference_endpoint": {"host": "localhost", "port": 8005},
    "huggingface_inference_endpoint": {
        "host": "api-inference.huggingface.co",
        "port": 443,
    },
}
HUGGINGFACE_HEADERS = {}
if CONFIG["huggingface"]["token"] and CONFIG["huggingface"]["token"].startswith(
    "hf_"
):  # check for a valid HuggingFace token in the config
    HUGGINGFACE_HEADERS = {
        "Authorization": f"Bearer {CONFIG['huggingface']['token']}",
    }
elif os.environ.get("HUGGINGFACE_API_KEY", "").startswith(
    "hf_"
):  # fall back to the HUGGINGFACE_API_KEY environment variable
    HUGGINGFACE_HEADERS = {
        "Authorization": f"Bearer {os.getenv('HUGGINGFACE_API_KEY')}",
    }
else:
    raise ValueError("Incorrect HuggingFace token. Please check your configuration.")
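# The token is expected to be a HuggingFace user access token prefixed with "hf_".
# A minimal setup sketch (assuming a POSIX shell; the value shown is a placeholder):
#   export HUGGINGFACE_API_KEY=hf_xxxxxxxxxxxxxxxxxxxxxxxx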
PROXY = None
if CONFIG["proxy"]:
    PROXY = {
        "https": CONFIG["proxy"],
    }

INFERENCE_MODE = CONFIG["inference_mode"]

DOCS_PATH = "sources/docs.json"
INPUT_PATH = "files"
OUTPUT_PATH = "files"
MODEL_SERVER = None
if INFERENCE_MODE != "huggingface":
    MODEL_SERVER = (
        "http://"
        + CONFIG["local_inference_endpoint"]["host"]
        + ":"
        + str(CONFIG["local_inference_endpoint"]["port"])
    )
    message = (
        "The server for local inference endpoints is not running; please start it first "
        "(or set `inference_mode: huggingface` in the config for a feature-limited experience)."
    )
    try:
        # only an HTTP 200 from the health-check route is required; the body is not inspected
        r = requests.get(MODEL_SERVER + "/running")
        if r.status_code != 200:
            raise ValueError(message)
    except requests.exceptions.RequestException:
        raise ValueError(message)
def get_model_status(model_id, url, headers):
    # endpoint_type = "huggingface" if "huggingface" in url else "local"
    if "huggingface" in url:
        r = requests.get(url + f"/{model_id}", headers=headers, proxies=PROXY)
    else:
        r = requests.get(url)
    return r.status_code == 200 and "loaded" in r.json() and r.json()["loaded"]
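# Example call (a sketch only; the status URL and model id below are illustrative
# assumptions, not values used elsewhere in this module):
#   get_model_status(
#       "distilbert-base-uncased-distilled-squad",
#       "https://api-inference.huggingface.co/status",
#       HUGGINGFACE_HEADERS,
#   )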
def image_to_bytes(img_url):
    img_byte = io.BytesIO()
    load_image(img_url).save(img_byte, format="png")
    img_data = img_byte.getvalue()
    return img_data
def build_tool(conf) -> Tool:
    task_list = []
    tool = Tool(
        tool_name="hugging_tools",
        description="API interface for HuggingGPT-like applications.",
        name_for_model="hugging_tools",
        description_for_model="""This API interface provides easy access to popular models available on the Huggingface model hub. You MUST check model_docs to fetch the available models FIRST:
Action: model_docs
Action Input: {"task" : <task_name>}
After that you can choose an available model. """,
        logo_url="https://your-app-url.com/.well-known/logo.png",
        contact_email="[email protected]",
        legal_info_url="[email protected]",
    )
    # set the GET route to /func.__name__ and format the docstring
    def task(func):
        func.__doc__ = (
            """You MUST check model_docs to fetch the available models FIRST:
        Action: model_docs
        Action Input: {"task" : "%s"}
        After that you can choose an available model in the list.
        """
            % func.__name__
        )

        @wraps(func)
        def try_run_task(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except RepositoryNotFoundError:
                return (
                    """The model with the model_id you input is not available. Please check model_docs to get other available models:
        Action: model_docs
        Action Input: {"task" : "%s"}
        After that you can choose an available model in the list.
        """
                    % func.__name__
                )

        path = "/" + func.__name__
        try_run_task.route = path
        task_list.append(func.__name__)
        return tool.get(path)(try_run_task)
    def format_docs(docs_str):
        def set_docs(func):
            func.__doc__ = func.__doc__ % docs_str

            @wraps(func)
            def original_func(*args, **kwargs):
                return func(*args, **kwargs)

            return original_func

        return set_docs
    # NLP tasks
    @task
    def question_answering(model_id: str, question: str, context: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(
            inference({"question": question, "context": (context if context else "")})
        )

    @task
    def sentence_similarity(model_id: str, text: str, context: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(
            inference(
                {"source_sentence": text, "sentences": [(context if context else "")]}
            )
        )

    @task
    def text_classification(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(inference(text))

    @task
    def token_classification(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(inference(text))

    @task
    def text2text_generation(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(inference(text))

    @task
    def summarization(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(inference(text))

    @task
    def translation(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(inference(text))

    @task
    def conversational(
        model_id: str, text: str, past_user_inputs: str, generated_responses: str
    ) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        payload = {
            "past_user_inputs": [past_user_inputs],
            "generated_responses": [generated_responses],
            "text": text,
        }
        return str(inference(payload))

    @task
    def text_generation(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        return str(inference(text))
    # CV tasks
    @task
    def visual_question_answering(model_id: str, image_file_name: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        img_data = image_to_bytes(f"{DIRPATH}/{INPUT_PATH}/{image_file_name}")
        img_base64 = base64.b64encode(img_data).decode("utf-8")
        return str(inference({"question": text, "image": img_base64}))

    @task
    def document_question_answering(model_id: str, image_file_name: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        img_data = image_to_bytes(f"{DIRPATH}/{INPUT_PATH}/{image_file_name}")
        img_base64 = base64.b64encode(img_data).decode("utf-8")
        return str(inference({"question": text, "image": img_base64}))

    @task
    def image_to_image(model_id: str, image_file_name: str) -> str:
        img_data = image_to_bytes(f"{DIRPATH}/{INPUT_PATH}/{image_file_name}")
        # this task is not supported by InferenceApi, so call the HTTP endpoint directly
        HUGGINGFACE_HEADERS["Content-Length"] = str(len(img_data))
        task_url = f"https://api-inference.huggingface.co/models/{model_id}"
        r = requests.post(task_url, headers=HUGGINGFACE_HEADERS, data=img_data)
        name = str(uuid.uuid4())[:4]
        result = f"{name}.jpeg"
        with open(f"{DIRPATH}/{OUTPUT_PATH}/{result}", "wb") as f:
            f.write(r.content)
        return result

    @task
    def text_to_image(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        img = inference(text)
        name = str(uuid.uuid4())[:4]
        image_type = "jpg" if img.format == "JPEG" else "png"
        img.save(f"{DIRPATH}/{OUTPUT_PATH}/{name}.{image_type}")
        result = f"{name}.{image_type}"
        return result
    @task
    def image_segmentation(model_id: str, image_file_name: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        img_data = image_to_bytes(f"{DIRPATH}/{INPUT_PATH}/{image_file_name}")
        image = Image.open(BytesIO(img_data))
        predicted = inference(data=img_data)
        # one semi-transparent colour per predicted segment
        colors = []
        for i in range(len(predicted)):
            colors.append(
                (
                    random.randint(100, 255),
                    random.randint(100, 255),
                    random.randint(100, 255),
                    155,
                )
            )
        for i, pred in enumerate(predicted):
            mask = pred.pop("mask").encode("utf-8")
            mask = base64.b64decode(mask)
            mask = Image.open(BytesIO(mask), mode="r")
            mask = mask.convert("L")
            layer = Image.new("RGBA", mask.size, colors[i])
            image.paste(layer, (0, 0), mask)
        name = str(uuid.uuid4())[:4]
        image.save(f"{DIRPATH}/{OUTPUT_PATH}/{name}.jpg")
        result = {}
        result["generated image"] = f"{name}.jpg"
        result["predicted"] = predicted
        return str(result)
    @task
    def object_detection(model_id: str, image_file_name: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        img_data = image_to_bytes(f"{DIRPATH}/{INPUT_PATH}/{image_file_name}")
        predicted = inference(data=img_data)
        image = Image.open(BytesIO(img_data))
        draw = ImageDraw.Draw(image)
        labels = list(item["label"] for item in predicted)
        color_map = {}
        for label in labels:
            if label not in color_map:
                color_map[label] = (
                    random.randint(0, 255),
                    random.randint(0, 100),
                    random.randint(0, 255),
                )
        for label in predicted:
            box = label["box"]
            draw.rectangle(
                ((box["xmin"], box["ymin"]), (box["xmax"], box["ymax"])),
                outline=color_map[label["label"]],
                width=2,
            )
            draw.text(
                (box["xmin"] + 5, box["ymin"] - 15),
                label["label"],
                fill=color_map[label["label"]],
            )
        name = str(uuid.uuid4())[:4]
        image.save(f"{DIRPATH}/{OUTPUT_PATH}/{name}.jpg")
        result = {}
        result["generated image"] = f"{name}.jpg"
        result["predicted"] = predicted
        return str(result)
    @task
    def image_classification(model_id: str, image_file_name: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        img_data = image_to_bytes(f"{DIRPATH}/{INPUT_PATH}/{image_file_name}")
        result = inference(data=img_data)
        return str(result)

    @task
    def image_to_text(model_id: str, image_file_name: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        img_data = image_to_bytes(f"{DIRPATH}/{INPUT_PATH}/{image_file_name}")
        result = inference(data=img_data)
        return str(result)
    # AUDIO tasks
    @task
    def text_to_speech(model_id: str, text: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        response = inference(text, raw_response=True)
        name = str(uuid.uuid4())[:4]
        with open(f"{DIRPATH}/{OUTPUT_PATH}/{name}.flac", "wb") as f:
            f.write(response.content)
        result = f"{name}.flac"
        return result

    @task
    def automatic_speech_recognition(model_id: str, audio_file_name: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        with open(f"{DIRPATH}/{INPUT_PATH}/{audio_file_name}", "rb") as f:
            audio = f.read()
        text = inference(data=audio, raw_response=True)
        result = text.content
        return str(result)
    @task
    def audio_to_audio(model_id: str, audio_file_name: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        with open(f"{DIRPATH}/{INPUT_PATH}/{audio_file_name}", "rb") as f:
            audio = f.read()
        response = inference(data=audio, raw_response=True)
        result = response.json()
        content = None
        audio_type = None
        for k, v in result[0].items():
            if k == "blob":
                content = base64.b64decode(v.encode("utf-8"))
            if k == "content-type":
                audio_type = "audio/flac".split("/")[-1]
        audio = AudioSegment.from_file(BytesIO(content))
        name = str(uuid.uuid4())[:4]
        audio.export(f"{DIRPATH}/{OUTPUT_PATH}/{name}.{audio_type}", format=audio_type)
        result = f"{name}.{audio_type}"
        return result
    @task
    def audio_classification(model_id: str, audio_file_name: str) -> str:
        inference = InferenceApi(repo_id=model_id, token=CONFIG["huggingface"]["token"])
        with open(f"{DIRPATH}/{INPUT_PATH}/{audio_file_name}", "rb") as f:
            audio = f.read()
        response = inference(data=audio, raw_response=True)
        result = response.json()
        return str(result)
    # expose the docs endpoint and substitute the collected task names into the docstring
    @tool.get("/model_docs")
    @format_docs(", ".join(task_list))
    def model_docs(task: str) -> str:
        """returns a document about the usage, examples, and available models of an existing API.
        Every time before you use the API, you should get the document of the EXISTING API ONLY and ONLY use the available models in the document.
        task: the name of the API, one of %s
        return the document of the API.
        example:
        Action: model_docs
        Action Input: {"task" : "question_answering"}
        Observation: "question_answering is a function that uses a pre-trained language model to answer questions based on a given context.\n You can choose one model from: 'distilbert-base-uncased-distilled-squad', 'deepset/minilm-uncased-squad2', 'etalab-ia/camembert-base-squadFR-fquad-piaf'.\n\n Action Input: {\"model_id\" : \"distilbert-base-uncased-distilled-squad\", \"question\" : \"When did the first moon landing occur?\", \"context\" : \"The first manned moon landing was achieved by the United States on July 20, 1969, in the Apollo 11 mission.\"}\n "
        """
        with open(f"{DIRPATH}/{DOCS_PATH}", "r") as f:
            docs = json.load(f)
        if task in docs.keys():
            return docs[task]
        else:
            return "The function doesn't exist. Please input a valid function name."
    return tool
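# A rough usage sketch (the tool server that normally imports build_tool is not shown
# here, and the query-parameter style is an assumption based on the GET registrations above):
#
#   tool = build_tool({})
#   # each @task function becomes a GET route, e.g.
#   #   GET /model_docs?task=question_answering
#   #   GET /text_to_image?model_id=<model_id>&text=<prompt>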