Spaces:
Sleeping
Sleeping
import os | |
import sys | |
import uuid | |
import gradio as gr | |
import requests | |
import json | |
from zipfile import ZipFile | |
import shutil | |
from pathlib import Path | |
from huggingface_hub import (create_repo,move_repo,get_full_repo_name,upload_file,CommitOperationAdd,HfApi,snapshot_download) | |
from PIL import Image | |
# Random identifier generated once at import time.
uid = uuid.uuid4()

#token = os.environ['HF_TOKEN']
#token_self = os.environ['HF_TOKEN']
#o=os.environ['P']

# Extra stylesheet handed to gr.Blocks: stretches the targeted label class to full width.
css = """
label.svelte-1mhtq7j.svelte-1mhtq7j.svelte-1mhtq7j {
width: 100%;
}
"""
def show_s(name, token, save_list):
    """List the Spaces of ``name`` that are not present in ``save_list``.

    Args:
        name: Author passed to ``HfApi.list_spaces``.
        token: Access token used to build the ``HfApi`` client.
        save_list: Space names to leave out of the result; any falsy value
            is treated as an empty list.

    Returns:
        Two ``gr.update`` objects carrying the same choices. The first also
        sets the label to "Spaces"; the second sets ``interactive=True``.
    """
    excluded = save_list if save_list else []
    client = HfApi(token=token)
    visible = []
    for entry in client.list_spaces(author=name):
        # Keep only the part of the id that follows the first "/".
        short_name = entry.id.split("/", 1)[1]
        if short_name not in excluded:
            visible.append(short_name)
    return (
        gr.update(label="Spaces", choices=list(visible)),
        gr.update(choices=list(visible), interactive=True),
    )
def show_f(repo, name, token, tog):
    """Download every file of a Space, zip the result, and fetch its runtime.

    Files are written below a local directory called ``name`` (mirroring the
    repository layout at any depth) and then bundled into ``<name>.zip``.

    Args:
        repo: Owner part of the Space id (``<repo>/<name>``).
        name: Space name; also the local download directory and zip stem.
        token: Access token used to build the ``HfApi`` client.
        tog: When falsy, nothing is downloaded and only the runtime is
            fetched.

    Returns:
        A 3-tuple ``(file_list, file_choices_update, runtime)``:
        ``file_list`` holds the local paths of all downloaded files plus the
        zip archive, ``file_choices_update`` is a ``gr.update`` listing the
        repository file names, and ``runtime`` is whatever
        ``HfApi.get_space_runtime`` returns. With ``tog`` falsy the first two
        items are ``None``.
    """
    api = HfApi(token=token)
    space_id = f'{repo}/{name}'
    if not tog:
        return (None, None, api.get_space_runtime(space_id))

    f_ist = api.list_repo_files(repo_id=space_id, repo_type="space")
    print(f_ist)
    file_list = []
    os.makedirs(name, exist_ok=True)
    for d_app in f_ist:
        local_path = f'{name}/{d_app}'
        # Create every intermediate directory so files nested at any depth
        # can be written, not just one or two levels.
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        # NOTE(review): the response status is not checked and no auth header
        # is sent, so an error page may be saved in place of the file --
        # confirm whether that best-effort behaviour is intended.
        r = requests.get(f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{d_app}')
        print(d_app)
        with open(local_path, 'wb') as out_file:
            out_file.write(r.content)
        file_list.append(local_path)

    with ZipFile(f"{name}.zip", "w") as zipObj:
        for d_app in f_ist:
            zipObj.write(f'{name}/{d_app}')
    file_list.append(f'{name}.zip')

    return (
        file_list,
        gr.update(choices=list(f_ist), interactive=True),
        api.get_space_runtime(space_id),
    )
def show_f_cont(repo, name, file, token):
    """Build an HTML preview for one file of a Space.

    Files whose name ends in a known media suffix (case-insensitive) are
    embedded with an ``<object>`` tag pointing at the ``resolve`` URL. Any
    other file is fetched from the ``raw`` URL and shown inside ``<pre>``
    with its text HTML-escaped, so markup in the file is displayed rather
    than interpreted.

    Args:
        repo: Owner part of the Space id.
        name: Space name.
        file: Path of the file inside the Space; ``None`` or ``""`` yields an
            empty preview.
        token: Unused; kept so existing callers keep working.

    Returns:
        The preview as an HTML document string.
    """
    # Local import keeps this block self-contained.
    from html import escape

    html_text = '<html>\n<body>\n<div>\n'
    if file:
        media_suffixes = (".png", ".jpg", ".gif", ".webm", ".mp4")
        if file.lower().endswith(media_suffixes):
            # Escape the URL because it lands inside a double-quoted attribute.
            src = escape(
                f'https://huggingface.co./spaces/{repo}/{name}/resolve/main/{file}',
                quote=True,
            )
            html_text += f'<object data="{src}" style="width:100%;font-size:small;"></object>'
        else:
            url = f'https://huggingface.co./spaces/{repo}/{name}/raw/main/{file}'
            print(url)
            r = requests.get(url)
            html_text += f'<pre style="text-wrap:pretty;">{escape(r.text)}</pre>\n'
    html_text += '</div>\n</body>\n</html>'
    return html_text
def move(repo, space, dest, token):
    """Move the Space ``<repo>/<space>`` to ``<dest>/<space>``.

    Args:
        repo: Current owner of the Space.
        space: Space name; it is kept unchanged at the destination.
        dest: New owner.
        token: Access token forwarded to ``move_repo``.
    """
    move_repo(
        from_id=f'{repo}/{space}',
        to_id=f'{dest}/{space}',
        repo_type='space',
        token=token,
    )
    print("REPO MOVED")
def add_save(space, save_state):
    """Add ``space`` to the protected list unless it is already in it.

    Args:
        space: Name to protect.
        save_state: Current protected list; a falsy value starts a new list.
            A non-empty list is extended in place.

    Returns:
        A 3-tuple: the protected list, followed by two ``gr.update`` objects
        that both carry the list as choices with ``interactive=True``.
    """
    protected = save_state if save_state else []
    already_present = any(space == item for item in protected)
    if not already_present:
        protected.append(space)
    return (
        protected,
        gr.update(choices=list(protected), interactive=True),
        gr.update(choices=list(protected), interactive=True),
    )
def rem_save(space, save_state):
    """Return the protected list with ``space`` removed.

    Args:
        space: Name to drop from the protected list.
        save_state: Current protected list; ``None`` is treated as empty.

    Returns:
        A 3-tuple: the new list (the input list is not modified), followed by
        two ``gr.update`` objects that both carry the new list as choices
        with ``interactive=True``.
    """
    # Keep every entry except the one being removed.
    save_out = [ea for ea in (save_state or []) if ea != space]
    return (
        save_out,
        gr.update(choices=list(save_out), interactive=True),
        gr.update(choices=list(save_out), interactive=True),
    )
def toggle_comments_off(r_name, check_list, token):
    """Close every open discussion on the selected Spaces.

    ``HfApi.change_discussion_status`` acts on a single discussion and needs
    its number, so the discussions of each Space are listed first and the
    open ones are closed one by one.

    Args:
        r_name: Owner of the Spaces.
        check_list: Space names to process; ``None`` is treated as empty.
        token: Access token used for the ``HfApi`` client and each call.
    """
    api = HfApi(token=token)
    for ea in check_list or []:
        repo_id = f'{r_name}/{ea}'
        # NOTE(review): the Hub lists pull requests as discussions too, so
        # open PRs get closed as well -- confirm that this is wanted.
        for discussion in api.get_repo_discussions(repo_id=repo_id, repo_type='space'):
            if discussion.status != 'open':
                continue
            api.change_discussion_status(
                repo_id=repo_id,
                discussion_num=discussion.num,
                new_status='closed',
                token=token,
                repo_type='space',
            )
def set_space_sleep_time(r_name, check_list, sleep_time, token):
    """Apply ``sleep_time`` to each selected Space of ``r_name``.

    Args:
        r_name: Owner of the Spaces.
        check_list: Space names to update.
        sleep_time: Value forwarded unchanged to
            ``HfApi.set_space_sleep_time``.
        token: Access token used for the ``HfApi`` client and each call.
    """
    client = HfApi(token=token)
    for space_name in check_list:
        target = f'{r_name}/{space_name}'
        client.set_space_sleep_time(repo_id=target, sleep_time=sleep_time, token=token)
def set_space_visible(r_name, check_list, private_bool, token):
    """Set the visibility of each selected Space of ``r_name``.

    Args:
        r_name: Owner of the Spaces.
        check_list: Space names to update; ``None`` is treated as empty.
        private_bool: Passed as ``private`` to
            ``HfApi.update_repo_visibility``.
        token: Access token used for the ``HfApi`` client and each call.
    """
    api = HfApi(token=token)
    for ea in check_list or []:
        api.update_repo_visibility(
            repo_id=f'{r_name}/{ea}',
            private=private_bool,
            token=token,
            # Without an explicit repo_type the Hub treats the id as a model
            # repository, so the Space would never be found.
            repo_type='space',
        )
def load_space_status(r_name, token):
    """Render an HTML table with the runtime stage of every Space of ``r_name``.

    Args:
        r_name: Author passed to ``HfApi.list_spaces``.
        token: Access token used to build the ``HfApi`` client.

    Returns:
        An HTML ``<table>`` string with one ``Name`` / ``Status`` row per Space.
    """
    client = HfApi(token=token)
    parts = ["<table><tr><th>Name</th><th>Status</th></tr>"]
    for entry in client.list_spaces(author=r_name):
        stage = client.get_space_runtime(entry.id).stage
        print(stage)
        parts.append(f'<tr><td>{entry.id}</td><td>{stage}</td></tr>')
    parts.append("</table>")
    return "".join(parts)
def rebuild_space(r_name, token):
    """Restart every Space of ``r_name`` whose runtime stage is ``RUNTIME_ERROR``.

    Args:
        r_name: Author passed to ``HfApi.list_spaces``.
        token: Access token used for the ``HfApi`` client and each restart.
    """
    client = HfApi(token=token)
    for entry in client.list_spaces(author=r_name):
        # Spaces in any other stage are left untouched.
        if client.get_space_runtime(entry.id).stage == "RUNTIME_ERROR":
            client.restart_space(repo_id=f'{entry.id}', token=token)
# UI definition: component layout first, then event wiring, then launch.
with gr.Blocks(css=css) as build:
    # Protected space names, shared between the handlers below.
    save_state=gr.State()
    space_list=gr.State([])
    with gr.Row(visible=False) as no:
        pass_box=gr.Textbox()
        pass_btn=gr.Button()
    #with gr.Group(visible=True) as yes:
    with gr.Group():
        with gr.Row():
            with gr.Column(scale=2):
                r_name = gr.Textbox(label="Repo")
                token = gr.Textbox(label="auth")
                s_btn = gr.Button("Show Spaces")
            with gr.Column(scale=1):
                tog=gr.Checkbox(label="Show Files",value=True)
                files=gr.File(file_count="directory")
    with gr.Group():
        with gr.Tab("View 1"):
            with gr.Row():
                with gr.Column(scale=1):
                    with gr.Group():
                        space_radio=gr.Radio(label="Spaces",choices=[])
                with gr.Column(scale=3):
                    with gr.Tab("Space Info"):
                        with gr.Row():
                            with gr.Column(scale=3):
                                space_info_json=gr.JSON()
                            gr.Column(scale=1)
                    with gr.Tab("Files"):
                        with gr.Column(scale=1):
                            file_radio=gr.Radio(label="Files",choices=[])
                        with gr.Column(scale=3):
                            file_contents=gr.HTML()
                    with gr.Tab("Move"):
                        with gr.Row():
                            dest=gr.Textbox(label="Destination")
                            move_btn=gr.Button("Move Space")
                        with gr.Row():
                            with gr.Column():
                                save_list=gr.Radio(label="Protected",choices=[])
                            with gr.Column():
                                save_btn=gr.Button("Protect")
                                rem_btn=gr.Button("Remove")
        with gr.Tab("Controls"):
            with gr.Row():
                with gr.Column():
                    check_list=gr.CheckboxGroup(choices=[])
                    com_off = gr.Button("Turn OFF Commenting")
                    # NOTE(review): no handler is attached to this button.
                    com_on = gr.Button("Turn ON Commenting")
        with gr.Tab("All"):
            with gr.Row():
                with gr.Column():
                    load_spaces=gr.Button("Load Spaces")
                with gr.Column():
                    refresh_btn=gr.Button("Rebuild")
            space_status=gr.HTML()

    # --- Event wiring ---
    load_spaces.click(load_space_status, [r_name,token],space_status)
    refresh_btn.click(rebuild_space, [r_name,token]).then(load_space_status, [r_name,token],space_status)
    com_off.click(toggle_comments_off, [r_name,check_list,token])
    save_btn.click(add_save,[space_radio,save_state],[save_state,save_list]).then(show_s,[r_name,token,save_state],[space_radio,check_list])
    # The entry to un-protect is picked in the "Protected" radio; protected
    # spaces are filtered out of space_radio by show_s.
    rem_btn.click(rem_save,[save_list,save_state],[save_state,save_list]).then(show_s,[r_name,token,save_state],[space_radio,check_list])
    move_btn.click(move,[r_name,space_radio,dest,token]).then(show_s,[r_name,token,save_state],[space_radio,check_list])
    # show_s expects the list of protected names (save_state), not the
    # currently selected value of the "Protected" radio.
    s_btn.click(show_s,[r_name,token,save_state],[space_radio,check_list])
    space_radio.change(show_f,[r_name,space_radio,token,tog],[files,file_radio,space_info_json])
    file_radio.change(show_f_cont,[r_name,space_radio,file_radio,token],[file_contents])
build.queue(default_concurrency_limit=10).launch(show_api=False)