Omnibus's picture
Update dl.py
74613c7
raw
history blame
14.7 kB
import os
import sys
import uuid
import gradio as gr
import requests
import json
from zipfile import ZipFile
import shutil
from pathlib import Path
from huggingface_hub import (create_repo,get_full_repo_name,upload_file,CommitOperationAdd,HfApi,snapshot_download)
from PIL import Image
uid = uuid.uuid4()
#token = os.environ['HF_TOKEN']
#token_self = os.environ['HF_TOKEN']
#o=os.environ['P']
css="""
.wrap.svelte-vm32wk.svelte-vm32wk.svelte-vm32wk {
display:block;
}
.wrap.svelte-1k4wjf2.svelte-1k4wjf2.svelte-1k4wjf2{
display:block;
}
"""
def show_s(name,token):
spaces=[]
spaces.append("")
api = HfApi(token=token)
author=name
s_ist = (api.list_spaces(author=author))
for i,space in enumerate(s_ist):
space_ea = space.id.split("/",1)[1]
#s_info=api.space_info(f'{name}/{space}',files_metadata=True)
#print(s_info)
spaces.append(space_ea)
#print (space_ea)
return(gr.Dropdown.update(label="Spaces", choices=[s for s in spaces]),
gr.update(choices=[s for s in spaces],interactive=True),
gr.update(choices=[s for s in spaces],interactive=True))
def show_f(repo,name,token):
api = HfApi(token=token)
f_ist = (api.list_repo_files(repo_id=f'{repo}/{name}', repo_type="space"))
print (f_ist)
file_list = []
#file_out = []
if not os.path.exists(name):
os.makedirs(name)
for d_app in f_ist:
if "/" in d_app:
dir_1=d_app.split("/",1)[0]
rem_1=d_app.split("/",1)[1]
if not os.path.exists(f'{name}/{dir_1}'):
os.makedirs(f'{name}/{dir_1}')
if "/" in rem_1:
dir_2=rem_1.split("/",1)[0]
rem_2=rem_1.split("/",1)[1]
if not os.path.exists(f'{name}/{dir_1}/{dir_2}'):
os.makedirs(f'{name}/{dir_1}/{dir_2}')
sf=rem_2.split(".",1)[1]
pf=rem_2.split(".",1)[0]
f_name=f'{dir_1}/{dir2}/{pf}.{sf}'
else:
sf=rem_1.split(".",1)[1]
pf=rem_1.split(".",1)[0]
f_name=f'{dir_1}/{pf}.{sf}'
print(f_name)
else:
sf=d_app.split(".",1)[1]
pf=d_app.split(".",1)[0]
f_name=f'{pf}.{sf}'
pass
r = requests.get(f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{d_app}')
print(d_app)
#print (r.text)
uid = uuid.uuid4()
file = open(f'{name}/{f_name}','wb')
file.write(r.content)
file.close()
file_list.append(Path(f'{name}/{f_name}'))
with ZipFile(f"{name}.zip", "w") as zipObj:
for idx, file in enumerate(f_ist):
zipObj.write(f'{name}/{file}')
file_list.append(f'{name}.zip')
s_info=api.space_info(f'{repo}/{name}',files_metadata=True)
'''
with open(f'{uid}-tmp.json','w') as f:
json.dump(s_info,f,indent=4)
f.close()
'''
return(gr.Dropdown.update(label="Files", choices=[f for f in f_ist],interactive=True),
file_list,
gr.update(choices=[f for f in f_ist],interactive=True),
api.get_space_runtime(f'{repo}/{name}'))
def show_f2(repo,name,token):
api = HfApi(token=token)
#f_ist = snapshot_download(repo_id=f'{repo}/{name}', repo_type="space")
f_ist = (api.list_repo_files(repo_id=f'{repo}/{name}', repo_type="space"))
print (f_ist)
file_list = []
file_out = []
if not os.path.exists(name):
os.makedirs(name)
for d_app in f_ist:
r = requests.get(f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{d_app}')
#print (r.text)
uid = uuid.uuid4()
sf=d_app.split(".",1)[1]
pf=d_app.split(".",1)[0]
f_name=f'{pf}.{sf}'
file = open(f'{name}/{f_name}','w')
file.writelines(r.text)
file.close()
file_list.append(Path(f'{name}/{f_name}'))
file_out.append(d_app)
with ZipFile(f"{name}.zip", "w") as zipObj:
for idx, file in enumerate(f_ist):
zipObj.write(f'{name}/{file}')
file_list.append(f'{name}.zip')
return(gr.Dropdown.update(label="Files", choices=[f for f in f_ist],interactive=True), file_list)
def show_f_cont(repo,name,file,token):
html_text = '<html>\n<body>\n<div>\n'
images=[".png" , ".jpg" , ".gif" , ".webm" , ".mp4"]
is_im=False
for x in images:
if x in file:
html_text += f'<object data="https://huggingface.co./spaces/{repo}/{name}/resolve/main/{file}" style="width:100%;font-size:small;"></object>'
out_text = "Image File"
is_im=True
else:
pass
if is_im==False:
print(f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{file}')
r = requests.get(f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{file}')
text=r.text
html_text += f'<pre style="text-wrap:pretty;">{text}</pre>\n'
out_text = r.text
else:
pass
html_text += '</div>\n</body>\n</html>'
return(html_text)
def show_f_frame(repo,name,file,token):
file_url=f'https://huggingface.co./spaces/{repo}/{name}/blob/main/{file}'
out=f"""<div><iframe src='{file_url}' height='1200px' width='100%'></iframe>"""
return out
def show_all(author,token):
spaces=[]
api = HfApi(token=token)
#author=name
s_ist = (api.list_spaces(author=author))
file_list = []
file_list_ea=[]
for i,space in enumerate(s_ist):
space_ea = space.id.split("/",1)[1]
spaces.append(space_ea)
#print (space_ea)
f_ist = (api.list_repo_files(repo_id=f'{author}/{space_ea}', repo_type="space"))
#print (f_ist)
if not os.path.exists(space_ea):
os.makedirs(space_ea)
for d_app in f_ist:
r = requests.get(f'https://huggingface.co./spaces/{author}/{space_ea}/raw/main/{d_app}')
#print (r.text)
uid = uuid.uuid4()
try:
sf=d_app.split(".",1)[1]
pf=d_app.split(".",1)[0]
f_name=f'{pf}.{sf}'
file = open(f'{space_ea}/{f_name}','w')
file.writelines(r.text)
file.close()
#file_list_ea.append(Path(f'{space}/{f_name}'))
except Exception:
pass
with ZipFile(f"{space_ea}.zip", "w") as zipObj:
for idx, file in enumerate(f_ist):
try:
zipObj.write(f'{space_ea}/{file}')
except Exception:
pass
file_list.append(f'{space_ea}.zip')
yield file_list
with ZipFile(f"{author}.zip", "w") as zipObj:
for idx, file in enumerate(file_list):
try:
zipObj.write(f'{file}')
except Exception:
pass
file_list.append(f'{author}.zip')
yield file_list
def show_all_z(author,token):
spaces=[]
api = HfApi(token=token)
#author=name
s_ist = (api.list_spaces(author=author))
file_list = []
file_list_ea=[]
for i,space in enumerate(s_ist):
space_ea = space.id.split("/",1)[1]
spaces.append(space_ea)
#print (space_ea)
f_ist = (api.list_repo_files(repo_id=f'{author}/{space_ea}', repo_type="space"))
#print (f_ist)
if not os.path.exists(space_ea):
os.makedirs(space_ea)
file= snapshot_download(repo_id=f'{author}/{space_ea}', repo_type="space")
shutil.make_archive(f"{space_ea}", 'zip', file)
file_list.append(f'{space_ea}.zip')
yield file_list
with ZipFile(f"{author}.zip", "w") as zipObj:
for idx, file in enumerate(file_list):
try:
zipObj.write(f'{file}')
except Exception:
pass
file_list.append(f'{author}.zip')
yield file_list
def dl_checked_fn(author,checklist,token):
spaces=[]
api = HfApi(token=token)
#author=name
s_ist = checklist
file_list = []
file_list_ea=[]
for i,space_ea in enumerate(s_ist):
#spaces.append(space_ea)
#print (space_ea)
f_ist = (api.list_repo_files(repo_id=f'{author}/{space_ea}', repo_type="space"))
#print (f_ist)
if not os.path.exists(space_ea):
os.makedirs(space_ea)
file= snapshot_download(repo_id=f'{author}/{space_ea}', repo_type="space")
shutil.make_archive(f"{space_ea}", 'zip', file)
file_list.append(f'{space_ea}.zip')
yield file_list
with ZipFile(f"{author}.zip", "w") as zipObj:
for idx, file in enumerate(file_list):
try:
zipObj.write(f'{file}')
except Exception:
pass
file_list.append(f'{author}.zip')
yield file_list
def get_space_runtime(author,checklist,token):
api=HfApi(token=token)
space_info=[]
for space in checklist:
outp=api.get_space_runtime(f'{author}/{space}')
space_info.append(outp['stage'])
return space_info
def delete_checked(confirm_val,author,checklist,token):
if confirm_val=="CONFIRM":
api = HfApi(token=token)
s_ist = checklist
delete_list = []
for i,space_ea in enumerate(s_ist):
try:
api.delete_repo(repo_id=f'{author}/{space_ea}',repo_type='space')
delete_list.append(f'Deleted:: {space_ea}\n')
yield delete_list,gr.update(visible=False)
except Exception as e:
yield e
else:
yield "Not Deleting", gr.update(visible=False)
def checkp(p):
if p == o:
return gr.update(visible=False), gr.update(visible=True)
elif p != o:
return None, None
def update_checked_message(inp):
html=""
for ea in inp:
html=f'{html}<br>{ea}'
return html
def ru_sure_fn():
return gr.update(visible=True)
with gr.Blocks(css=css) as build:
with gr.Row(visible=False) as no:
pass_box=gr.Textbox()
pass_btn=gr.Button()
with gr.Box(visible=True) as yes:
with gr.Box():
with gr.Row():
with gr.Column(scale=2):
r_name = gr.Textbox(label="Repo")
token = gr.Textbox(label="auth")
s_btn = gr.Button("Show Spaces")
with gr.Column(scale=1):
files=gr.File(file_count="directory")
with gr.Tab("View 1"):
with gr.Row():
with gr.Column(scale=1):
with gr.Group():
space_radio=gr.Radio(label="Spaces",choices=[])
with gr.Column(scale=3):
with gr.Tab("Space Info"):
with gr.Row():
with gr.Column(scale=3):
space_info_json=gr.JSON()
gr.Column(scale=1)
with gr.Tab("Files"):
with gr.Column(scale=1):
file_radio=gr.Radio(label="Files",choices=[])
with gr.Column(scale=3):
file_contents=gr.HTML()
with gr.Tab("View 2"):
with gr.Row():
with gr.Column(scale=2):
with gr.Group():
with gr.Row():
s_name = gr.Dropdown(label="Spaces", choices=[])
f_name = gr.Dropdown(label="Files", choices=[])
with gr.Row():
l_btn = gr.Button("Load Space")
with gr.Row():
show_all_btn1 = gr.Button("Load All 1",visible=False)
show_all_btn2 = gr.Button("Load All 2",visible=False)
uu=gr.Textbox(visible=False)
file_frame=gr.HTML()
with gr.Tab("View 3"):
with gr.Row():
with gr.Column(scale=1):
with gr.Group():
space_check=gr.CheckboxGroup(elem_id="my_checkbox",label="Spaces",choices=[])
with gr.Column(scale=1):
message_box=gr.HTML()
with gr.Column(scale=1):
dl_checked_btn=gr.Button("Download Checked")
delete_checked_btn=gr.Button("Delete Checked")
with gr.Row(visible=False) as sure_row:
with gr.Column():
ru_sure_box=gr.Textbox(label="type: 'CONFIRM' to confirm", value="")
ru_sure_btn=gr.Button("Confirm Delete")
with gr.Row():
del_message_box=gr.HTML()
gr.Column()
sure_check=gr.Textbox(visible=False,value="")
dl_checked_btn.click(dl_checked_fn,[r_name,space_check,token],files)
delete_checked_btn.click(ru_sure_fn,None,sure_row)
ru_sure_btn.click(delete_checked,[ru_sure_box,r_name,space_check,token],[del_message_box,sure_row]).then(show_s,[r_name,token],[s_name,space_radio,space_check])
space_check.change(update_checked_message,space_check,message_box)
show_all_btn1.click(show_all,[r_name,token],files)
show_all_btn2.click(show_all_z,[r_name,token],files)
s_btn.click(show_s,[r_name,token],[s_name,space_radio,space_check])
l_btn.click(show_f,[r_name,s_name,token], [f_name, files,file_radio])
f_name.change(show_f_frame,[r_name,s_name,f_name],[file_frame])
#s_name.change(show_f,[r_name,s_name,token],[d_app,files])
space_radio.change(show_f,[r_name,space_radio,token],[f_name, files,file_radio,space_info_json])
file_radio.change(show_f_cont,[r_name,space_radio,file_radio,token],[file_contents])
pass_btn.click(checkp,pass_box,[no,yes])
#inbut.click(build_space,[token,t_name,s_type,m_type,r_type,d_app,d_css],output_html)
build.queue(concurrency_count=10).launch(show_api=False)
'''
max_textboxes = 10
def variable_outputs(k):
k = int(k)
return [gr.Textbox(visible=True)]*k + [gr.Textbox(visible=False)]*(max_textboxes-k)
with gr.Blocks() as demo:
s = gr.Slider(1, max_textboxes, value=max_textboxes, step=1, label="How many textboxes to show:")
textboxes = []
for i in range(max_textboxes):
t = gr.Textbox(f"Textbox {i}")
textboxes.append(t)
s.change(variable_outputs, s, textboxes)'''