|
from app_settings import AppSettings |
|
from utils import show_system_info |
|
import constants |
|
from argparse import ArgumentParser |
|
from context import Context |
|
from constants import APP_VERSION, LCM_DEFAULT_MODEL_OPENVINO |
|
from models.interface_types import InterfaceType |
|
from constants import DEVICE |
|
from state import get_settings |
|
|
|
|
|
from fastapi import FastAPI,Body |
|
|
|
import uvicorn |
|
import json |
|
import logging |
|
from PIL import Image |
|
import time |
|
|
|
from diffusers.utils import load_image |
|
import base64 |
|
import io |
|
from datetime import datetime |
|
|
|
# ASGI application object; the route decorators further down register on it.
# NOTE(review): FastAPI's documented keyword for the display name is `title`;
# `name` is presumably just stored as an extra keyword — confirm whether
# `title=` was intended.
app = FastAPI(name="mutilParam")
|
|
|
from typing import Any |
|
from backend.models.lcmdiffusion_setting import DiffusionTask |
|
|
|
from frontend.utils import is_reshape_required |
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
# Shared application settings object; the /img2img handler mutates its
# `settings.lcm_diffusion_setting` fields on every request.
app_settings = get_settings()

# Generation context created for the web-UI interface type.
context = Context(InterfaceType.WEBUI)

# Parameters of the most recent generation. The /img2img handler passes them to
# is_reshape_required() together with the new values, then overwrites them.
previous_width = previous_height = previous_num_of_images = 0
previous_model_id = ""
|
|
|
# Command-line interface of the launcher. Parsing happens at import time, so
# `args` is available to the start-up code that follows.
parser = ArgumentParser(description=f"FAST SD CPU {constants.APP_VERSION}")
parser.add_argument(
    "-s",
    "--share",
    action="store_true",
    help="Create sharable link(Web UI)",
    required=False,
)

# The UI modes and --version are mutually exclusive; none of them is required.
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument(
    "-g",
    "--gui",
    action="store_true",
    help="Start desktop GUI",
)
group.add_argument(
    "-w",
    "--webui",
    action="store_true",
    help="Start Web UI",
)
group.add_argument(
    "-r",
    "--realtime",
    action="store_true",
    help="Start realtime inference UI(experimental)",
)
group.add_argument(
    "-v",
    "--version",
    action="store_true",
    help="Version",
)

# Generation parameters.
parser.add_argument(
    "--lcm_model_id",
    type=str,
    help="Model ID or path,Default SimianLuo/LCM_Dreamshaper_v7",
    default="SimianLuo/LCM_Dreamshaper_v7",
)
parser.add_argument(
    "--prompt",
    type=str,
    help="Describe the image you want to generate",
)
parser.add_argument(
    "--image_height",
    type=int,
    help="Height of the image",
    default=512,
)
parser.add_argument(
    "--image_width",
    type=int,
    help="Width of the image",
    default=512,
)
parser.add_argument(
    "--inference_steps",
    type=int,
    help="Number of steps,default : 4",
    default=4,
)
parser.add_argument(
    "--guidance_scale",
    # The default is 1.0 and the scale is fractional by nature; with type=int
    # a value such as "1.5" was rejected as an invalid int.
    type=float,
    help="Guidance scale,default : 1.0",
    default=1.0,
)
parser.add_argument(
    "--number_of_images",
    type=int,
    help="Number of images to generate ,default : 1",
    default=1,
)
parser.add_argument(
    "--seed",
    type=int,
    help="Seed,default : -1 (disabled) ",
    default=-1,
)

# Backend / model-loading switches.
parser.add_argument(
    "--use_openvino",
    action="store_true",
    help="Use OpenVINO model",
)
parser.add_argument(
    "--use_offline_model",
    action="store_true",
    help="Use offline model",
)
# NOTE(review): with store_false the value defaults to True and passing the
# flag turns it OFF, which reads opposite to the help text — confirm intent
# before changing, since flipping it would alter the default.
parser.add_argument(
    "--use_safety_checker",
    action="store_false",
    help="Use safety checker",
)
parser.add_argument(
    "--use_lcm_lora",
    action="store_true",
    help="Use LCM-LoRA",
)
parser.add_argument(
    "--base_model_id",
    type=str,
    help="LCM LoRA base model ID,Default Lykon/dreamshaper-8",
    default="Lykon/dreamshaper-8",
)
parser.add_argument(
    "--lcm_lora_id",
    type=str,
    help="LCM LoRA model ID,Default latent-consistency/lcm-lora-sdv1-5",
    default="latent-consistency/lcm-lora-sdv1-5",
)
parser.add_argument(
    "-i",
    "--interactive",
    action="store_true",
    help="Interactive CLI mode",
)
parser.add_argument(
    "--use_tiny_auto_encoder",
    action="store_true",
    help="Use tiny auto encoder for SD (TAESD)",
)

args = parser.parse_args()
|
|
|
# --version: print the version and stop before any heavy initialisation.
if args.version:
    print(APP_VERSION)
    # `exit()` is the interactive helper installed by the `site` module and is
    # not meant for programs; raising SystemExit ends the process with the
    # same status (0) without depending on it.
    raise SystemExit(0)

show_system_info()
print(f"Using device : {constants.DEVICE}")
app_settings = get_settings()

# Report how many model entries the settings object exposes for each list.
print(f"Found {len(app_settings.lcm_models)} LCM models in config/lcm-models.txt")
print(
    f"Found {len(app_settings.stable_diffsuion_models)} stable diffusion models in config/stable-diffusion-models.txt"
)
print(
    f"Found {len(app_settings.lcm_lora_models)} LCM-LoRA models in config/lcm-lora-models.txt"
)
print(
    f"Found {len(app_settings.openvino_lcm_models)} OpenVINO LCM models in config/openvino-lcm-models.txt"
)

# Imported here rather than at the top of the file, as in the original layout.
from frontend.webui.ui import start_webui

# NOTE(review): the web UI is started unconditionally (the --gui/--webui/
# --realtime flags are not consulted) and at import time, before the route
# handlers below are defined. If start_webui() blocks, those routes are not
# registered until it returns — confirm this ordering is intended.
print("Starting web UI mode")
start_webui(
    args.share,
)
|
|
|
# The original line read `app.get("/")` without the leading `@`: that only
# builds a decorator and discards it, so the handler was never registered.
@app.get("/")
def root():
    """Landing endpoint: return a fixed greeting payload."""
    return {"API": "hello"}
|
|
|
def _decode_init_image(imgbase64data, target_width=512):
    """Decode a base64 image and scale it to ``target_width`` pixels wide.

    The height is scaled by the same factor (truncated to an int) so the
    aspect ratio is kept.

    Raises:
        TypeError, ValueError, OSError: if the payload is not valid base64 or
            cannot be read as an image.
    """
    raw_bytes = base64.b64decode(imgbase64data)
    source = Image.open(io.BytesIO(raw_bytes))
    src_width, src_height = source.size
    target_height = int(src_height * target_width / src_width)
    # Uploads may be RGBA, palette or grayscale; normalise to RGB because the
    # diffusion pipeline is assumed to expect a 3-channel image.
    return source.convert("RGB").resize((target_width, target_height))


@app.post("/img2img")
async def predict(
    prompt=Body(...),
    imgbase64data=Body(...),
    negative_prompt=Body(None),
    userId=Body(None),
):
    """Run image-to-image generation and return the result as base64 JPEG.

    Args:
        prompt: Text prompt for the generation.
        imgbase64data: Base64-encoded source image.
        negative_prompt: Optional negative prompt.
        userId: Accepted for API compatibility; not used by this handler.

    Returns:
        The first generated image, JPEG-encoded and base64-encoded (str).

    Raises:
        HTTPException: 400 when ``imgbase64data`` cannot be decoded as an image.

    Side effects:
        Mutates the module-level ``app_settings`` diffusion settings and the
        ``previous_*`` globals used for the reshape check.
    """
    # Local import: only this handler needs it, and the file's import block
    # stays untouched.
    from fastapi import HTTPException

    global previous_height, previous_width, previous_model_id, previous_num_of_images

    start = time.time()
    try:
        img = _decode_init_image(imgbase64data, target_width=512)
    except (TypeError, ValueError, OSError) as err:
        # binascii.Error is a ValueError; PIL's unreadable-image error is an OSError.
        raise HTTPException(
            status_code=400,
            detail="imgbase64data is not a valid base64-encoded image",
        ) from err
    end1 = time.time()

    # Log the payload size rather than the payload itself; it can be megabytes.
    print("参数", f"{len(imgbase64data)} base64 chars", prompt)
    now = datetime.now()
    print(now)
    print("图像:", img.size)
    print("加载管道:", end1 - start)

    lcm_setting = app_settings.settings.lcm_diffusion_setting
    lcm_setting.prompt = prompt
    lcm_setting.negative_prompt = negative_prompt
    lcm_setting.init_image = img
    lcm_setting.strength = 0.6
    lcm_setting.diffusion_task = DiffusionTask.image_to_image.value
    # Keep both configured dimensions in step with the resized init image
    # (previously only the height was updated).
    lcm_setting.image_width, lcm_setting.image_height = img.size

    model_id = lcm_setting.openvino_lcm_model_id
    image_width = lcm_setting.image_width
    image_height = lcm_setting.image_height
    num_images = lcm_setting.number_of_images
    reshape = is_reshape_required(
        previous_width,
        image_width,
        previous_height,
        image_height,
        previous_model_id,
        model_id,
        previous_num_of_images,
        num_images,
    )

    # future.result() blocks until generation finishes, so this coroutine holds
    # the event loop for the whole run.
    # NOTE(review): that blocking presumably also serialises access to the
    # shared settings above; moving generation off the loop would need a lock.
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(
            context.generate_text_to_image,
            app_settings.settings,
            reshape,
            DEVICE,
        )
        images = future.result()

    # Remember what was just generated for the next reshape check.
    previous_width = image_width
    previous_height = image_height
    previous_model_id = model_id
    previous_num_of_images = num_images

    output_image = images[0]
    end2 = time.time()
    print("测试", output_image)
    print("s生成完成:", end2 - end1)

    jpeg_buffer = io.BytesIO()
    output_image.save(jpeg_buffer, format="JPEG")
    output_image_base64 = base64.b64encode(jpeg_buffer.getvalue()).decode("utf-8")

    # Log the result size only; the encoded image itself goes to the caller.
    print("完成的图片:", f"{len(output_image_base64)} base64 chars")
    logging.getLogger("").info(
        "img2img result: %d base64 chars", len(output_image_base64)
    )
    return output_image_base64
|
|
|
|
|
@app.post("/predict")
async def predict(prompt=Body(...)):
    """Echo endpoint: return a greeting that embeds the submitted ``prompt``."""
    greeting = "您好,{}".format(prompt)
    return greeting
|
|