nsfwalex commited on
Commit
821c6ac
1 Parent(s): 01aecab

Create app.fs.py

Browse files
Files changed (1) hide show
  1. app.fs.py +133 -0
app.fs.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import numpy as np
2
+ import gradio as gr
3
+ import roop.globals
4
+ from roop.core import (
5
+ start,
6
+ decode_execution_providers,
7
+ suggest_max_memory,
8
+ suggest_execution_threads,
9
+ )
10
+ from roop.processors.frame.core import get_frame_processors_modules
11
+ from roop.utilities import normalize_output_path
12
+ import os
13
+ from PIL import Image
14
+
15
+ from cryptography.hazmat.primitives.asymmetric import rsa, padding
16
+ from cryptography.hazmat.primitives import serialization, hashes
17
+ from cryptography.hazmat.backends import default_backend
18
+ from cryptography.hazmat.primitives.asymmetric import utils
19
+ import base64
20
+ import json
21
+ import datetime
22
+ import pylru
23
+ token_queue = pylru.lrucache(1000)
24
+ def load_public_key_from_file(file_path):
25
+ with open(file_path, "rb") as key_file:
26
+ public_key = serialization.load_pem_public_key(
27
+ key_file.read(),
28
+ backend=default_backend()
29
+ )
30
+ return public_key
31
+
32
+ def verify_signature(public_key, data, signature):
33
+ """
34
+ Verify a signature with a public key.
35
+ Converts the data to bytes if it's not already in byte format.
36
+ """
37
+ # Ensure the data is in bytes. If it's a string, encode it to UTF-8.
38
+ if isinstance(data, str):
39
+ data = data.encode('utf-8')
40
+
41
+ try:
42
+ # Verify the signature
43
+ public_key.verify(
44
+ signature,
45
+ data,
46
+ padding.PSS(
47
+ mgf=padding.MGF1(hashes.SHA256()),
48
+ salt_length=padding.PSS.MAX_LENGTH
49
+ ),
50
+ hashes.SHA256()
51
+ )
52
+ return True
53
+ except Exception as e:
54
+ print("Verification failed:", e)
55
+ return False
56
+
57
+ public_key = load_public_key_from_file("./nsfwais.pubkey.pem")
58
+
59
+ def swap_face(source_file, target_file, doFaceEnhancer, skey, key):
60
+ skey = json.loads(skey)
61
+ #first validate skey
62
+ signature = base64.b64decode(skey["s"])
63
+ if not skey or not key or not verify_signature(public_key, skey["t"] + "_" + key, signature):
64
+ raise Exception("verify authkey failed.")
65
+ timestamp_requested = float(skey["t"])
66
+ timestamp_now = int(datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).timestamp())
67
+ if timestamp_now - timestamp_requested > 600:
68
+ raise Exception(f"authkey timeout, {timestamp_now - timestamp_requested}")
69
+ print(f"authkey pass, {timestamp_now - timestamp_requested}")
70
+
71
+ token = skey["t"]
72
+ token_cache = token_queue.get(token, None)
73
+ if token_cache:
74
+ raise Exception(f"duplicated token, {token_cache}")
75
+ token_queue[token] = True
76
+ source_path = "input.jpg"
77
+ target_path = "target.jpg"
78
+
79
+ source_image = Image.fromarray(source_file)
80
+ source_image.save(source_path)
81
+ target_image = Image.fromarray(target_file)
82
+ target_image.save(target_path)
83
+
84
+ print("source_path: ", source_path)
85
+ print("target_path: ", target_path)
86
+
87
+ roop.globals.source_path = source_path
88
+ roop.globals.target_path = target_path
89
+ output_path = "output.jpg"
90
+ roop.globals.output_path = normalize_output_path(
91
+ roop.globals.source_path, roop.globals.target_path, output_path
92
+ )
93
+ if doFaceEnhancer:
94
+ roop.globals.frame_processors = ["face_swapper", "face_enhancer"]
95
+ else:
96
+ roop.globals.frame_processors = ["face_swapper"]
97
+ roop.globals.headless = True
98
+ roop.globals.keep_fps = True
99
+ roop.globals.keep_audio = True
100
+ roop.globals.keep_frames = False
101
+ roop.globals.many_faces = True
102
+ roop.globals.video_encoder = "libx264"
103
+ roop.globals.video_quality = 18
104
+ roop.globals.max_memory = suggest_max_memory()
105
+ roop.globals.execution_providers = decode_execution_providers(["cuda"])
106
+ roop.globals.execution_threads = suggest_execution_threads()
107
+
108
+ print(
109
+ "start process",
110
+ roop.globals.source_path,
111
+ roop.globals.target_path,
112
+ roop.globals.output_path,
113
+ )
114
+
115
+ for frame_processor in get_frame_processors_modules(
116
+ roop.globals.frame_processors
117
+ ):
118
+ if not frame_processor.pre_check():
119
+ return
120
+
121
+ start()
122
+ return output_path
123
+
124
+ app = gr.Blocks()
125
+
126
+ with app:
127
+ gr.Interface(
128
+ fn=swap_face,
129
+ inputs=[gr.Image(), gr.Image(), gr.Checkbox(label="face_enhancer?", info="do face enhancer?"), gr.Textbox(visible=False), gr.Textbox(visible=False)],
130
+ outputs="image"
131
+ )
132
+ app.queue(max_size=5,status_update_rate=5.0)
133
+ app.launch(max_threads=1)