|
import os |
|
import time |
|
import datetime |
|
import threading |
|
import gradio as gr |
|
import subprocess |
|
import logging |
|
from modules import script_callbacks, shared |
|
from git import Repo |
|
import shutil |
|
|
|
|
|
# Repository name appended to the Hugging Face user name to form the repo id.
REPO_NAME = "sd-webui-backups"

# NOTE(review): presumably a backup period in seconds (one hour); it is not
# referenced anywhere in the visible code -- confirm before relying on it.
BACKUP_INTERVAL = 60 * 60

# Keys looked up in the mapping returned by ``script.load()``.
HF_TOKEN_KEY = "hf_token"
BACKUP_PATHS_KEY = "backup_paths"
SD_PATH_KEY = "sd_path"
HF_USER_KEY = "hf_user"

# Paths backed up when no ``backup_paths`` setting has been saved; they are
# joined onto the configured SD path before being walked.
DEFAULT_BACKUP_PATHS = [
    "models/Stable-diffusion",
    "models/VAE",
    "embeddings",
    "loras",
]
|
|
|
|
|
# Configure the root logger: INFO level, timestamped records, one stream
# handler (StreamHandler's default stream).
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
def update_status(script, status, file=None):
    """Store a status message on ``script`` and echo it to stdout.

    Args:
        script: Object whose ``status`` attribute receives the message.
        status: The status text.
        file: Optional file name; when truthy the message becomes
            ``"<status>: <file>"``, otherwise ``status`` is used unchanged.
    """
    message = f"{status}: {file}" if file else status
    script.status = message
    print(message)
|
|
|
|
|
def clone_or_create_repo(repo_url, repo_path, script):
    """Return a ``Repo`` for ``repo_path``, cloning ``repo_url`` when needed.

    If ``repo_path`` is an existing directory it is opened as a repository
    (nothing is fetched or pulled) and a warning is reported when it has
    uncommitted changes. Otherwise the remote is cloned. With the
    ``git_credential_store`` option enabled (the default) ``repo_url`` is used
    as-is; with it disabled, a token is embedded into the HTTPS URL.

    Args:
        repo_url: HTTPS URL of the remote repository.
        repo_path: Local directory of the working copy.
        script: Object carrying ``hf_user`` (and optionally ``hf_token``);
            its ``status`` is updated through ``update_status``.

    Returns:
        The opened or freshly cloned ``Repo``.

    Raises:
        Exception: Whatever opening or cloning raised, re-raised after it has
            been logged and written to the status; also raised when token
            based cloning is requested but no token is available.
    """
    update_status(script, "Checking/Cloning Repo...")
    token = ""  # remembered so it can be scrubbed from error text below
    try:
        # Opening an existing directory lives inside the ``try`` as well, so a
        # directory that is not a git repository is reported like any other
        # failure instead of escaping without a status update.
        if os.path.isdir(repo_path):
            logger.info("Repository already exists at %s, updating...", repo_path)
            repo = Repo(repo_path)
            if repo.is_dirty():
                logger.warning("Local repo has uncommitted changes. Commit those before running to make sure nothing breaks.")
                update_status(script, "Local repo has uncommitted changes")
        else:
            logger.info("Cloning repository from %s to %s", repo_url, repo_path)
            update_status(script, "Cloning repository")
            if shared.opts.data.get("git_credential_store", True):
                repo = Repo.clone_from(repo_url, repo_path)
            else:
                # Prefer the environment variable, then fall back to the
                # token the user entered in the UI (previously ignored).
                token = os.environ.get("HF_TOKEN") or getattr(script, "hf_token", "") or ""
                if not token:
                    update_status(script, "HF_TOKEN environment variable not found")
                    raise Exception("HF_TOKEN environment variable not found")
                auth_url = repo_url.replace("https://", f"https://{script.hf_user}:{token}@", 1)
                repo = Repo.clone_from(auth_url, repo_path)
    except Exception as e:
        # Git errors can echo the command line, which contains the
        # credential-bearing URL; never put the token in logs or the UI.
        detail = str(e).replace(token, "***") if token else str(e)
        logger.error("Error creating or cloning repo: %s", detail)
        update_status(script, f"Error creating or cloning repo: {detail}")
        raise
    update_status(script, "Repo ready")
    return repo
|
|
|
def git_push_files(repo_path, commit_message, script):
    """Stage everything in ``repo_path``, commit it and push to the remote.

    With the ``git_credential_store`` option enabled (the default) the push
    goes to the ``origin`` remote. With it disabled, the current branch is
    pushed to a token-bearing huggingface.co URL built from ``script.hf_user``
    and ``REPO_NAME``.

    Args:
        repo_path: Local directory of the git working copy.
        commit_message: Message for the commit that records the staged files.
        script: Object carrying ``hf_user`` (and optionally ``hf_token``);
            its ``status`` is updated through ``update_status``.

    Raises:
        Exception: Any failure while staging, committing or pushing is logged,
            written to the status and re-raised. This includes a push that the
            remote rejected and a missing token.
    """
    update_status(script, "Pushing changes...")
    token = ""  # remembered so it can be scrubbed from error text below
    try:
        repo = Repo(repo_path)
        repo.git.add(all=True)

        # ``index.commit`` creates a commit even when nothing is staged, so
        # only commit when the index actually differs from HEAD.
        if repo.is_dirty(index=True, working_tree=False, untracked_files=False):
            repo.index.commit(commit_message)
        else:
            logger.info("No changes to commit.")

        if shared.opts.data.get("git_credential_store", True):
            results = repo.remote(name='origin').push()
            # ``Remote.push`` reports per-ref failures through flags rather
            # than by raising, so they have to be checked explicitly.
            failures = [str(info.summary).strip() for info in results if info.flags & info.ERROR]
            if failures:
                raise RuntimeError("; ".join(failures))
        else:
            # Prefer the environment variable, then fall back to the token
            # the user entered in the UI (previously ignored).
            token = os.environ.get("HF_TOKEN") or getattr(script, "hf_token", "") or ""
            if not token:
                update_status(script, "HF_TOKEN environment variable not found")
                raise Exception("HF_TOKEN environment variable not found")
            auth_url = f"https://{script.hf_user}:{token}@huggingface.co/{script.hf_user}/{REPO_NAME}"
            # The first argument of ``Remote.push`` is a refspec, not a URL,
            # so push to the explicit URL through the git command wrapper.
            repo.git.push(auth_url, "HEAD")

        logger.info("Changes pushed successfully to remote repository.")
        update_status(script, "Pushing Complete")
    except Exception as e:
        # Git errors can echo the command line, which contains the
        # credential-bearing URL; never put the token in logs or the UI.
        detail = str(e).replace(token, "***") if token else str(e)
        logger.error("Git push failed: %s", detail)
        update_status(script, f"Git push failed: {detail}")
        raise
|
|
|
|
|
def backup_files(paths, hf_client, script):
    """Copy the configured paths into the local backup repo and push them.

    Every file found under ``<script.sd_path>/<path>`` is copied (with
    metadata, via ``shutil.copy2``) to the same relative location inside
    ``<script.basedir>/backup``; the result is then committed and pushed.
    Failures are logged and reflected in the status, and the function returns
    without raising. The first copy error aborts the whole backup.

    Args:
        paths: Iterable of paths relative to ``script.sd_path``.
        hf_client: Unused; kept so existing callers keep working.
        script: Object providing ``hf_user``, ``basedir`` and ``sd_path``;
            its ``status`` is updated through ``update_status``.
    """
    logger.info("Starting backup...")
    update_status(script, "Starting Backup...")
    repo_id = script.hf_user + "/" + REPO_NAME
    repo_path = os.path.join(script.basedir, 'backup')
    sd_path = script.sd_path

    try:
        # NOTE(review): this host differs from the huggingface.co URL that
        # git_push_files uses for token-based pushes -- confirm it is intended.
        clone_or_create_repo(f"https://huggingface.co./{repo_id}", repo_path, script)
    except Exception:
        # ``exception`` attaches the traceback that the message refers to.
        logger.exception("Error starting the backup, please see the traceback.")
        return

    for base_path in paths:
        logger.info(f"Backing up: {base_path}")
        source_root = os.path.join(sd_path, base_path)
        if not os.path.isdir(source_root):
            # os.walk yields nothing for a missing directory; say so instead
            # of skipping it silently.
            logger.warning("Backup path not found, skipping: %s", source_root)
            continue
        for root, _, files in os.walk(source_root):
            for file in files:
                local_file_path = os.path.join(root, file)
                repo_file_path = os.path.relpath(local_file_path, start=sd_path)
                destination = os.path.join(repo_path, repo_file_path)
                try:
                    os.makedirs(os.path.dirname(destination), exist_ok=True)
                    shutil.copy2(local_file_path, destination)
                    logger.info(f"Copied: {repo_file_path}")
                    update_status(script, "Copied", repo_file_path)
                except Exception as e:
                    logger.error(f"Error copying {repo_file_path}: {e}")
                    update_status(script, f"Error copying: {repo_file_path}: {e}")
                    return

    try:
        git_push_files(repo_path, f"Backup at {datetime.datetime.now()}", script)
        logger.info("Backup complete")
        update_status(script, "Backup Complete")
    except Exception as e:
        # The message needs a placeholder for its argument; without one the
        # logging module reports a formatting error instead of this message.
        logger.error("Error pushing to the repo: %s", e)
|
|
|
def start_backup_thread(script):
    """Start ``backup_files`` for ``script.backup_paths`` on a daemon thread.

    The thread is started and not joined; ``None`` is passed as the
    ``hf_client`` argument.
    """
    worker = threading.Thread(
        target=backup_files,
        args=(script.backup_paths, None, script),
        daemon=True,
    )
    worker.start()
|
|
|
|
|
def on_ui(script):
    """Build the backup settings UI and bind its widgets to ``script``.

    Each text box writes its value to the matching ``script`` attribute and
    calls ``script.save()`` on change. The button starts a backup thread and
    puts "Starting Backup" into the status box.

    NOTE(review): the source's indentation was lost, so the nesting of the
    layout containers below is a reconstruction -- confirm against the
    intended layout.
    """

    def on_token_change(token):
        script.hf_token = token
        script.save()

    def on_start_button():
        start_backup_thread(script)
        return "Starting Backup"

    def on_sd_path_change(path):
        script.sd_path = path
        script.save()

    def on_hf_user_change(user):
        script.hf_user = user
        script.save()

    def on_backup_paths_change(paths):
        # One path per line; blank lines and surrounding whitespace dropped.
        script.backup_paths = [entry.strip() for entry in paths.split('\n') if entry.strip()]
        script.save()

    with gr.Column():
        with gr.Row():
            with gr.Column(scale=3):
                token_input = gr.Textbox(label="Huggingface Token", type='password', value=script.hf_token)
                token_input.change(on_token_change, inputs=[token_input], outputs=None)
            with gr.Column(scale=1):
                status_display = gr.Textbox(label="Status", value=script.status)

        run_button = gr.Button(value="Start Backup")
        run_button.click(on_start_button, inputs=None, outputs=[status_display])

        with gr.Row():
            with gr.Column():
                sd_path_input = gr.Textbox(label="SD Webui Path", value=script.sd_path)
                sd_path_input.change(on_sd_path_change, inputs=[sd_path_input], outputs=None)
            with gr.Column():
                user_input = gr.Textbox(label="Huggingface Username", value=script.hf_user)
                user_input.change(on_hf_user_change, inputs=[user_input], outputs=None)

        with gr.Row():
            paths_input = gr.Textbox(
                label="Backup Paths (one path per line)",
                lines=4,
                value='\n'.join(script.backup_paths),
            )
            paths_input.change(on_backup_paths_change, inputs=[paths_input], outputs=None)
|
|
|
def on_run(script, p, *args):
    """Do nothing; accepts and ignores all of its arguments."""
    return None
|
|
|
def on_script_load(script):
    """Initialise ``script`` attributes from its saved settings.

    Reads the mapping returned by ``script.load()`` once and populates
    ``hf_token``, ``backup_paths``, ``sd_path`` and ``hf_user``, using an
    empty string (or the default backup paths) for missing keys. ``status``
    is reset to "Not running".
    """
    # Load once so all four values come from the same snapshot instead of
    # re-reading the settings for every attribute.
    saved = script.load()
    script.hf_token = saved.get(HF_TOKEN_KEY, '')
    # Hand out a copy so later in-place edits of ``script.backup_paths``
    # cannot alter the module-level default list.
    script.backup_paths = saved.get(BACKUP_PATHS_KEY, list(DEFAULT_BACKUP_PATHS))
    script.sd_path = saved.get(SD_PATH_KEY, '')
    script.hf_user = saved.get(HF_USER_KEY, '')
    script.status = "Not running"
|
|
|
|
|
# Register this module's hooks with the web UI.
# NOTE(review): ``on_ui`` and ``on_script_load`` each expect a ``script``
# argument -- confirm that these registration hooks supply one, and that
# ``script_callbacks`` actually exposes ``on_script_load``.
script_callbacks.on_ui_tabs(on_ui)
script_callbacks.on_script_load(on_script_load)
|
|