import os |
import time |
import datetime |
import threading |
import gradio as gr |
import subprocess |
import logging |
from modules import script_callbacks, shared |
from git import Repo |
import shutil |
# Name of the Hugging Face repository that receives the backups.
REPO_NAME = "sd-webui-backups"

# Keys under which the individual settings are looked up in the saved config.
HF_TOKEN_KEY = "hf_token"
BACKUP_PATHS_KEY = "backup_paths"
SD_PATH_KEY = "sd_path"
HF_USER_KEY = "hf_user"

# Folders (joined onto the configured SD path) used when none are configured.
DEFAULT_BACKUP_PATHS = [
    "models/Stable-diffusion",
    "models/VAE",
    "embeddings",
    "loras",
]

# Emit INFO-and-above records through a stream handler with a timestamp prefix.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
def update_status(script, status, file=None):
    """Store a progress message on ``script.status`` and echo it to stdout.

    When ``file`` is truthy the message is ``"<status>: <file>"``; otherwise
    ``status`` is used unchanged.
    """
    message = f"{status}: {file}" if file else status
    script.status = message
    print(message)
def clone_or_create_repo(repo_url, repo_path, script):
    """Open the local backup repository, cloning it first when it is missing.

    Args:
        repo_url: HTTPS URL of the remote repository.
        repo_path: Local directory that holds (or will hold) the working copy.
        script: Object that receives status updates. Its ``hf_user`` attribute
            and, as a fallback for the ``HF_TOKEN`` environment variable, its
            ``hf_token`` attribute are used for token-authenticated clones.

    Returns:
        The ``Repo`` object for ``repo_path``.

    Raises:
        Exception: Re-raised after being logged when ``repo_path`` cannot be
            opened as a repository, when cloning fails, or when no token is
            available while the ``git_credential_store`` option is disabled.
    """
    update_status(script, "Checking/Cloning Repo...")
    token = None
    try:
        if os.path.isdir(repo_path):
            logger.info(f"Repository already exists at {repo_path}, updating...")
            # Opened inside the try so that a directory which is not a git
            # repository is reported like any other clone failure.
            repo = Repo(repo_path)
            if repo.is_dirty():
                logger.warning("Local repo has uncommitted changes. Commit those before running to make sure nothing breaks.")
                update_status(script, "Local repo has uncommitted changes")
        else:
            logger.info(f"Cloning repository from {repo_url} to {repo_path}")
            update_status(script, "Cloning repository")
            if shared.opts.data.get("git_credential_store", True):
                repo = Repo.clone_from(repo_url, repo_path)
            else:
                # The environment variable wins; otherwise use the token that
                # was entered in the UI and stored on the script object.
                token = os.environ.get("HF_TOKEN") or getattr(script, "hf_token", None)
                if not token:
                    update_status(script, "HF_TOKEN environment variable not found")
                    raise Exception("HF_TOKEN environment variable not found")
                authed_url = repo_url.replace("https://", f"https://{script.hf_user}:{token}@")
                repo = Repo.clone_from(authed_url, repo_path)
                # Avoid leaving the token in plain text inside .git/config.
                repo.remote(name="origin").set_url(repo_url)
    except Exception as e:
        message = str(e)
        if token:
            # Git error text can echo the command line, including the URL.
            message = message.replace(token, "***")
        logger.error(f"Error creating or cloning repo: {message}")
        update_status(script, f"Error creating or cloning repo: {message}")
        raise
    update_status(script, "Repo ready")
    return repo
def git_push_files(repo_path, commit_message, script):
    """Stage every change in the backup repository, commit it, and push it.

    Args:
        repo_path: Path of the local working copy.
        commit_message: Message used for the commit.
        script: Object that receives status updates. Its ``hf_user`` attribute
            and, as a fallback for the ``HF_TOKEN`` environment variable, its
            ``hf_token`` attribute are used for token-authenticated pushes.

    Raises:
        Exception: Re-raised after being logged when staging, committing or
            pushing fails, when the remote rejects the push, or when no token
            is available while the ``git_credential_store`` option is disabled.
    """
    update_status(script, "Pushing changes...")
    token = None
    try:
        repo = Repo(repo_path)
        repo.git.add(all=True)
        repo.index.commit(commit_message)
        origin = repo.remote(name='origin')
        if shared.opts.data.get("git_credential_store", True):
            results = origin.push()
            # Remote.push() reports rejections through flags instead of
            # raising, and returns an empty list when the push failed outright.
            failed = [info for info in results if info.flags & info.ERROR]
            if failed or not results:
                details = "; ".join(str(info.summary).strip() for info in failed)
                raise RuntimeError(f"push was rejected by the remote: {details or 'no refs were pushed'}")
        else:
            # The environment variable wins; otherwise use the token that was
            # entered in the UI and stored on the script object.
            token = os.environ.get("HF_TOKEN") or getattr(script, "hf_token", None)
            if not token:
                update_status(script, "HF_TOKEN environment variable not found")
                raise Exception("HF_TOKEN environment variable not found")
            authed_url = f"https://{script.hf_user}:{token}@huggingface.co/{script.hf_user}/{REPO_NAME}"
            # Remote.push() takes a refspec, not a URL, so push the current
            # branch to the authenticated URL through the git command wrapper.
            repo.git.push(authed_url, "HEAD")
        logger.info("Changes pushed successfully to remote repository.")
        update_status(script, "Pushing Complete")
    except Exception as e:
        message = str(e)
        if token:
            # Git error text can echo the command line, including the URL.
            message = message.replace(token, "***")
        logger.error(f"Git push failed: {message}")
        update_status(script, f"Git push failed: {message}")
        raise
def backup_files(paths, hf_client, script):
    """Copy the configured folders into the backup repository and push them.

    Args:
        paths: Folder paths, relative to ``script.sd_path``, to back up.
        hf_client: Unused; kept so existing callers keep working.
        script: Object providing ``hf_user``, ``basedir`` and ``sd_path`` and
            receiving status updates.

    Returns:
        None. Failures are logged and end the backup early instead of raising.
    """
    logger.info("Starting backup...")
    update_status(script, "Starting Backup...")
    repo_id = f"{script.hf_user}/{REPO_NAME}"
    repo_path = os.path.join(script.basedir, 'backup')
    sd_path = script.sd_path
    try:
        # Clone from the same host that token-authenticated pushes go to, so
        # credentials are never sent to a different domain.
        clone_or_create_repo(f"https://huggingface.co/{repo_id}", repo_path, script)
    except Exception:
        # logger.exception attaches the traceback that the message refers to.
        logger.exception("Error starting the backup, please see the traceback.")
        return
    for base_path in paths:
        logger.info(f"Backing up: {base_path}")
        source_root = os.path.join(sd_path, base_path)
        if not os.path.isdir(source_root):
            # os.walk would silently yield nothing here; make the skip visible.
            logger.warning(f"Skipping {base_path}: {source_root} is not a directory")
            continue
        for root, _, files in os.walk(source_root):
            for file in files:
                local_file_path = os.path.join(root, file)
                # Mirror the layout below sd_path inside the repository.
                repo_file_path = os.path.relpath(local_file_path, start=sd_path)
                target_path = os.path.join(repo_path, repo_file_path)
                try:
                    os.makedirs(os.path.dirname(target_path), exist_ok=True)
                    shutil.copy2(local_file_path, target_path)
                    logger.info(f"Copied: {repo_file_path}")
                    update_status(script, "Copied", repo_file_path)
                except Exception as e:
                    logger.error(f"Error copying {repo_file_path}: {e}")
                    update_status(script, f"Error copying: {repo_file_path}: {e}")
                    return
    try:
        git_push_files(repo_path, f"Backup at {datetime.datetime.now()}", script)
        logger.info("Backup complete")
        update_status(script, "Backup Complete")
    except Exception as e:
        # A %s placeholder is required; passing the exception as a bare extra
        # argument makes the logging module report a formatting error.
        logger.error("Error pushing to the repo: %s", e)
        return
def start_backup_thread(script):
    """Run ``backup_files`` for ``script.backup_paths`` on a daemon thread."""
    worker = threading.Thread(
        target=backup_files,
        args=(script.backup_paths, None, script),
        daemon=True,
    )
    worker.start()
def on_ui(script):
    """Build the backup settings UI and bind its widgets to ``script``.

    Each text box is pre-filled from the matching ``script`` attribute; edits
    are written back to that attribute and followed by ``script.save()``.
    The start button launches a backup thread and writes "Starting Backup"
    into the status box.
    """

    # --- event handlers (names are kept as they were) ---------------------
    def on_token_change(token):
        script.hf_token = token
        script.save()

    def on_start_button():
        start_backup_thread(script)
        return "Starting Backup"

    def on_sd_path_change(path):
        script.sd_path = path
        script.save()

    def on_hf_user_change(user):
        script.hf_user = user
        script.save()

    def on_backup_paths_change(paths):
        # One path per line; blank lines are dropped.
        stripped_lines = (line.strip() for line in paths.split('\n'))
        script.backup_paths = [line for line in stripped_lines if line]
        script.save()

    # --- layout (components and events are created in the original order) --
    with gr.Column():
        with gr.Row():
            with gr.Column(scale=3):
                token_input = gr.Textbox(
                    label="Huggingface Token",
                    type='password',
                    value=script.hf_token,
                )
                token_input.change(on_token_change, inputs=[token_input], outputs=None)
            with gr.Column(scale=1):
                status_output = gr.Textbox(label="Status", value=script.status)
                run_button = gr.Button(value="Start Backup")
                run_button.click(on_start_button, inputs=None, outputs=[status_output])
        with gr.Row():
            with gr.Column():
                sd_path_input = gr.Textbox(label="SD Webui Path", value=script.sd_path)
                sd_path_input.change(on_sd_path_change, inputs=[sd_path_input], outputs=None)
            with gr.Column():
                user_input = gr.Textbox(label="Huggingface Username", value=script.hf_user)
                user_input.change(on_hf_user_change, inputs=[user_input], outputs=None)
        with gr.Row():
            paths_input = gr.Textbox(
                label="Backup Paths (one path per line)",
                lines=4,
                value='\n'.join(script.backup_paths),
            )
            paths_input.change(on_backup_paths_change, inputs=[paths_input], outputs=None)
def on_run(script, p, *args):
    """Placeholder run hook: performs no work and returns ``None``."""
    return None
def on_script_load(script):
    """Populate ``script`` from its saved settings and reset its status.

    Args:
        script: Object whose ``load()`` returns a mapping of saved settings.
            Its ``hf_token``, ``backup_paths``, ``sd_path``, ``hf_user`` and
            ``status`` attributes are assigned here.
    """
    # Load once instead of once per setting.
    settings = script.load()
    script.hf_token = settings.get(HF_TOKEN_KEY, '')
    # Pass a copy so the module-level default list is never shared or mutated.
    script.backup_paths = settings.get(BACKUP_PATHS_KEY, list(DEFAULT_BACKUP_PATHS))
    script.sd_path = settings.get(SD_PATH_KEY, '')
    script.hf_user = settings.get(HF_USER_KEY, '')
    script.status = "Not running"
# Register this module's hooks with the host application's callback registry:
# `on_ui` as the UI-tabs callback and `on_script_load` as the load callback.
script_callbacks.on_ui_tabs(on_ui)
script_callbacks.on_script_load(on_script_load)