# Archive_HF / app.py
# Omnibus: "Update app.py" (commit d031c4f, verified)
import os
import sys
import uuid
import gradio as gr
import requests
import json
from zipfile import ZipFile
import shutil
from pathlib import Path
from huggingface_hub import (create_repo,move_repo,get_full_repo_name,upload_file,CommitOperationAdd,HfApi,snapshot_download)
from PIL import Image
# Random identifier generated once, when the module is imported.
uid = uuid.uuid4()
# Disabled environment-variable lookups, kept for reference.
#token = os.environ['HF_TOKEN']
#token_self = os.environ['HF_TOKEN']
#o=os.environ['P']
# Custom stylesheet handed to gr.Blocks(css=...): stretches the matched
# Svelte-generated label to the full width of its container.
css="""
label.svelte-1mhtq7j.svelte-1mhtq7j.svelte-1mhtq7j {
width: 100%;
}
"""
def show_s(name,token,save_list):
    """List the Spaces of ``name`` that are not present in ``save_list``.

    Args:
        name: Author (user or organisation) whose Spaces are listed.
        token: Hugging Face access token handed to ``HfApi``.
        save_list: Space names to leave out of the result; any falsy value
            is treated as an empty list.

    Returns:
        A pair of ``gr.update`` payloads carrying the same choices: the
        first also sets the label to "Spaces", the second marks its target
        component as interactive.
    """
    excluded = save_list if save_list else []
    hub = HfApi(token=token)
    # Keep only the part of each Space id that follows the first "/".
    short_names = (entry.id.split("/",1)[1] for entry in hub.list_spaces(author=name))
    visible = [short for short in short_names if short not in excluded]
    return (
        gr.update(label="Spaces", choices=list(visible)),
        gr.update(choices=list(visible), interactive=True),
    )
def show_f(repo,name,token,tog):
    """Download every file of a Space, zip the local copy, and report its runtime.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name; also used as the local download directory and as
            the stem of the archive (``<name>.zip``).
        token: Hugging Face access token handed to ``HfApi``.
        tog: When falsy, nothing is downloaded and only the runtime is
            looked up.

    Returns:
        A 3-tuple ``(file_list, file_choices, runtime)``. ``file_list``
        holds the local path of each downloaded file followed by the path
        of the zip archive, ``file_choices`` is a ``gr.update`` carrying the
        repo-relative file names, and ``runtime`` is whatever
        ``HfApi.get_space_runtime`` returns. The first two are ``None`` when
        ``tog`` is falsy; all three are ``None`` when ``name`` is empty.
    """
    if not name:
        # No Space selected: do not query the Hub for "<repo>/None".
        return (None, None, None)

    api = HfApi(token=token)
    repo_id = f'{repo}/{name}'
    if not tog:
        return (None,
                None,
                api.get_space_runtime(repo_id))

    f_ist = api.list_repo_files(repo_id=repo_id, repo_type="space")
    print (f_ist)
    file_list = []
    # Create the root directory even when the Space has no files.
    os.makedirs(name, exist_ok=True)
    for d_app in f_ist:
        # Mirror the repo-relative path locally, creating parent
        # directories at any nesting depth.
        local_path = f'{name}/{d_app}'
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        r = requests.get(f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{d_app}')
        print(d_app)
        with open(local_path,'wb') as out_file:
            out_file.write(r.content)
        file_list.append(local_path)

    # Archive exactly the files written above, then expose the archive too.
    with ZipFile(f"{name}.zip", "w") as zipObj:
        for local_path in file_list:
            zipObj.write(local_path)
    file_list.append(f'{name}.zip')

    return(file_list,
           gr.update(choices=list(f_ist),interactive=True),
           api.get_space_runtime(repo_id))
def show_f_cont(repo,name,file,token):
    """Build an HTML preview for one file of a Space.

    Media files (matched by file extension, case-insensitively) are embedded
    once with an ``<object>`` tag pointing at the file's ``resolve`` URL.
    Any other file is fetched from its ``raw`` URL and shown as HTML-escaped
    text inside a ``<pre>`` element, so markup in the file is displayed
    rather than interpreted.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name.
        file: Repo-relative path of the file; a falsy value yields ``""``.
        token: Not used by this function.

    Returns:
        The HTML document as a string.
    """
    from html import escape

    if not file:
        # Nothing selected, so there is nothing to preview.
        return ""

    media_suffixes = (".png", ".jpg", ".gif", ".webm", ".mp4")
    html_text = '<html>\n<body>\n<div>\n'
    if file.lower().endswith(media_suffixes):
        # Escape the URL because it is placed inside a quoted attribute.
        media_url = escape(f'https://huggingface.co./spaces/{repo}/{name}/resolve/main/{file}', quote=True)
        html_text += f'<object data="{media_url}" style="width:100%;font-size:small;"></object>'
    else:
        raw_url = f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{file}'
        print(raw_url)
        r = requests.get(raw_url)
        # The file body is remote content: escape it before embedding.
        html_text += f'<pre style="text-wrap:pretty;">{escape(r.text)}</pre>\n'
    html_text += '</div>\n</body>\n</html>'
    return(html_text)
def move(repo,space,dest,token):
    """Move the Space ``<repo>/<space>`` to ``<dest>/<space>``.

    Args:
        repo: Current owner of the Space.
        space: Space name; it is kept unchanged at the destination.
        dest: New owner (user or organisation).
        token: Hugging Face access token passed to ``move_repo``.
    """
    move_repo(
        from_id=f'{repo}/{space}',
        to_id=f'{dest}/{space}',
        repo_type='space',
        token=token,
    )
    print("REPO MOVED")
def add_save(space,save_state):
    """Add ``space`` to the protected list unless it is already there.

    Args:
        space: Space name to protect.
        save_state: Current protected list; a falsy value starts a new list.

    Returns:
        A 3-tuple: the protected list, followed by two ``gr.update``
        payloads that both offer that list as interactive choices.
    """
    protected = save_state if save_state else []
    if space not in protected:
        protected.append(space)
    first_update = gr.update(choices=list(protected),interactive=True)
    second_update = gr.update(choices=list(protected),interactive=True)
    return (protected, first_update, second_update)
def rem_save(space,save_state):
    """Remove ``space`` from the protected list.

    Args:
        space: Space name to unprotect.
        save_state: Current protected list; ``None`` is treated as empty.

    Returns:
        A 3-tuple: a new list without ``space``, followed by two
        ``gr.update`` payloads that both offer that list as interactive
        choices.
    """
    # Keep every entry except the one being removed.
    save_out = [ea for ea in (save_state or []) if ea != space]
    return (save_out,
            gr.update(choices=list(save_out),interactive=True),
            gr.update(choices=list(save_out),interactive=True))
def toggle_comments_off(r_name,check_list,token):
    """Close every open discussion on each selected Space.

    ``HfApi.change_discussion_status`` acts on a single discussion and needs
    its number, so the discussions of each Space are enumerated first.

    Args:
        r_name: Owner (user or organisation) of the Spaces.
        check_list: Names of the Spaces to process; ``None`` is treated as
            empty.
        token: Hugging Face access token.
    """
    api = HfApi(token=token)
    for ea in check_list or []:
        repo_id = f'{r_name}/{ea}'
        for discussion in api.get_repo_discussions(repo_id=repo_id, repo_type='space'):
            # NOTE(review): pull requests are left untouched so pending code
            # contributions are not closed - confirm this is the intent.
            if discussion.is_pull_request or discussion.status != 'open':
                continue
            api.change_discussion_status(
                repo_id=repo_id,
                discussion_num=discussion.num,
                new_status='closed',
                token=token,
                repo_type='space',
            )
def set_space_sleep_time(r_name,check_list,sleep_time,token):
    """Apply ``sleep_time`` to each selected Space.

    Args:
        r_name: Owner (user or organisation) of the Spaces.
        check_list: Names of the Spaces to update.
        sleep_time: Value forwarded unchanged to
            ``HfApi.set_space_sleep_time``.
        token: Hugging Face access token.
    """
    hub = HfApi(token=token)
    repo_ids = (f'{r_name}/{ea}' for ea in check_list)
    for repo_id in repo_ids:
        hub.set_space_sleep_time(repo_id=repo_id, sleep_time=sleep_time, token=token)
def set_space_visible(r_name,check_list,private_bool,token):
    """Set the visibility of each selected Space.

    Args:
        r_name: Owner (user or organisation) of the Spaces.
        check_list: Names of the Spaces to update; ``None`` is treated as
            empty.
        private_bool: Forwarded as ``private`` to
            ``HfApi.update_repo_visibility``.
        token: Hugging Face access token.
    """
    api = HfApi(token=token)
    for ea in check_list or []:
        api.update_repo_visibility(
            repo_id=f'{r_name}/{ea}',
            private=private_bool,
            token=token,
            # Without an explicit type the Hub treats the id as a model repo.
            repo_type='space',
        )
def load_space_status(r_name,token):
    """Render an HTML table with the runtime stage of every Space of ``r_name``.

    Args:
        r_name: Author (user or organisation) whose Spaces are listed.
        token: Hugging Face access token handed to ``HfApi``.

    Returns:
        An HTML ``<table>`` string with one "Name"/"Status" row per Space.
    """
    hub = HfApi(token=token)
    parts = ["<table><tr><th>Name</th><th>Status</th></tr>"]
    for entry in hub.list_spaces(author=r_name):
        stage = hub.get_space_runtime(entry.id).stage
        print(stage)
        parts.append(f'<tr><td>{entry.id}</td><td>{stage}</td></tr>')
    parts.append("</table>")
    return "".join(parts)
def rebuild_space(r_name,token):
    """Restart every Space of ``r_name`` whose runtime stage is "RUNTIME_ERROR".

    Args:
        r_name: Author (user or organisation) whose Spaces are checked.
        token: Hugging Face access token.
    """
    hub = HfApi(token=token)
    for entry in hub.list_spaces(author=r_name):
        stage = hub.get_space_runtime(entry.id).stage
        if stage != "RUNTIME_ERROR":
            continue
        hub.restart_space(repo_id=f'{entry.id}', token=token)
# Gradio UI: layout first, then event wiring, then launch.
# NOTE(review): the nesting of the layout containers below was reconstructed
# from the statement order - confirm it matches the intended layout.
with gr.Blocks(css=css) as build:
    # Per-session list of protected Space names (starts as None).
    save_state=gr.State()
    space_list=gr.State([])
    with gr.Row(visible=False) as no:
        pass_box=gr.Textbox()
        pass_btn=gr.Button()
    #with gr.Group(visible=True) as yes:
    with gr.Group():
        with gr.Row():
            with gr.Column(scale=2):
                r_name = gr.Textbox(label="Repo")
                # The access token is a secret: mask it on screen.
                token = gr.Textbox(label="auth", type="password")
                s_btn = gr.Button("Show Spaces")
            with gr.Column(scale=1):
                tog=gr.Checkbox(label="Show Files",value=True)
                files=gr.File(file_count="directory")
    with gr.Group():
        with gr.Tab("View 1"):
            with gr.Row():
                with gr.Column(scale=1):
                    with gr.Group():
                        space_radio=gr.Radio(label="Spaces",choices=[])
                with gr.Column(scale=3):
                    with gr.Tab("Space Info"):
                        with gr.Row():
                            with gr.Column(scale=3):
                                space_info_json=gr.JSON()
                            gr.Column(scale=1)
                    with gr.Tab("Files"):
                        with gr.Column(scale=1):
                            file_radio=gr.Radio(label="Files",choices=[])
                        with gr.Column(scale=3):
                            file_contents=gr.HTML()
                    with gr.Tab("Move"):
                        with gr.Row():
                            dest=gr.Textbox(label="Destination")
                            move_btn=gr.Button("Move Space")
                        with gr.Row():
                            with gr.Column():
                                save_list=gr.Radio(label="Protected",choices=[])
                            with gr.Column():
                                save_btn=gr.Button("Protect")
                                rem_btn=gr.Button("Remove")
        with gr.Tab("Controls"):
            with gr.Row():
                with gr.Column():
                    check_list=gr.CheckboxGroup(choices=[])
                    com_off = gr.Button("Turn OFF Commenting")
                    com_on = gr.Button("Turn ON Commenting")
        with gr.Tab("All"):
            with gr.Row():
                with gr.Column():
                    load_spaces=gr.Button("Load Spaces")
                with gr.Column():
                    refresh_btn=gr.Button("Rebuild")
            space_status=gr.HTML()

    # Event wiring. Every action that changes the set of Spaces or the
    # protected list refreshes the Space choices through show_s afterwards.
    load_spaces.click(load_space_status, [r_name,token],space_status)
    refresh_btn.click(rebuild_space, [r_name,token]).then(load_space_status, [r_name,token],space_status)
    com_off.click(toggle_comments_off, [r_name,check_list,token])
    save_btn.click(add_save,[space_radio,save_state],[save_state,save_list]).then(show_s,[r_name,token,save_state],[space_radio,check_list])
    rem_btn.click(rem_save,[space_radio,save_state],[save_state,save_list]).then(show_s,[r_name,token,save_state],[space_radio,check_list])
    move_btn.click(move,[r_name,space_radio,dest,token]).then(show_s,[r_name,token,save_state],[space_radio,check_list])
    # Pass the protected list (save_state), not the Radio's selected value,
    # so show_s filters by list membership like the handlers above.
    s_btn.click(show_s,[r_name,token,save_state],[space_radio,check_list])
    space_radio.change(show_f,[r_name,space_radio,token,tog],[files,file_radio,space_info_json])
    file_radio.change(show_f_cont,[r_name,space_radio,file_radio,token],[file_contents])
build.queue(default_concurrency_limit=10).launch(show_api=False)