Spaces:
Runtime error
Runtime error
import argparse | |
import gradio as gr | |
import json | |
from textwrap import dedent | |
from task_plan import task_plan_list | |
import subprocess | |
from datetime import datetime | |
def read_specific_line(file_path, line_number): | |
try: | |
with open(file_path, 'r') as file: | |
for current_line, line in enumerate(file, start=1): | |
if current_line == line_number: | |
return line.strip() | |
return None # 如果文件不足指定的行数,返回None | |
except FileNotFoundError: | |
return None # 如果文件不存在,返回None | |
def get_folder_paths(file_path): | |
source_folder = read_specific_line(file_path, 2) | |
target_folder = read_specific_line(file_path, 3) | |
if source_folder is None or target_folder is None: | |
return "错误信息:文件不存在。", None, None | |
elif source_folder == "" or target_folder == "": | |
return "错误信息:文件中的第2行或第3行为空。", None, None | |
else: | |
return None, source_folder, target_folder | |
def read_specific_line(file_path, line_number): | |
with open(file_path, "r") as file: | |
for i, line in enumerate(file, 1): | |
if i == line_number: | |
return line.strip() | |
def repo_pull(url): | |
# 从远程仓库拉取代码的操作 | |
subprocess.check_output(["python3", "repo_pull.py", "--repo_url", | |
str(url).replace(".git", "").replace("git", "").replace("clone", "").replace(" ", | |
"")]) | |
yield | |
# 将代码推送到远程仓库的操作 | |
def repo_push(hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan): | |
task_plan_index = list(task_plan_list.keys()).index(select_task_plan) | |
if task_plan_index >= 0 and task_plan_index <= 5: | |
if task_plan_index == 0: | |
# 克隆目标仓库到本地 | |
subprocess.check_output(["python3", "repo_pull.py", "--repo_url", | |
str(hf_repo_id).replace(".git", "").replace("git", "").replace("clone", | |
"").replace(" ", | |
"")]) | |
# # 打开文件并读取第2行数据 | |
file_path = ".folderinfo" | |
# # 读取指定行数的数据 | |
# source_folder = read_specific_line(file_path, 1) | |
# target_folder = read_specific_line(file_path, 2) | |
error, source_folder, target_folder = get_folder_paths(file_path) | |
if error: | |
print(error) # 这将打印出错误信息 | |
else: | |
print(f"源文件夹: {source_folder}") | |
print(f"目标文件夹: {target_folder}") | |
# 获取当前日期和时间 | |
current_datetime = datetime.now() | |
# 格式化日期和时间 | |
formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S") | |
commit_message = f"魏然 首次上传于 {formatted_datetime}" | |
# subprocess.check_output( | |
# f"cd {target_folder} && cp -rf ../{str(source_folder).replace('./', '')}/* . && git add -A . && git commit -m \"{commit_message}\"", | |
# shell=True) | |
elif task_plan_index == 1: | |
print("选中了 1") | |
elif task_plan_index == 2: | |
print("选中了 2") | |
elif task_plan_index == 3: | |
print("选中了 3") | |
elif task_plan_index == 4: | |
print("选中了 4") | |
else: | |
print("选中了 5") | |
else: | |
print("task_plan_index 不在范围内") | |
yield | |
# 更新进度 | |
def load_set(wisemodel_link, hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan, progress=gr.Progress()): | |
tasks = [repo_pull(wisemodel_link), | |
repo_push(hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan)] # 将 repo_pull 和 repo_push 添加到任务列表中 | |
total_tasks = len(tasks) | |
for i, task in enumerate(tasks, start=1): | |
progress(i / total_tasks, desc=f"{i} / {total_tasks} ") | |
for _ in task: | |
pass | |
return "下载完成!" | |
# 创建解析命令行参数的函数 | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--language", type=str, default="zh-CN", | |
help="Language option, e.g., 'en-US' or 'zh-CN' or ja-JP or ko-KR or th-TH support now!") | |
return parser.parse_args() | |
# 加载语言对应的JSON文件 | |
def load_language_json(language): | |
filename = f"locales/{language}.json" | |
try: | |
with open(filename, "r", encoding="utf-8") as file: | |
return json.load(file) | |
except FileNotFoundError: | |
print(f"Error: File '{filename}' not found.") | |
return {} | |
# 加载JSON以支持国际化 | |
args = parse_args() | |
language_json = load_language_json(args.language) | |
# # 获取对应语言的文本信息 | |
info = language_json.get("info", {}) | |
_title = info.get("page_title", {}) | |
_about = info.get("about", {}) | |
_wisemodel_lable_title = info.get("wisemodel_lable_title", {}) | |
_hf_repo_id_title = info.get("hf_repo_id_title", {}) | |
_button = info.get("button", {}) | |
_clear = info.get("clear", {}) | |
_setting = info.get("setting", {}) | |
_HuggingFace_Token_title = info.get("HuggingFace_Token_title", {}) | |
_HuggingFace_Token_placeholder = info.get("HuggingFace_Token_placeholder", {}) | |
_WiseModel_Token_title = info.get("WiseModel_Token_title", {}) | |
_WiseModel_Token_placeholder = info.get("WiseModel_Token_placeholder", {}) | |
_Logs = info.get("Logs", {}) | |
_task_plan = info.get("task_plan", {}) | |
_source_user_id = info.get("source_user_id", {}) | |
_target_user_id = info.get("target_user_id", {}) | |
_source_user_placeholder = info.get("source_user_placeholder", {}) | |
_target_user_placeholder = info.get("target_user_placeholder", {}) | |
# | |
with gr.Blocks(title=_title, theme=gr.themes.Soft(text_size="sm")) as demo: | |
with gr.Accordion("📒 " + _about, open=False): | |
_ = f""" 这个空间适用于将 WiseModel 仓库同步转移到 Hugging Face 仓库。\n | |
**请首先确保你是仓库的所有者或者拥有者同意你这样做!**\n | |
# 如何使用这个空间?\n | |
- 复制这个空间并提供 WiseModel token(可选)和你的 读/写 HF token(必填)\n | |
- 在Hugging Face上创建你的目标模型仓库。这一步需要手动完成,空间不会为你创建一个空的仓库。\n | |
- 在你自己的私有空间中填写以下信息。\n | |
- 点击提交按钮,然后观察日志中的输出进展。\n | |
- 重新创建修改自己的 README.md 文件(因为元数据与 HF 不兼容) | |
""" | |
gr.Markdown(dedent(_)) | |
wisemodel_link = gr.Textbox(label=_wisemodel_lable_title) | |
hf_repo_id = gr.Textbox( | |
label=_hf_repo_id_title) | |
with gr.Accordion("🔧 " + _setting, open=False): | |
with gr.Column(scale=12): | |
select_task_plan = gr.Dropdown(choices=list(task_plan_list.keys()), allow_custom_value=True, value="", | |
label=_task_plan) | |
with gr.Row(): | |
tmp1 = gr.Textbox( | |
label=_HuggingFace_Token_title, | |
placeholder=_HuggingFace_Token_placeholder, | |
type="password", # 设置为密码类型,不显示明文 | |
) | |
tmp2 = gr.Textbox( | |
label=_WiseModel_Token_title, | |
placeholder=_WiseModel_Token_placeholder, | |
type="password", # 设置为密码类型,不显示明文 | |
) | |
with gr.Row(): | |
tmp3 = gr.Textbox( | |
label=_source_user_id, | |
placeholder=_source_user_placeholder, | |
) | |
tmp4 = gr.Textbox( | |
label=_target_user_id, | |
placeholder=_target_user_placeholder, | |
) | |
with gr.Row(): | |
with gr.Column(scale=100): | |
button = gr.Button(_button, variant="primary") | |
with gr.Column(scale=1): | |
clear = gr.Button(_clear) | |
output = gr.Textbox(label=_Logs) | |
button.click(load_set, [wisemodel_link, hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan], [output]) | |
if __name__ == "__main__": | |
demo.launch(show_error=True, share=True, server_name="0.0.0.0", server_port=7860, debug=True) | |