File size: 2,133 Bytes
4baf7bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import torch
import gc

from helper import switch_mps_device
# from model.fcf import FcF
from model.lama import LaMa
# from model.ldm import LDM
# from model.manga import Manga
# from model.mat import MAT
from model.paint_by_example import PaintByExample
# from model.instruct_pix2pix import InstructPix2Pix
from model.sd import SD15, SD2, Anything4, RealisticVision14
# from model.zits import ZITS
# from model.opencv2 import OpenCV2
from schema import Config

# Registry mapping a model name to the class that implements it.
# ModelManager looks names up here both to instantiate a model and to ask
# whether it has been downloaded. Entries that are commented out correspond
# to the imports that are commented out at the top of this file.
models = {
    "lama": LaMa,
    # "ldm": LDM,
    # "zits": ZITS,
    # "mat": MAT,
    # "fcf": FcF,
    "sd1.5": SD15,
    # These two are keyed by the ``name`` attribute defined on the class itself.
    Anything4.name: Anything4,
    RealisticVision14.name: RealisticVision14,
    # "cv2": OpenCV2,
    # "manga": Manga,
    "sd2": SD2,
    "paint_by_example": PaintByExample,
    # "instruct_pix2pix": InstructPix2Pix,
}


class ModelManager:
    """Hold one loaded model and allow replacing it with another by name.

    Model classes are looked up in the module-level ``models`` registry and
    instantiated with the device and keyword arguments supplied when the
    manager was created.
    """

    def __init__(self, name: str, device: torch.device, **kwargs):
        """Load the model registered under *name*.

        Args:
            name: Key into the ``models`` registry.
            device: Device passed to the model constructor.
            **kwargs: Extra options forwarded to the model constructor. They
                are stored and reused when ``switch`` loads another model.

        Raises:
            NotImplementedError: If *name* is not in the registry.
        """
        self.name = name
        self.device = device
        self.kwargs = kwargs
        self.model = self.init_model(name, device, **kwargs)

    def init_model(self, name: str, device, **kwargs):
        """Instantiate and return the model registered under *name*.

        Raises:
            NotImplementedError: If *name* is not in the registry.
        """
        if name not in models:
            raise NotImplementedError(f"Not supported model: {name}")
        return models[name](device, **kwargs)

    def is_downloaded(self, name: str) -> bool:
        """Return what the model class registered under *name* reports from
        its ``is_downloaded()`` hook.

        Raises:
            NotImplementedError: If *name* is not in the registry.
        """
        if name not in models:
            raise NotImplementedError(f"Not supported model: {name}")
        return models[name].is_downloaded()

    def __call__(self, image, mask, config: Config):
        """Run the currently loaded model and return its result unchanged."""
        return self.model(image, mask, config)

    def switch(self, new_name: str):
        """Replace the loaded model with the one registered under *new_name*.

        Does nothing when *new_name* is already the active model.

        Raises:
            NotImplementedError: If *new_name* is not in the registry. The
                currently loaded model is left untouched in that case.
        """
        if new_name == self.name:
            return

        # Validate up front: the old model must not be thrown away for a
        # name that can never be loaded, otherwise the manager would be left
        # without any usable model.
        if new_name not in models:
            raise NotImplementedError(f"Not supported model: {new_name}")

        if torch.cuda.memory_allocated() > 0:
            # Release the current model before loading the next one so both
            # do not have to fit in GPU memory at once. Order matters: the
            # reference has to be dropped and collected first, only then can
            # empty_cache() hand the freed blocks back to the driver.
            # NOTE: if loading the new model fails below, ``self.model``
            # stays ``None`` while ``self.name`` still holds the old name.
            self.model = None
            gc.collect()
            torch.cuda.empty_cache()

        self.model = self.init_model(
            new_name, switch_mps_device(new_name, self.device), **self.kwargs
        )
        self.name = new_name