import os
import gradio as gr
from random import randint
from operator import itemgetter
import bisect
from all_models import tags_plus_models,models,models_plus_tags
from datetime import datetime
from externalmod import gr_Interface_load
import asyncio
from threading import RLock
lock = RLock()
HF_TOKEN = os.environ.get("HF_TOKEN")  # If private or gated models aren't used, setting this ENV variable is unnecessary.
now2 = 0
inference_timeout = 300
MAX_SEED = 2**32-1
nb_rep=2
nb_mod_dif=20
nb_models=nb_mod_dif*nb_rep
cache_image={}
cache_image_actu={}
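# Load a gr.Interface for every model once at startup; any model that fails to
# load falls back to a dummy text-to-image Interface so the rest of the app keeps working.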
def load_fn(models):
global models_load
global num_models
global default_models
models_load = {}
num_models = len(models)
    i = 0
    if num_models != 0:
        default_models = models[:num_models]
    else:
        default_models = []  # keep the same type as the non-empty case
for model in models:
i+=1
if i%50==0:
print("\n\n\n-------"+str(i)+'/'+str(len(models))+"-------\n\n\n")
        if model not in models_load:
try:
m = gr_Interface_load(f'models/{model}', hf_token=HF_TOKEN)
except Exception as error:
m = gr.Interface(lambda txt: None, ['text'], ['image'])
print(error)
models_load.update({model: m})
load_fn(models)
tags_plus_models_to_list = []
for tag_plus_m in tags_plus_models:
    # Each tags_plus_models entry is assumed to be [tag_name, <count>, model_list];
    # keep [tag_name, model_list] pairs for the Tag dropdown in make_me().
    tags_plus_models_to_list.append([tag_plus_m[0], tag_plus_m[2]])
def test_pass(test):
if test==os.getenv('p'):
print("ok")
return gr.Dropdown(label="Lists Tags", show_label=True, choices=list(models_test) , interactive = True)
else:
print("nop")
return gr.Dropdown(label="Lists Tags", show_label=True, choices=list([]) , interactive = True)
def test_pass_aff(test):
if test==os.getenv('p'):
return gr.Accordion( open=True, visible=True) ,gr.Row(visible=False)
else:
return gr.Accordion( open=True, visible=False) , gr.Row()
# https://huggingface.co./docs/api-inference/detailed_parameters
# https://huggingface.co./docs/huggingface_hub/package_reference/inference_client
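# Run a single model's inference in a worker thread, cancelling it after `timeout` seconds.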
async def infer(model_str, prompt, nprompt="", height=None, width=None, steps=None, cfg=None, seed=-1, timeout=inference_timeout):
from pathlib import Path
kwargs = {}
if height is not None and height >= 256: kwargs["height"] = height
if width is not None and width >= 256: kwargs["width"] = width
if steps is not None and steps >= 1: kwargs["num_inference_steps"] = steps
    if cfg is not None and cfg > 0: kwargs["guidance_scale"] = cfg
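    # Without a fixed seed, pad the prompt with a random run of spaces so that
    # identical requests are less likely to be served from the provider's cache.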
noise = ""
if seed >= 0: kwargs["seed"] = seed
else:
rand = randint(1, 500)
for i in range(rand):
noise += " "
task = asyncio.create_task(asyncio.to_thread(models_load[model_str].fn,
prompt=f'{prompt} {noise}', negative_prompt=nprompt, **kwargs, token=HF_TOKEN))
await asyncio.sleep(0)
try:
result = await asyncio.wait_for(task, timeout=timeout)
except (Exception, asyncio.TimeoutError) as e:
print(e)
print(f"Task timed out: {model_str}")
if not task.done(): task.cancel()
result = None
if task.done() and result is not None:
with lock:
png_path = "image.png"
result.save(png_path)
image = str(Path(png_path).resolve())
return image
return None
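# Synchronous wrapper around infer() for Gradio callbacks: runs the coroutine in a
# fresh event loop and returns the saved image path, or None on failure.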
def gen_fn(model_str, prompt, nprompt="", height=None, width=None, steps=None, cfg=None, seed=-1):
if model_str == 'NA':
return None
    loop = asyncio.new_event_loop()  # created before the try so the finally block can always close it
    try:
        result = loop.run_until_complete(infer(model_str, prompt, nprompt,
                                               height, width, steps, cfg, seed, inference_timeout))
    except (Exception, asyncio.CancelledError) as e:
        print(e)
        print(f"Task aborted: {model_str}")
        result = None
    finally:
        loop.close()
return result
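# The gallery helpers below treat a gallery as a list of (image, model_name) tuples.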
def add_gallery(image, model_str, gallery):
if gallery is None: gallery = []
#with lock:
if image is not None: gallery.append((image, model_str))
return gallery
def reset_gallery(gallery):
return add_gallery(None,"",[])
def load_gallery(gallery,id):
gallery = reset_gallery(gallery)
for c in cache_image[f"{id}"]:
gallery=add_gallery(c[0],c[1],gallery)
return gallery
def load_gallery_sorted(gallery,id):
gallery = reset_gallery(gallery)
for c in sorted(cache_image[f"{id}"], key=itemgetter(1)):
gallery=add_gallery(c[0],c[1],gallery)
return gallery
def load_gallery_actu(gallery,id):
gallery = reset_gallery(gallery)
for c in cache_image_actu[f"{id}"]:
gallery=add_gallery(c[0],c[1],gallery)
return gallery
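# Per-session image caches, keyed by the session id from set_session():
# cache_image keeps insertion order, cache_image_actu is kept sorted by model name.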
def add_cache_image(image, model_str,id,cache_image=cache_image):
if image is not None:
cache_image[f"{id}"].append((image,model_str))
#cache_image=sorted(cache_image, key=itemgetter(1))
return
def add_cache_image_actu(image, model_str,id,cache_image_actu=cache_image_actu):
if image is not None:
bisect.insort(cache_image_actu[f"{id}"],(image, model_str), key=itemgetter(1))
#cache_image_actu=sorted(cache_image_actu, key=itemgetter(1))
return
def reset_cache_image(id,cache_image=cache_image):
cache_image[f"{id}"].clear()
return
def reset_cache_image_actu(id,cache_image_actu=cache_image_actu):
cache_image_actu[f"{id}"].clear()
return
def reset_cache_image_all_sessions(cache_image=cache_image,cache_image_actu=cache_image_actu):
for key, listT in cache_image.items():
listT.clear()
for key, listT in cache_image_actu.items():
listT.clear()
return
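# On the first call (id == 0) draw a random session id and allocate its caches;
# later calls just pass the existing id through.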
def set_session(id):
if id==0:
randTemp=randint(1,MAX_SEED)
cache_image[f"{randTemp}"]=[]
cache_image_actu[f"{randTemp}"]=[]
return gr.Number(visible=False,value=randTemp)
else :
return id
def print_info_sessions():
lenTot=0
print("###################################")
print("number of sessions : "+str(len(cache_image)))
for key, listT in cache_image.items():
print("session "+key+" : "+str(len(listT)))
lenTot+=len(listT)
print("images total = "+str(lenTot))
print("###################################")
return
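# Deduplicate the chosen models and format them as a quoted, line-broken list.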
def disp_models(group_model_choice,nb_rep=nb_rep):
listTemp=[]
strTemp='\n'
i=0
for m in group_model_choice:
if m not in listTemp:
listTemp.append(m)
for m in listTemp:
i+=1
strTemp+="\"" + m + "\",\n"
        if i % (8 // nb_rep) == 0:
strTemp+="\n"
return gr.Textbox(label="models",value=strTemp)
def search_models(str_search,tags_plus_models=tags_plus_models):
output1="\n"
output2=""
for m in tags_plus_models[0][2]:
if m.find(str_search)!=-1:
output1+="\"" + m + "\",\n"
outputPlus="\n From tags : \n\n"
    for tag_plus_m in tags_plus_models:  # renamed from tag_plus_models to avoid shadowing the parameter
        if str_search.lower() == tag_plus_m[0].lower() and str_search != "":
            for m in tag_plus_m[2]:
                output2 += "\"" + m + "\",\n"
if output2 != "":
output=output1+outputPlus+output2
else :
output=output1
return gr.Textbox(label="out",value=output)
def search_info(txt_search_info,models_plus_tags=models_plus_tags):
outputList=[]
    if txt_search_info.find("\"") != -1:
        start = txt_search_info.find("\"") + 1
        end = txt_search_info.find("\"", start)
        m_name = txt_search_info[start:end]  # the original called an undefined cutStrg(); plain slicing does the same
    else:
        m_name = txt_search_info
for m in models_plus_tags:
if m_name == m[0]:
outputList=m[1]
if len(outputList)==0:
outputList.append("Model Not Find")
return gr.Textbox(label="out",value=outputList)
def ratio_chosen(choice_ratio,width,height):
if choice_ratio == [None,None]:
return width , height
else :
return gr.Slider(label="Width", info="If 0, the default value is used.", maximum=2024, step=32, value=choice_ratio[0]), gr.Slider(label="Height", info="If 0, the default value is used.", maximum=2024, step=32, value=choice_ratio[1])
list_ratios=[["None",[None,None]],
["4:1 (2048 x 512)",[2048,512]],
["12:5 (1536 x 640)",[1536,640]],
["~16:9 (1344 x 768)",[1344,768]],
["~3:2 (1216 x 832)",[1216,832]],
["~4:3 (1152 x 896)",[1152,896]],
["1:1 (1024 x 1024)",[1024,1024]],
["~3:4 (896 x 1152)",[896,1152]],
["~2:3 (832 x 1216)",[832,1216]],
["~9:16 (768 x 1344)",[768,1344]],
["5:12 (640 x 1536)",[640,1536]],
["1:4 (512 x 2048)",[512,2048]]]
def fonc_add_param(lp,txt_input,neg_input,width,height,steps,cfg,seed):
if lp == [["","",0,0,0,0,-1]]:
lp.remove(["","",0,0,0,0,-1])
lp.append([txt_input,neg_input,width,height,steps,cfg,seed])
return gr.Dataset(samples=lp) , gr.Dropdown(choices=[["a",lp]], value=lp)
def fonc_del_param(lp,txt_input,neg_input,width,height,steps,cfg,seed):
if [txt_input,neg_input,width,height,steps,cfg,seed] in lp :
lp.remove([txt_input,neg_input,width,height,steps,cfg,seed])
if lp == []:
lp.append(["","",0,0,0,0,-1])
return gr.Dataset(samples=lp) , gr.Dropdown(choices=[["a",lp]], value=lp)
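# `fonc_load_info` is referenced by the "load info" button in make_me() but never
# defined in this file. The stub below is a hypothetical placeholder, assuming the
# intent is to report which slice of the current tag's model list would be generated.
def fonc_load_info(nb_of_models_to_gen, index_first_model):
    if index_first_model is None:  # nothing selected yet in the "First model" dropdown
        index_first_model = 0
    info = f"{int(nb_of_models_to_gen)} models, starting at index {int(index_first_model)}"
    return gr.Textbox(label="Info", value=info)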
def make_me():
with gr.Column():
with gr.Group():
with gr.Row():
with gr.Column(scale=4):
txt_input = gr.Textbox(label='Your prompt:', lines=4, interactive = True)
neg_input = gr.Textbox(label='Negative prompt:', lines=4, interactive = True)
with gr.Column(scale=4):
with gr.Row():
width = gr.Slider(label="Width", info="If 0, the default value is used.", maximum=2024, step=32, value=0, interactive = True)
height = gr.Slider(label="Height", info="If 0, the default value is used.", maximum=2024, step=32, value=0, interactive = True)
with gr.Row():
choice_ratio = gr.Dropdown(label="Ratio Width/Height",
info="OverWrite Width and Height (W*H<1024*1024)",
show_label=True, choices=list(list_ratios) , interactive = True, value=list_ratios[0][1])
choice_ratio.change(ratio_chosen,[choice_ratio,width,height],[width,height])
with gr.Row():
steps = gr.Slider(label="Number of inference steps", info="If 0, the default value is used.", maximum=100, step=1, value=0, interactive = True)
cfg = gr.Slider(label="Guidance scale", info="If 0, the default value is used.", maximum=30.0, step=0.1, value=0, interactive = True)
seed = gr.Slider(label="Seed", info="Randomize Seed if -1.", minimum=-1, maximum=MAX_SEED, step=1, value=-1, interactive = True)
add_param=gr.Button("Add to the list")
del_param=gr.Button("Delete to the list")
#gen_button = gr.Button('Generate images', scale=3)
#stop_button = gr.Button('Stop', variant='secondary', interactive=False, scale=1)
#gen_button.click(lambda: gr.update(interactive=True), None, stop_button)
list_param=gr.Dropdown(choices=[["a",[["","",0,0,0,0,-1]]]], value=[["","",0,0,0,0,-1]], visible=False)
disp_param = gr.Examples(
label="list of prompt",
examples=list_param.value,
inputs=[txt_input,neg_input,width,height,steps,cfg,seed],
outputs=[txt_input,neg_input,width,height,steps,cfg,seed],
)
add_param.click(fonc_add_param,[list_param,txt_input,neg_input,width,height,steps,cfg,seed],[disp_param.dataset,list_param])
del_param.click(fonc_del_param,[list_param,txt_input,neg_input,width,height,steps,cfg,seed],[disp_param.dataset,list_param])
with gr.Row():
nb_images_by_prompt=gr.Number(2,label="Number of images by prompt:",interactive=True)
nb_of_models_to_gen=gr.Number(10,label="Number of Models:",interactive=True)
                list_models_in_tag = gr.Dropdown(label="Tag", choices=tags_plus_models_to_list, value=tags_plus_models_to_list[1][1])
                index_first_model = gr.Dropdown(label="First model", choices=tags_plus_models_to_list[1][1], type="index")
list_models_in_tag.change(lambda x:gr.Dropdown(choices=x),list_models_in_tag,index_first_model)
with gr.Row():
disp_info=gr.Textbox(label="Info")
load_info=gr.Button(label="load info")
load.info.click(fonc_load_info,[nb_of_models_to_gen,index_first_model],[disp_info])
js_code = """
console.log('ghgh');
"""
with gr.Blocks(theme="Nymbo/Nymbo_Theme", fill_width=True, css="div.float.svelte-1mwvhlq { position: absolute; top: var(--block-label-margin); left: var(--block-label-margin); background: none; border: none;}") as demo:
gr.Markdown("<script>" + js_code + "</script>")
make_me()
# https://www.gradio.app/guides/setting-up-a-demo-for-maximum-performance
#demo.queue(concurrency_count=999) # concurrency_count is deprecated in 4.x
demo.queue(default_concurrency_limit=200, max_size=200)
demo.launch(max_threads=400)