"""Gradio front end for syncing a WiseModel repository to a Hugging Face repository."""

import argparse
import json
import subprocess
from datetime import datetime
from textwrap import dedent

import gradio as gr

from task_plan import task_plan_list


def read_specific_line(file_path, line_number):
    """Return the stripped text of one line of a file.

    Args:
        file_path: Path of the text file to read.
        line_number: 1-based number of the wanted line.

    Returns:
        The line with surrounding whitespace removed, or ``None`` when the
        file does not exist or has fewer than ``line_number`` lines.
    """
    try:
        with open(file_path, "r") as file:
            for current_line, line in enumerate(file, start=1):
                if current_line == line_number:
                    return line.strip()
    except FileNotFoundError:
        return None
    # The file is shorter than the requested line number.
    return None


def get_folder_paths(file_path):
    """Read the source and target folders from lines 2 and 3 of ``file_path``.

    Returns:
        A ``(error, source_folder, target_folder)`` tuple. On success
        ``error`` is ``None``; otherwise ``error`` is a message and both
        folders are ``None``.
    """
    source_folder = read_specific_line(file_path, 2)
    target_folder = read_specific_line(file_path, 3)
    if source_folder is None or target_folder is None:
        return "错误信息:文件不存在。", None, None
    if source_folder == "" or target_folder == "":
        return "错误信息:文件中的第2行或第3行为空。", None, None
    return None, source_folder, target_folder


def _clean_repo_url(url):
    """Reduce user input such as ``git clone <url>.git`` to the bare URL.

    Only a leading ``git`` / ``clone`` command prefix and a trailing ``.git``
    suffix are removed, so URLs that merely contain "git" (for example
    ``github.com``) are left intact. All whitespace is dropped.
    """
    tokens = str(url).split()
    while tokens and tokens[0] in ("git", "clone"):
        tokens.pop(0)
    cleaned = "".join(tokens)
    if cleaned.endswith(".git"):
        cleaned = cleaned[: -len(".git")]
    return cleaned


def _run_repo_pull(url):
    """Run the ``repo_pull.py`` helper script for the cleaned ``url``."""
    subprocess.check_output(
        ["python3", "repo_pull.py", "--repo_url", _clean_repo_url(url)]
    )


def repo_pull(url):
    """Pull the remote repository at ``url``; a one-step generator.

    Raises:
        subprocess.CalledProcessError: if ``repo_pull.py`` exits non-zero.
    """
    _run_repo_pull(url)
    yield


def repo_push(hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan):
    """Run the push step for the selected task plan; a one-step generator.

    Args:
        hf_repo_id: Target repository; cloned locally for the first plan.
        tmp1, tmp2, tmp3, tmp4: Values of the token / user-id text boxes
            (currently unused by this function).
        select_task_plan: Key of ``task_plan_list`` chosen in the UI.

    A selection that is not a key of ``task_plan_list`` (for example the
    dropdown's empty default or a custom value) is reported as out of range
    instead of raising ``ValueError``.
    """
    plan_names = list(task_plan_list.keys())
    try:
        task_plan_index = plan_names.index(select_task_plan)
    except ValueError:
        # list.index raises rather than returning -1 for unknown values.
        task_plan_index = -1

    if task_plan_index == 0:
        # Clone the target repository locally.
        _run_repo_pull(hf_repo_id)

        # Lines 2 and 3 of this file hold the source and target folders.
        file_path = ".folderinfo"
        error, source_folder, target_folder = get_folder_paths(file_path)
        if error:
            print(error)
        else:
            print(f"源文件夹: {source_folder}")
            print(f"目标文件夹: {target_folder}")
            formatted_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            commit_message = f"魏然 首次上传于 {formatted_datetime}"
            # TODO: the copy-and-commit step below is disabled, so
            # commit_message is prepared but not used yet.
            # subprocess.check_output(
            #     f"cd {target_folder} && cp -rf ../{str(source_folder).replace('./', '')}/* . && git add -A . && git commit -m \"{commit_message}\"",
            #     shell=True)
    elif 1 <= task_plan_index <= 5:
        print(f"选中了 {task_plan_index}")
    else:
        print("task_plan_index 不在范围内")
    yield


def load_set(wisemodel_link, hf_repo_id, tmp1, tmp2, tmp3, tmp4,
             select_task_plan, progress=gr.Progress()):
    """Run the pull and push tasks in order while updating the progress bar.

    Returns:
        The completion message shown in the log text box.
    """
    tasks = [
        repo_pull(wisemodel_link),
        repo_push(hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan),
    ]
    total_tasks = len(tasks)
    for i, task in enumerate(tasks, start=1):
        progress(i / total_tasks, desc=f"{i} / {total_tasks} ")
        # Drain the generator so the task actually executes.
        for _ in task:
            pass
    return "下载完成!"


def parse_args():
    """Parse the command-line arguments (currently only ``--language``)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--language",
        type=str,
        default="zh-CN",
        help="Language option, e.g., 'en-US' or 'zh-CN' or ja-JP or ko-KR or th-TH support now!",
    )
    return parser.parse_args()


def load_language_json(language):
    """Load ``locales/<language>.json``; return ``{}`` if the file is missing."""
    filename = f"locales/{language}.json"
    try:
        with open(filename, "r", encoding="utf-8") as file:
            return json.load(file)
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found.")
        return {}


# Load the JSON used for internationalization.
args = parse_args()
language_json = load_language_json(args.language)

# Localized UI texts. Missing keys fall back to an empty string so the
# string concatenations below still work when the locale file is absent.
info = language_json.get("info", {})
_title = info.get("page_title", "")
_about = info.get("about", "")
_wisemodel_lable_title = info.get("wisemodel_lable_title", "")
_hf_repo_id_title = info.get("hf_repo_id_title", "")
_button = info.get("button", "")
_clear = info.get("clear", "")
_setting = info.get("setting", "")
_HuggingFace_Token_title = info.get("HuggingFace_Token_title", "")
_HuggingFace_Token_placeholder = info.get("HuggingFace_Token_placeholder", "")
_WiseModel_Token_title = info.get("WiseModel_Token_title", "")
_WiseModel_Token_placeholder = info.get("WiseModel_Token_placeholder", "")
_Logs = info.get("Logs", "")
_task_plan = info.get("task_plan", "")
_source_user_id = info.get("source_user_id", "")
_target_user_id = info.get("target_user_id", "")
_source_user_placeholder = info.get("source_user_placeholder", "")
_target_user_placeholder = info.get("target_user_placeholder", "")

with gr.Blocks(title=_title, theme=gr.themes.Soft(text_size="sm")) as demo:
    with gr.Accordion("📒 " + _about, open=False):
        _about_markdown = """
        这个空间适用于将 WiseModel 仓库同步转移到 Hugging Face 仓库。\n
        **请首先确保你是仓库的所有者或者拥有者同意你这样做!**\n
        # 如何使用这个空间?\n
        - 复制这个空间并提供 WiseModel token(可选)和你的 读/写 HF token(必填)\n
        - 在Hugging Face上创建你的目标模型仓库。这一步需要手动完成,空间不会为你创建一个空的仓库。\n
        - 在你自己的私有空间中填写以下信息。\n
        - 点击提交按钮,然后观察日志中的输出进展。\n
        - 重新创建修改自己的 README.md 文件(因为元数据与 HF 不兼容)
        """
        gr.Markdown(dedent(_about_markdown))

    wisemodel_link = gr.Textbox(label=_wisemodel_lable_title)
    hf_repo_id = gr.Textbox(label=_hf_repo_id_title)

    with gr.Accordion("🔧 " + _setting, open=False):
        with gr.Column(scale=12):
            select_task_plan = gr.Dropdown(
                choices=list(task_plan_list.keys()),
                allow_custom_value=True,
                value="",
                label=_task_plan,
            )
        with gr.Row():
            tmp1 = gr.Textbox(
                label=_HuggingFace_Token_title,
                placeholder=_HuggingFace_Token_placeholder,
                type="password",  # password field: do not show the token in clear text
            )
            tmp2 = gr.Textbox(
                label=_WiseModel_Token_title,
                placeholder=_WiseModel_Token_placeholder,
                type="password",  # password field: do not show the token in clear text
            )
        with gr.Row():
            tmp3 = gr.Textbox(
                label=_source_user_id,
                placeholder=_source_user_placeholder,
            )
            tmp4 = gr.Textbox(
                label=_target_user_id,
                placeholder=_target_user_placeholder,
            )

    with gr.Row():
        with gr.Column(scale=100):
            button = gr.Button(_button, variant="primary")
        with gr.Column(scale=1):
            # NOTE: no click handler is attached to this button in this file.
            clear = gr.Button(_clear)

    output = gr.Textbox(label=_Logs)
    button.click(
        load_set,
        [wisemodel_link, hf_repo_id, tmp1, tmp2, tmp3, tmp4, select_task_plan],
        [output],
    )

if __name__ == "__main__":
    demo.launch(
        show_error=True,
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        debug=True,
    )