"""Gradio front-end that runs a roop face swap behind a signed, single-use token."""

import base64
import datetime
import json
import os

import gradio as gr
import numpy as np
import pylru
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa, utils
from PIL import Image

import roop.globals
from roop.core import (
    start,
    decode_execution_providers,
    suggest_max_memory,
    suggest_execution_threads,
)
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import normalize_output_path

# Maximum accepted age, in seconds, of the timestamp carried by an auth token.
AUTH_MAX_AGE_SECONDS = 600

# Tokens that were already accepted, used to reject replays. Bounded LRU:
# only the 1000 most recently seen tokens are remembered.
token_queue = pylru.lrucache(1000)


def load_public_key_from_file(file_path):
    """Load and return the PEM-encoded public key stored at *file_path*."""
    with open(file_path, "rb") as key_file:
        return serialization.load_pem_public_key(
            key_file.read(), backend=default_backend()
        )


def verify_signature(public_key, data, signature):
    """Return True if *signature* is a valid PSS/SHA-256 signature of *data*.

    *data* may be ``str`` (encoded as UTF-8 before verification) or bytes.
    Any failure raised by the verification call is reported on stdout and
    turned into a ``False`` result instead of propagating.
    """
    if isinstance(data, str):
        data = data.encode("utf-8")

    try:
        public_key.verify(
            signature,
            data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH,
            ),
            hashes.SHA256(),
        )
    except Exception as e:
        # Deliberately broad: this is the trust boundary, and every kind of
        # failure must mean "not verified".
        print("Verification failed:", e)
        return False
    return True


public_key = load_public_key_from_file("./nsfwais.pubkey.pem")


def _authorize(skey, key):
    """Validate the signed token in *skey* against *key* and mark it as used.

    *skey* is a JSON string with a timestamp string ``"t"`` and a base64
    signature ``"s"``; the signed message is ``t + "_" + key``.

    Raises:
        Exception: if the token is malformed, the signature does not verify,
            the timestamp is older than ``AUTH_MAX_AGE_SECONDS``, or the
            token has already been accepted.
    """
    # Parse and shape-check everything BEFORE using it, so malformed input
    # yields the same auth failure as a bad signature instead of a stray
    # KeyError/TypeError/JSONDecodeError.
    try:
        payload = json.loads(skey)
        token = payload["t"]
        signature = base64.b64decode(payload["s"])
        timestamp_requested = float(token)
    except (TypeError, ValueError, KeyError) as err:
        raise Exception("verify authkey failed.") from err

    if (
        not key
        or not isinstance(key, str)
        or not isinstance(token, str)
        or not verify_signature(public_key, token + "_" + key, signature)
    ):
        raise Exception("verify authkey failed.")

    timestamp_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
    # NOTE(review): only tokens that are too OLD are rejected; a timestamp in
    # the future is accepted. Confirm this is intended.
    if timestamp_now - timestamp_requested > AUTH_MAX_AGE_SECONDS:
        raise Exception(f"authkey timeout, {timestamp_now - timestamp_requested}")
    print(f"authkey pass, {timestamp_now - timestamp_requested}")

    # Single-use enforcement: the timestamp string itself is the replay key.
    token_cache = token_queue.get(token, None)
    if token_cache:
        raise Exception(f"duplicated token, {token_cache}")
    token_queue[token] = True


def swap_face(source_file, target_file, doFaceEnhancer, skey, key):
    """Swap the face from *source_file* onto *target_file* using roop.

    Args:
        source_file: source image as an array accepted by ``Image.fromarray``.
        target_file: target image as an array accepted by ``Image.fromarray``.
        doFaceEnhancer: when truthy, also run the ``face_enhancer`` processor.
        skey: JSON string holding the signed token (``"t"`` and ``"s"``).
        key: value that was signed together with the token timestamp.

    Returns:
        The output image path, or ``None`` if a frame processor's
        ``pre_check()`` fails.

    Raises:
        Exception: if an image is missing or authorization fails.
    """
    # Checked before authorization so that a request which cannot succeed
    # does not consume the caller's single-use token.
    if source_file is None or target_file is None:
        raise Exception("source and target images are required.")

    _authorize(skey, key)

    source_path = "input.jpg"
    target_path = "target.jpg"

    Image.fromarray(source_file).save(source_path)
    Image.fromarray(target_file).save(target_path)

    print("source_path: ", source_path)
    print("target_path: ", target_path)

    roop.globals.source_path = source_path
    roop.globals.target_path = target_path
    output_path = "output.jpg"
    roop.globals.output_path = normalize_output_path(
        roop.globals.source_path, roop.globals.target_path, output_path
    )
    if doFaceEnhancer:
        roop.globals.frame_processors = ["face_swapper", "face_enhancer"]
    else:
        roop.globals.frame_processors = ["face_swapper"]
    roop.globals.headless = True
    roop.globals.keep_fps = True
    roop.globals.keep_audio = True
    roop.globals.keep_frames = False
    roop.globals.many_faces = True
    roop.globals.video_encoder = "libx264"
    roop.globals.video_quality = 18
    roop.globals.max_memory = suggest_max_memory()
    roop.globals.execution_providers = decode_execution_providers(["cuda"])
    roop.globals.execution_threads = suggest_execution_threads()

    print(
        "start process",
        roop.globals.source_path,
        roop.globals.target_path,
        roop.globals.output_path,
    )

    for frame_processor in get_frame_processors_modules(
        roop.globals.frame_processors
    ):
        if not frame_processor.pre_check():
            return None

    start()
    # Return the normalized path that was handed to roop rather than the raw
    # literal, so the two cannot diverge.
    return roop.globals.output_path


app = gr.Blocks()

with app:
    gr.Interface(
        fn=swap_face,
        inputs=[
            gr.Image(),
            gr.Image(),
            gr.Checkbox(label="face_enhancer?", info="do face enhancer?"),
            # Hidden fields carrying the signed token and the signed key.
            gr.Textbox(visible=False),
            gr.Textbox(visible=False),
        ],
        outputs="image",
    )

app.queue(max_size=5, status_update_rate=5.0)
app.launch(max_threads=1)