GitSyncMaster / app.py
魏然
魏然 首次上传于 2024/04/13
e80fd1f
raw
history blame
8.6 kB
import argparse
import gradio as gr
import json
from textwrap import dedent
from task_plan import task_plan_list
import subprocess
from datetime import datetime
def read_specific_line(file_path, line_number):
    """Return one line of a text file with surrounding whitespace removed.

    Lines are counted starting at 1.

    Args:
        file_path: Path of the file to read.
        line_number: 1-based number of the wanted line.

    Returns:
        The stripped line, or ``None`` when the file does not exist or has
        fewer than ``line_number`` lines.
    """
    try:
        with open(file_path, 'r') as handle:
            candidates = (
                text.strip()
                for position, text in enumerate(handle, start=1)
                if position == line_number
            )
            # The first hit is the answer; None when the file is too short.
            return next(candidates, None)
    except FileNotFoundError:
        # A missing file is reported the same way as a missing line.
        return None
def get_folder_paths(file_path):
    """Read the source and target folder names from lines 2 and 3 of a file.

    Args:
        file_path: Path of the file holding the folder names.

    Returns:
        A 3-tuple ``(error, source_folder, target_folder)``. On success
        ``error`` is ``None`` and the two folder names are returned. On
        failure ``error`` is a message string and both folders are ``None``.
    """
    folders = (
        read_specific_line(file_path, 2),
        read_specific_line(file_path, 3),
    )

    # A None entry means the corresponding line could not be read at all.
    if any(entry is None for entry in folders):
        return "错误信息:文件不存在。", None, None

    # The line exists but holds nothing except whitespace.
    if any(entry == "" for entry in folders):
        return "错误信息:文件中的第2行或第3行为空。", None, None

    source_folder, target_folder = folders
    return None, source_folder, target_folder
def read_specific_line(file_path, line_number):
    """Return line ``line_number`` (1-based) of ``file_path``, stripped.

    NOTE: this definition replaces the earlier ``read_specific_line`` in this
    module at import time, so it has to honour the same contract:
    ``get_folder_paths`` expects ``None`` (not an exception) when the file is
    missing, so that it can report its error message.

    Args:
        file_path: Path of the file to read.
        line_number: 1-based number of the wanted line.

    Returns:
        The stripped line, or ``None`` when the file does not exist or has
        fewer than ``line_number`` lines.
    """
    try:
        with open(file_path, "r") as file:
            for i, line in enumerate(file, 1):
                if i == line_number:
                    return line.strip()
    except FileNotFoundError:
        # Missing file: report "no such line" instead of crashing the caller.
        return None
    # The file is shorter than the requested line number.
    return None
def _normalize_repo_url(raw):
    """Reduce a pasted ``git clone <address>.git`` command to the bare address.

    Only a leading ``git clone`` and a trailing ``.git`` are removed, so the
    substrings "git" and "clone" inside the address itself are left intact.
    All whitespace is dropped.
    """
    tokens = str(raw).split()
    if tokens[:2] == ["git", "clone"]:
        tokens = tokens[2:]
    cleaned = "".join(tokens)
    if cleaned.endswith(".git"):
        cleaned = cleaned[:-len(".git")]
    return cleaned


def repo_pull(url):
    """Pull a remote repository by running ``repo_pull.py`` as a subprocess.

    This is a generator: the subprocess runs when the generator is first
    advanced, and a single ``None`` is yielded once it has finished.

    Args:
        url: Repository address, optionally pasted as ``git clone <url>.git``.

    Raises:
        subprocess.CalledProcessError: If ``repo_pull.py`` exits non-zero.
    """
    subprocess.check_output(
        ["python3", "repo_pull.py", "--repo_url", _normalize_repo_url(url)]
    )
    yield


# Push step: act on the target repository according to the chosen task plan.
def repo_push(hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan):
    """Run the task plan selected in the UI for the target repository.

    This is a generator: the work happens when it is first advanced, and a
    single ``None`` is yielded at the end.

    Args:
        hf_repo_id: Target repository id or address.
        tmp1, tmp2, tmp3, tmp4: Extra UI values; not used by this function.
        select_task_plan: Name of the chosen plan, looked up among the keys
            of ``task_plan_list``.

    Raises:
        subprocess.CalledProcessError: If ``repo_pull.py`` exits non-zero
            (first plan only).
    """
    plan_names = list(task_plan_list.keys())
    try:
        task_plan_index = plan_names.index(select_task_plan)
    except ValueError:
        # Empty or custom dropdown value: handle it as "out of range" below
        # instead of letting list.index() abort the whole run.
        task_plan_index = -1

    if task_plan_index == 0:
        # Clone the target repository locally.
        subprocess.check_output(
            ["python3", "repo_pull.py", "--repo_url", _normalize_repo_url(hf_repo_id)]
        )
        # Lines 2 and 3 of .folderinfo name the source and target folders.
        error, source_folder, target_folder = get_folder_paths(".folderinfo")
        if error:
            print(error)
        else:
            print(f"源文件夹: {source_folder}")
            print(f"目标文件夹: {target_folder}")
            # TODO: copy the source folder into the target folder and commit
            # it there; the previous draft used the timestamped message
            # "魏然 首次上传于 <YYYY-MM-DD HH:MM:SS>". That step is not
            # implemented yet.
    elif 1 <= task_plan_index <= 5:
        # Plans 1-5 are placeholders that only log which one was chosen.
        print(f"选中了 {task_plan_index}")
    else:
        print("task_plan_index 不在范围内")
    yield
# Progress reporting
def load_set(wisemodel_link, hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan, progress=gr.Progress()):
    """Run the pull step, then the push step, updating the progress tracker.

    Both steps are generators, so nothing runs until they are iterated here.
    The tracker is updated right before each step is executed.

    Args:
        wisemodel_link: Source repository address, passed to ``repo_pull``.
        hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan: Passed through
            unchanged to ``repo_push``.
        progress: Callable progress tracker.

    Returns:
        The completion message shown in the log box.
    """
    pipeline = (
        repo_pull(wisemodel_link),
        repo_push(hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan),
    )
    step_count = len(pipeline)
    step_no = 0
    for step in pipeline:
        step_no += 1
        progress(step_no / step_count, desc=f"{step_no} / {step_count} ")
        # Exhausting the generator is what actually performs the step.
        for _ in step:
            pass
    return "下载完成!"
# Command-line argument parsing
def parse_args():
    """Parse the process command line.

    Returns:
        An ``argparse.Namespace`` whose ``language`` attribute holds the
        value of ``--language`` (``"zh-CN"`` when the option is absent).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--language",
        default="zh-CN",
        type=str,
        help="Language option, e.g., 'en-US' or 'zh-CN' or ja-JP or ko-KR or th-TH support now!",
    )
    namespace = cli.parse_args()
    return namespace
# Locale file loading
def load_language_json(language):
    """Load ``locales/<language>.json`` and return its parsed content.

    Args:
        language: Locale name used to build the file name, e.g. ``"zh-CN"``.

    Returns:
        The decoded JSON document, or an empty dict when the file does not
        exist (an error line is printed in that case).
    """
    filename = f"locales/{language}.json"
    try:
        handle = open(filename, "r", encoding="utf-8")
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found.")
        return {}
    with handle:
        return json.load(handle)
# Load the locale JSON that provides the translated UI texts.
args = parse_args()
language_json = load_language_json(args.language)
# Texts for the selected language. Each lookup falls back to an empty string
# rather than a dict: the values are used as labels, placeholders and in
# string concatenation ("📒 " + _about), so a dict fallback would raise
# TypeError whenever the locale file or a key is missing.
info = language_json.get("info", {})
_title = info.get("page_title", "")
_about = info.get("about", "")
_wisemodel_lable_title = info.get("wisemodel_lable_title", "")
_hf_repo_id_title = info.get("hf_repo_id_title", "")
_button = info.get("button", "")
_clear = info.get("clear", "")
_setting = info.get("setting", "")
_HuggingFace_Token_title = info.get("HuggingFace_Token_title", "")
_HuggingFace_Token_placeholder = info.get("HuggingFace_Token_placeholder", "")
_WiseModel_Token_title = info.get("WiseModel_Token_title", "")
_WiseModel_Token_placeholder = info.get("WiseModel_Token_placeholder", "")
_Logs = info.get("Logs", "")
_task_plan = info.get("task_plan", "")
_source_user_id = info.get("source_user_id", "")
_target_user_id = info.get("target_user_id", "")
_source_user_placeholder = info.get("source_user_placeholder", "")
_target_user_placeholder = info.get("target_user_placeholder", "")
#
# Gradio page: an "about" panel, the two repository fields, a settings panel,
# the submit/clear buttons and a log box.
# NOTE(review): the container nesting below is the conventional reading of
# this layout - confirm it against the rendered page.
with gr.Blocks(title=_title, theme=gr.themes.Soft(text_size="sm")) as demo:
    # Collapsible usage instructions.
    with gr.Accordion("📒 " + _about, open=False):
        about_markdown = """ 这个空间适用于将 WiseModel 仓库同步转移到 Hugging Face 仓库。\n
**请首先确保你是仓库的所有者或者拥有者同意你这样做!**\n
# 如何使用这个空间?\n
- 复制这个空间并提供 WiseModel token(可选)和你的 读/写 HF token(必填)\n
- 在Hugging Face上创建你的目标模型仓库。这一步需要手动完成,空间不会为你创建一个空的仓库。\n
- 在你自己的私有空间中填写以下信息。\n
- 点击提交按钮,然后观察日志中的输出进展。\n
- 重新创建修改自己的 README.md 文件(因为元数据与 HF 不兼容)
"""
        gr.Markdown(dedent(about_markdown))

    # Source link and target repository id.
    wisemodel_link = gr.Textbox(label=_wisemodel_lable_title)
    hf_repo_id = gr.Textbox(
        label=_hf_repo_id_title)

    # Collapsible settings: task plan, tokens and user ids.
    with gr.Accordion("🔧 " + _setting, open=False):
        with gr.Column(scale=12):
            select_task_plan = gr.Dropdown(choices=list(task_plan_list.keys()), allow_custom_value=True, value="",
                                           label=_task_plan)
            with gr.Row():
                tmp1 = gr.Textbox(
                    label=_HuggingFace_Token_title,
                    placeholder=_HuggingFace_Token_placeholder,
                    type="password",  # password field: the value is not shown in clear text
                )
                tmp2 = gr.Textbox(
                    label=_WiseModel_Token_title,
                    placeholder=_WiseModel_Token_placeholder,
                    type="password",  # password field: the value is not shown in clear text
                )
            with gr.Row():
                tmp3 = gr.Textbox(
                    label=_source_user_id,
                    placeholder=_source_user_placeholder,
                )
                tmp4 = gr.Textbox(
                    label=_target_user_id,
                    placeholder=_target_user_placeholder,
                )

    with gr.Row():
        with gr.Column(scale=100):
            button = gr.Button(_button, variant="primary")
        with gr.Column(scale=1):
            clear = gr.Button(_clear)

    output = gr.Textbox(label=_Logs)

    # Submit: run both sync steps and show the result message in the log box.
    button.click(load_set, [wisemodel_link, hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan], [output])
    # Clear: the button previously had no handler. Reset the two repository
    # fields and the log box; tokens and other settings are left as entered.
    clear.click(lambda: ("", "", ""), None, [wisemodel_link, hf_repo_id, output])
if __name__ == "__main__":
    # Keyword arguments handed to demo.launch() when run as a script.
    launch_options = {
        "show_error": True,
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "debug": True,
    }
    demo.launch(**launch_options)