"""Open a Hub PR that adds ``clip_sample=False`` to a Stable Diffusion scheduler config.

The script downloads ``scheduler/scheduler_config.json`` of a model repository,
adds the missing ``clip_sample`` key locally and uploads the patched file back
to the Hub as a pull request.
"""
import argparse
import json
import os
import shutil
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple, Union

from huggingface_hub import CommitInfo, CommitOperationAdd, Discussion, HfApi, hf_hub_download
from huggingface_hub.file_download import repo_folder_name


# Pipeline class names (as stored under `_class_name` in `model_index.json`)
# that this script treats as "stable diffusion like".
_STABLE_DIFFUSION_LIKE_PIPELINES = (
    "AltDiffusionImg2ImgPipeline",
    "AltDiffusionPipeline",
    "CycleDiffusionPipeline",
    "StableDiffusionImageVariationPipeline",
    "StableDiffusionImg2ImgPipeline",
    "StableDiffusionInpaintPipeline",
    "StableDiffusionInpaintPipelineLegacy",
    "StableDiffusionPipeline",
    "StableDiffusionPipelineSafe",
    "StableDiffusionUpscalePipeline",
    "VersatileDiffusionDualGuidedPipeline",
    "VersatileDiffusionImageVariationPipeline",
    "VersatileDiffusionPipeline",
    "VersatileDiffusionTextToImagePipeline",
    "OnnxStableDiffusionImg2ImgPipeline",
    "OnnxStableDiffusionInpaintPipeline",
    "OnnxStableDiffusionInpaintPipelineLegacy",
    "OnnxStableDiffusionPipeline",
    "StableDiffusionOnnxPipeline",
    "FlaxStableDiffusionPipeline",
)

# Files of the model repository that `convert` requires before doing any work.
# The first two were always checked; the last two are the files that are
# actually downloaded, so a repo lacking them would fail inside the download.
_REQUIRED_REPO_FILES = (
    "unet/config.json",
    "vae/config.json",
    "model_index.json",
    "scheduler/scheduler_config.json",
)


class AlreadyExists(Exception):
    """Raised by `convert` when an open PR with the same title already exists."""


def is_index_stable_diffusion_like(config_dict):
    """Return True if `config_dict["_class_name"]` is a known stable-diffusion-like pipeline.

    Args:
        config_dict: The parsed content of a `model_index.json` file.

    Returns:
        False when the `_class_name` key is missing or its value is not one of
        the names in `_STABLE_DIFFUSION_LIKE_PIPELINES`, True otherwise.
    """
    if "_class_name" not in config_dict:
        return False
    return config_dict["_class_name"] in _STABLE_DIFFUSION_LIKE_PIPELINES


def convert_single(
    model_id: str, folder: str
) -> Tuple[Union[List["CommitOperationAdd"], bool], Union[str, bool]]:
    """Download the scheduler config of `model_id` and write a patched copy into `folder`.

    Args:
        model_id: Repository id on the Hub.
        folder: Local directory that receives `scheduler/scheduler_config.json`.

    Returns:
        A pair `(operations, model_type)`. On success `operations` is a list
        with one `CommitOperationAdd` for the patched config and `model_type`
        is the string returned by `convert_file`. When the repository is not
        stable-diffusion-like, or the config needs no change, `(False, False)`
        is returned.
    """
    config_file = "scheduler/scheduler_config.json"
    os.makedirs(os.path.join(folder, "scheduler"), exist_ok=True)

    model_index_file = hf_hub_download(repo_id=model_id, filename="model_index.json")
    with open(model_index_file, "r", encoding="utf-8") as f:
        index_dict = json.load(f)

    if not is_index_stable_diffusion_like(index_dict):
        print(f"{model_id} is not of type stable diffusion.")
        return False, False

    old_config_file = hf_hub_download(repo_id=model_id, filename=config_file)
    new_config_file = os.path.join(folder, config_file)

    model_type = convert_file(old_config_file, new_config_file)
    if not model_type:
        return False, False

    operations = [CommitOperationAdd(path_in_repo=config_file, path_or_fileobj=new_config_file)]
    return operations, model_type


def convert_file(
    old_config: str,
    new_config: str,
) -> Union[str, bool]:
    """Add `clip_sample=False` to a scheduler config if the key is missing.

    Args:
        old_config: Path of the existing scheduler config (JSON).
        new_config: Path the patched config is written to.

    Returns:
        The string "Stable Diffusion" when a patched file was written, or
        False when the config already contains `clip_sample` (in which case
        nothing is written).
    """
    with open(old_config, "r", encoding="utf-8") as f:
        old_dict = json.load(f)

    if "clip_sample" in old_dict:
        print("No matching config")
        return False

    print("Make scheduler DDIM compatible")
    old_dict["clip_sample"] = False

    with open(new_config, "w", encoding="utf-8") as f:
        f.write(json.dumps(old_dict, indent=2, sort_keys=True) + "\n")

    return "Stable Diffusion"


def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]:
    """Return the first open pull request of `model_id` titled `pr_title`, if any.

    Args:
        api: Hub client used to list the repository discussions.
        model_id: Repository id on the Hub.
        pr_title: Exact title to look for.

    Returns:
        The matching discussion, or None when there is no match or the
        discussions could not be listed.
    """
    try:
        discussions = api.get_repo_discussions(repo_id=model_id)
    except Exception:
        # Best effort: failing to list discussions is treated as "no previous PR".
        return None

    for discussion in discussions:
        if discussion.status == "open" and discussion.is_pull_request and discussion.title == pr_title:
            return discussion
    return None


def _build_pr_description(model_id: str) -> str:
    """Return the PR body, addressed to the namespace part of `model_id`."""
    contributor = model_id.split("/")[0]
    return (
        f"Hey {contributor} 👋, \n\n Your model repository seems to contain a stable diffusion checkpoint. We have noticed that your scheduler config currently does not correctly work with the [DDIMScheduler](https://huggingface.co./docs/diffusers/main/en/api/schedulers#diffusers.DDIMScheduler) because `clip_sample` is not set to False and will therefore [incorrectly default to True](https://github.com/huggingface/diffusers/blob/3ce6380d3a2ec5c3e3f4f48889d380d657b151bc/src/diffusers/schedulers/scheduling_ddim.py#L127). \n The official stable diffusion checkpoints have `clip_sample=False` so that the scheduler config works will **all** schedulers, see: https://huggingface.co./stabilityai/stable-diffusion-2-1-base/blob/main/scheduler/scheduler_config.json#L7. \n\n We strongly recommend that you merge this PR to make sure your model works correctly with DDIM. \n\n Diffusingly, \n Patrick."
    )


def convert(api: "HfApi", model_id: str, force: bool = False) -> Optional["CommitInfo"]:
    """Open a PR on `model_id` that adds `clip_sample=False` to its scheduler config.

    Args:
        api: Hub client used to inspect the repository and create the commit.
        model_id: Repository id on the Hub.
        force: When True, create the PR even if an open PR with the same
            title already exists.

    Returns:
        The object returned by `api.create_commit` when a PR was opened, or
        None when a required file is missing from the repository or there is
        nothing to convert.

    Raises:
        AlreadyExists: If an open PR with the same title exists and `force`
            is False.
    """
    pr_title = "Add `clip_sample=False` to scheduler to make model compatible with DDIM."

    info = api.model_info(model_id)
    filenames = {sibling.rfilename for sibling in info.siblings}

    for required in _REQUIRED_REPO_FILES:
        if required not in filenames:
            print(f"Model: {model_id} has no '{required}' file to change")
            return None

    with TemporaryDirectory() as d:
        folder = os.path.join(d, repo_folder_name(repo_id=model_id, repo_type="models"))
        os.makedirs(folder)
        try:
            pr = previous_pr(api, model_id, pr_title)
            if pr is not None and not force:
                url = f"https://huggingface.co./{model_id}/discussions/{pr.num}"
                raise AlreadyExists(f"Model {model_id} already has an open PR check out {url}")

            operations, _ = convert_single(model_id, folder)
            if not operations:
                print(f"No files to convert for {model_id}")
                return None

            new_pr = api.create_commit(
                repo_id=model_id,
                operations=operations,
                commit_message=pr_title,
                commit_description=_build_pr_description(model_id),
                create_pr=True,
            )
            print(f"Pr created at {new_pr.pr_url}")
            return new_pr
        finally:
            shutil.rmtree(folder)


def main() -> None:
    """Parse the command line and run `convert` for the requested model."""
    description = """
    Simple utility tool to automatically add `clip_sample=False` to the scheduler config of
    stable diffusion checkpoints on the hub so that they work with the DDIM scheduler.
    It works by downloading `scheduler/scheduler_config.json`, patching it locally, and
    uploading it back as a PR on the hub.
    """
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "model_id",
        type=str,
        help="The name of the model on the hub to convert. E.g. `stabilityai/stable-diffusion-2-1-base`",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Create the PR even if it already exists of if the model was already converted.",
    )
    args = parser.parse_args()
    convert(HfApi(), args.model_id, force=args.force)


if __name__ == "__main__":
    main()