# roop-unleashed/roop/ProcessMgr.py
import os
import cv2
import numpy as np
import psutil
from enum import Enum
from roop.ProcessOptions import ProcessOptions
from roop.face_util import get_first_face, get_all_faces, rotate_image_180, rotate_anticlockwise, rotate_clockwise, clamp_cut_values
from roop.utilities import compute_cosine_distance, get_device, str_to_class
import roop.vr_util as vr
from typing import Any, List, Callable
from roop.typing import Frame, Face
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread, Lock
from queue import Queue
from tqdm import tqdm
from roop.ffmpeg_writer import FFMPEG_VideoWriter
import roop.globals

# Poor man's enum to be able to compare to int
class eNoFaceAction():
    USE_ORIGINAL_FRAME = 0
    RETRY_ROTATED = 1
    SKIP_FRAME = 2
    SKIP_FRAME_IF_DISSIMILAR = 3
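
# The plain ints above exist because roop.globals.no_face_action stores a plain
# int. A minimal enum.IntEnum sketch of the same idea (not used below; IntEnum
# members also compare equal to ints):
#
#   from enum import IntEnum
#
#   class NoFaceAction(IntEnum):
#       USE_ORIGINAL_FRAME = 0
#       RETRY_ROTATED = 1
#       SKIP_FRAME = 2
#       SKIP_FRAME_IF_DISSIMILAR = 3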

def create_queue(temp_frame_paths: List[str]) -> Queue[str]:
    queue: Queue[str] = Queue()
    for frame_path in temp_frame_paths:
        queue.put(frame_path)
    return queue

def pick_queue(queue: Queue[str], queue_per_future: int) -> List[str]:
    queues = []
    for _ in range(queue_per_future):
        if not queue.empty():
            queues.append(queue.get())
    return queues
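
# Worked example: run_batch() below with 10 frame paths and 4 threads yields
# queue_per_future = max(10 // 4, 1) = 2, so each submitted future gets a batch
# of up to two paths from pick_queue() until the queue is drained.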

class ProcessMgr():
    input_face_datas = []
    target_face_datas = []

    imagemask = None

    processors = []
    options: ProcessOptions = None

    num_threads = 1
    current_index = 0
    processing_threads = 1
    buffer_wait_time = 0.1

    lock = Lock()

    frames_queue = None
    processed_queue = None

    videowriter = None

    progress_gradio = None
    total_frames = 0

    plugins = {
        'faceswap'        : 'FaceSwapInsightFace',
        'mask_clip2seg'   : 'Mask_Clip2Seg',
        'mask_xseg'       : 'Mask_XSeg',
        'codeformer'      : 'Enhance_CodeFormer',
        'gfpgan'          : 'Enhance_GFPGAN',
        'dmdnet'          : 'Enhance_DMDNet',
        'gpen'            : 'Enhance_GPEN',
        'restoreformer++' : 'Enhance_RestoreFormerPPlus',
        'colorizer'       : 'Frame_Colorizer',
        'filter_generic'  : 'Frame_Filter',
        'removebg'        : 'Frame_Masking',
        'upscale'         : 'Frame_Upscale'
    }
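
    # Example of the mapping above: the options.processors key 'gfpgan' resolves
    # to the class name 'Enhance_GFPGAN', which initialize() loads from the
    # module 'roop.processors.Enhance_GFPGAN' via str_to_class().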

    def __init__(self, progress):
        if progress is not None:
            self.progress_gradio = progress

    def reuseOldProcessor(self, name:str):
        for p in self.processors:
            if p.processorname == name:
                return p
        return None

    def initialize(self, input_faces, target_faces, options):
        """Store the face data, (re)build the processor chain and prepare the optional image mask."""
        self.input_face_datas = input_faces
        self.target_face_datas = target_faces
        self.options = options
        devicename = get_device()

        roop.globals.g_desired_face_analysis = ["landmark_3d_68", "landmark_2d_106", "detection", "recognition"]
        if options.swap_mode == "all_female" or options.swap_mode == "all_male":
            roop.globals.g_desired_face_analysis.append("genderage")

        # Release processors that are no longer requested
        for p in self.processors:
            newp = next((x for x in options.processors.keys() if x == p.processorname), None)
            if newp is None:
                p.Release()
                del p

        # Reuse still-wanted processors, instantiate the rest from the plugins map
        newprocessors = []
        for key, extoption in options.processors.items():
            p = self.reuseOldProcessor(key)
            if p is None:
                classname = self.plugins[key]
                module = 'roop.processors.' + classname
                p = str_to_class(module, classname)
            if p is not None:
                extoption.update({"devicename": devicename})
                p.Initialize(extoption)
                newprocessors.append(p)
            else:
                print(f"Not using {module}")
        self.processors = newprocessors

        if isinstance(self.options.imagemask, dict) and self.options.imagemask.get("layers") and len(self.options.imagemask["layers"]) > 0:
            self.options.imagemask = self.options.imagemask.get("layers")[0]
            # Get rid of alpha
            self.options.imagemask = cv2.cvtColor(self.options.imagemask, cv2.COLOR_RGBA2GRAY)
            if np.any(self.options.imagemask):
                mo = self.input_face_datas[0].faces[0].mask_offsets
                self.options.imagemask = self.blur_area(self.options.imagemask, mo[4], mo[5])
                self.options.imagemask = self.options.imagemask.astype(np.float32) / 255
                self.options.imagemask = cv2.cvtColor(self.options.imagemask, cv2.COLOR_GRAY2RGB)
            else:
                self.options.imagemask = None

        self.options.frame_processing = False
        for p in self.processors:
            if p.type.startswith("frame_"):
                self.options.frame_processing = True

    def run_batch(self, source_files, target_files, threads:int = 1):
        progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
        self.total_frames = len(source_files)
        self.num_threads = threads

        with tqdm(total=self.total_frames, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
            with ThreadPoolExecutor(max_workers=threads) as executor:
                futures = []
                queue = create_queue(source_files)
                queue_per_future = max(len(source_files) // threads, 1)
                while not queue.empty():
                    future = executor.submit(self.process_frames, source_files, target_files, pick_queue(queue, queue_per_future), lambda: self.update_progress(progress))
                    futures.append(future)
                for future in as_completed(futures):
                    future.result()

    def process_frames(self, source_files: List[str], target_files: List[str], current_files, update: Callable[[], None]) -> None:
        for f in current_files:
            if not roop.globals.processing:
                return

            # Read the file into a byte array and decode it into an OpenCV image
            temp_frame = cv2.imdecode(np.fromfile(f, dtype=np.uint8), cv2.IMREAD_COLOR)
            if temp_frame is not None:
                if self.options.frame_processing:
                    # Chain the frame processors, feeding each one the previous
                    # output (mirrors process_videoframes below)
                    for p in self.processors:
                        temp_frame = p.Run(temp_frame)
                    resimg = temp_frame
                else:
                    resimg = self.process_frame(temp_frame)
                if resimg is not None:
                    i = source_files.index(f)
                    cv2.imwrite(target_files[i], resimg)
            if update:
                update()

    def read_frames_thread(self, cap, frame_start, frame_end, num_threads):
        num_frame = 0
        total_num = frame_end - frame_start
        if frame_start > 0:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_start)

        while roop.globals.processing:
            ret, frame = cap.read()
            if not ret:
                break
            self.frames_queue[num_frame % num_threads].put(frame, block=True)
            num_frame += 1
            if num_frame == total_num:
                break

        # Signal end-of-stream to every worker
        for i in range(num_threads):
            self.frames_queue[i].put(None)

    def process_videoframes(self, threadindex, progress) -> None:
        while True:
            frame = self.frames_queue[threadindex].get()
            if frame is None:
                self.processing_threads -= 1
                self.processed_queue[threadindex].put((False, None))
                return
            else:
                if self.options.frame_processing:
                    for p in self.processors:
                        frame = p.Run(frame)
                    resimg = frame
                else:
                    resimg = self.process_frame(frame)
                self.processed_queue[threadindex].put((True, resimg))
                del frame
                progress()

    def write_frames_thread(self):
        nextindex = 0
        num_producers = self.num_threads

        while True:
            process, frame = self.processed_queue[nextindex % self.num_threads].get()
            nextindex += 1
            if frame is not None:
                self.videowriter.write_frame(frame)
                del frame
            elif not process:
                num_producers -= 1
                if num_producers < 1:
                    return
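
    # In-memory pipeline wired up by run_batch_inmem() below:
    #
    #   read_frames_thread -> frames_queue[i] (round robin)
    #     -> process_videoframes worker i -> processed_queue[i]
    #       -> write_frames_thread (drains processed_queue in the same
    #          round-robin order, which restores the original frame order)
    #
    # Every Queue(1) holds a single frame, so the reader blocks until the
    # workers catch up; a None frame is the end-of-stream sentinel per worker.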

    def run_batch_inmem(self, source_video, target_video, frame_start, frame_end, fps, threads:int = 1, skip_audio=False):
        cap = cv2.VideoCapture(source_video)
        # frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_count = (frame_end - frame_start) + 1
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        processed_resolution = None
        for p in self.processors:
            if hasattr(p, 'getProcessedResolution'):
                processed_resolution = p.getProcessedResolution(width, height)
                print(f"Processed resolution: {processed_resolution}")
        if processed_resolution is not None:
            width = processed_resolution[0]
            height = processed_resolution[1]

        self.total_frames = frame_count
        self.num_threads = threads
        self.processing_threads = self.num_threads
        self.frames_queue = []
        self.processed_queue = []
        for _ in range(threads):
            self.frames_queue.append(Queue(1))
            self.processed_queue.append(Queue(1))

        self.videowriter = FFMPEG_VideoWriter(target_video, (width, height), fps, codec=roop.globals.video_encoder, crf=roop.globals.video_quality, audiofile=None)

        readthread = Thread(target=self.read_frames_thread, args=(cap, frame_start, frame_end, threads))
        readthread.start()
        writethread = Thread(target=self.write_frames_thread)
        writethread.start()

        progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
        with tqdm(total=self.total_frames, desc='Processing', unit='frames', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
            with ThreadPoolExecutor(thread_name_prefix='swap_proc', max_workers=self.num_threads) as executor:
                futures = []
                for threadindex in range(threads):
                    future = executor.submit(self.process_videoframes, threadindex, lambda: self.update_progress(progress))
                    futures.append(future)
                for future in as_completed(futures):
                    future.result()

        # wait for the reader and writer tasks to complete
        readthread.join()
        writethread.join()

        cap.release()
        self.videowriter.close()
        self.frames_queue.clear()
        self.processed_queue.clear()

    def update_progress(self, progress: Any = None) -> None:
        process = psutil.Process(os.getpid())
        memory_usage = process.memory_info().rss / 1024 / 1024 / 1024
        progress.set_postfix({
            'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB',
            'execution_threads': self.num_threads
        })
        progress.update(1)
        if self.progress_gradio is not None:
            self.progress_gradio((progress.n, self.total_frames), desc='Processing', total=self.total_frames, unit='frames')

    # https://github.com/deepinsight/insightface#third-party-re-implementation-of-arcface
    # https://github.com/deepinsight/insightface/blob/master/alignment/coordinate_reg/image_infer.py
    # https://github.com/deepinsight/insightface/issues/1350
    # https://github.com/linghu8812/tensorrt_inference

    def process_frame(self, frame:Frame):
        if len(self.input_face_datas) < 1 and not self.options.show_face_masking:
            return frame

        temp_frame = frame.copy()
        num_swapped, temp_frame = self.swap_faces(frame, temp_frame)
        if num_swapped > 0:
            if roop.globals.no_face_action == eNoFaceAction.SKIP_FRAME_IF_DISSIMILAR:
                if len(self.input_face_datas) > num_swapped:
                    return None
            return temp_frame

        if roop.globals.no_face_action == eNoFaceAction.USE_ORIGINAL_FRAME:
            return frame
        if roop.globals.no_face_action == eNoFaceAction.SKIP_FRAME:
            # This only works with in-mem processing, as it simply skips the frame.
            # For 'extract frames' it leaves the frame unprocessed, and ffmpeg then uses it in the final output.
            # Deleting the frame here would work, but could make ffmpeg fail unless the remaining frames are renamed,
            # and we don't have the information about which frame number this actually is.
            # Alternatively, the affected frames could be marked for deletion, deleted at the end,
            # and the remaining frames renamed; that might work.
            return None
        else:
            return self.retry_rotated(frame)

    def retry_rotated(self, frame):
        copyframe = frame.copy()
        copyframe = rotate_clockwise(copyframe)
        temp_frame = copyframe.copy()
        num_swapped, temp_frame = self.swap_faces(copyframe, temp_frame)
        if num_swapped > 0:
            return rotate_anticlockwise(temp_frame)

        copyframe = frame.copy()
        copyframe = rotate_anticlockwise(copyframe)
        temp_frame = copyframe.copy()
        num_swapped, temp_frame = self.swap_faces(copyframe, temp_frame)
        if num_swapped > 0:
            return rotate_clockwise(temp_frame)

        del copyframe
        return frame

    def swap_faces(self, frame, temp_frame):
        num_faces_found = 0

        if self.options.swap_mode == "first":
            face = get_first_face(frame)
            if face is None:
                return num_faces_found, frame
            num_faces_found += 1
            temp_frame = self.process_face(self.options.selected_index, face, temp_frame)

        else:
            faces = get_all_faces(frame)
            if faces is None:
                return num_faces_found, frame

            if self.options.swap_mode == "all":
                for face in faces:
                    num_faces_found += 1
                    temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
                    del face

            elif self.options.swap_mode == "selected":
                num_targetfaces = len(self.target_face_datas)
                use_index = num_targetfaces == 1
                for i, tf in enumerate(self.target_face_datas):
                    for face in faces:
                        if compute_cosine_distance(tf.embedding, face.embedding) <= self.options.face_distance_threshold:
                            if i < len(self.input_face_datas):
                                if use_index:
                                    temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
                                else:
                                    temp_frame = self.process_face(i, face, temp_frame)
                                num_faces_found += 1
                            del face
                    if not roop.globals.vr_mode and num_faces_found == num_targetfaces:
                        break

            elif self.options.swap_mode == "all_female" or self.options.swap_mode == "all_male":
                gender = 'F' if self.options.swap_mode == "all_female" else 'M'
                for face in faces:
                    if face.sex == gender:
                        num_faces_found += 1
                        temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
                    del face

        if roop.globals.vr_mode and num_faces_found % 2 > 0:
            # stereo image: there has to be an even number of faces
            num_faces_found = 0
            return num_faces_found, frame
        if num_faces_found == 0:
            return num_faces_found, frame

        # maskprocessor = next((x for x in self.processors if x.type == 'mask'), None)

        if self.options.imagemask is not None and self.options.imagemask.shape == frame.shape:
            temp_frame = self.simple_blend_with_mask(temp_frame, frame, self.options.imagemask)

        return num_faces_found, temp_frame
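
    # Matching intuition for swap_mode == "selected": assuming
    # compute_cosine_distance() returns 1 - cosine similarity of the two
    # embeddings, identical faces score near 0.0 and unrelated faces near 1.0,
    # so a detected face is treated as a chosen target when its distance is at
    # or below options.face_distance_threshold.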

    def rotation_action(self, original_face:Face, frame:Frame):
        (height, width) = frame.shape[:2]

        bounding_box_width = original_face.bbox[2] - original_face.bbox[0]
        bounding_box_height = original_face.bbox[3] - original_face.bbox[1]
        horizontal_face = bounding_box_width > bounding_box_height

        center_x = width // 2.0
        start_x = original_face.bbox[0]
        end_x = original_face.bbox[2]
        bbox_center_x = start_x + (bounding_box_width // 2.0)

        # need to leverage the array of landmarks as described here:
        # https://github.com/deepinsight/insightface/tree/master/alignment/coordinate_reg
        # basically, we should be able to check the relative position of the eyes and nose,
        # use that to determine which way the face is actually facing when in a horizontal position,
        # and derive the correct rotation_action from it
        forehead_x = original_face.landmark_2d_106[72][0]
        chin_x = original_face.landmark_2d_106[0][0]

        if horizontal_face:
            if chin_x < forehead_x:
                # this is someone lying down with their face like this (:
                return "rotate_anticlockwise"
            elif forehead_x < chin_x:
                # this is someone lying down with their face like this :)
                return "rotate_clockwise"
            if bbox_center_x >= center_x:
                # this is someone lying down with their face in the right-hand side of the frame
                return "rotate_anticlockwise"
            if bbox_center_x < center_x:
                # this is someone lying down with their face in the left-hand side of the frame
                return "rotate_clockwise"

        return None

    def auto_rotate_frame(self, original_face, frame:Frame):
        target_face = original_face
        original_frame = frame

        rotation_action = self.rotation_action(original_face, frame)
        if rotation_action == "rotate_anticlockwise":
            # face is horizontal: rotate the frame anti-clockwise and get the face bounding box from the rotated frame
            frame = rotate_anticlockwise(frame)
        elif rotation_action == "rotate_clockwise":
            # face is horizontal: rotate the frame clockwise and get the face bounding box from the rotated frame
            frame = rotate_clockwise(frame)

        return target_face, frame, rotation_action

    def auto_unrotate_frame(self, frame:Frame, rotation_action):
        if rotation_action == "rotate_anticlockwise":
            return rotate_clockwise(frame)
        elif rotation_action == "rotate_clockwise":
            return rotate_anticlockwise(frame)
        return frame

    def process_face(self, face_index, target_face:Face, frame:Frame):
        from roop.face_util import align_crop

        enhanced_frame = None
        if len(self.input_face_datas) > 0:
            inputface = self.input_face_datas[face_index].faces[0]
        else:
            inputface = None

        rotation_action = None
        if roop.globals.autorotate_faces:
            # check for sideways rotation of face
            rotation_action = self.rotation_action(target_face, frame)
        if rotation_action is not None:
            (startX, startY, endX, endY) = target_face["bbox"].astype("int")
            width = endX - startX
            height = endY - startY
            offs = int(max(width, height) * 0.25)
            rotcutframe, startX, startY, endX, endY = self.cutout(frame, startX - offs, startY - offs, endX + offs, endY + offs)
            if rotation_action == "rotate_anticlockwise":
                rotcutframe = rotate_anticlockwise(rotcutframe)
            elif rotation_action == "rotate_clockwise":
                rotcutframe = rotate_clockwise(rotcutframe)

            # rotate image and re-detect face to correct wonky landmarks
            rotface = get_first_face(rotcutframe)
            if rotface is None:
                rotation_action = None
            else:
                saved_frame = frame.copy()
                frame = rotcutframe
                target_face = rotface

        # if roop.globals.vr_mode:
        #     bbox = target_face.bbox
        #     [orig_width, orig_height, _] = frame.shape
        #     # Convert bounding box to ints
        #     x1, y1, x2, y2 = map(int, bbox)
        #     # Determine the center of the bounding box
        #     x_center = (x1 + x2) / 2
        #     y_center = (y1 + y2) / 2
        #     # Normalize coordinates to range [-1, 1]
        #     x_center_normalized = x_center / (orig_width / 2) - 1
        #     y_center_normalized = y_center / (orig_width / 2) - 1
        #     # Convert normalized coordinates to spherical (theta, phi)
        #     theta = x_center_normalized * 180  # Theta ranges from -180 to 180 degrees
        #     phi = -y_center_normalized * 90    # Phi ranges from -90 to 90 degrees
        #     img = vr.GetPerspective(frame, 90, theta, phi, 1280, 1280)  # Generate perspective image

        fake_frame = None
        aligned_img, M = align_crop(frame, target_face.kps, 128)
        fake_frame = aligned_img
        swap_frame = aligned_img
        target_face.matrix = M

        for p in self.processors:
            if p.type == 'swap':
                if inputface is not None:
                    for _ in range(0, self.options.num_swap_steps):
                        swap_frame = p.Run(inputface, target_face, swap_frame)
                    fake_frame = swap_frame
                scale_factor = 0.0
            elif p.type == 'mask':
                fake_frame = self.process_mask(p, aligned_img, fake_frame)
            else:
                enhanced_frame, scale_factor = p.Run(self.input_face_datas[face_index], target_face, fake_frame)

        upscale = 512
        orig_width = fake_frame.shape[1]
        # the keyword matters here: a positional third argument would be treated as dst, not interpolation
        fake_frame = cv2.resize(fake_frame, (upscale, upscale), interpolation=cv2.INTER_CUBIC)
        mask_offsets = (0, 0, 0, 0, 1, 20) if inputface is None else inputface.mask_offsets

        if enhanced_frame is None:
            scale_factor = int(upscale / orig_width)
            result = self.paste_upscale(fake_frame, fake_frame, target_face.matrix, frame, scale_factor, mask_offsets)
        else:
            result = self.paste_upscale(fake_frame, enhanced_frame, target_face.matrix, frame, scale_factor, mask_offsets)

        if rotation_action is not None:
            fake_frame = self.auto_unrotate_frame(result, rotation_action)
            return self.paste_simple(fake_frame, saved_frame, startX, startY)

        return result
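
    # Sizing math in process_face() above: align_crop() produces a 128x128
    # aligned face (the 128 passed as its third argument), which is resized to
    # 512x512; when no enhancer ran, scale_factor = int(512 / 128) = 4, and
    # paste_upscale() multiplies the alignment matrix M by 4 before inverting
    # it to warp the swapped face back onto the full frame.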

    def cutout(self, frame:Frame, start_x, start_y, end_x, end_y):
        if start_x < 0:
            start_x = 0
        if start_y < 0:
            start_y = 0
        if end_x > frame.shape[1]:
            end_x = frame.shape[1]
        if end_y > frame.shape[0]:
            end_y = frame.shape[0]
        return frame[start_y:end_y, start_x:end_x], start_x, start_y, end_x, end_y

    def paste_simple(self, src:Frame, dest:Frame, start_x, start_y):
        end_x = start_x + src.shape[1]
        end_y = start_y + src.shape[0]

        start_x, end_x, start_y, end_y = clamp_cut_values(start_x, end_x, start_y, end_y, dest)
        dest[start_y:end_y, start_x:end_x] = src
        return dest

    def simple_blend_with_mask(self, image1, image2, mask):
        # Blend the images
        blended_image = image1.astype(np.float32) * (1.0 - mask) + image2.astype(np.float32) * mask
        return blended_image.astype(np.uint8)
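
    # Blend intuition: where mask == 0.0 the result is image1 (the processed
    # frame at the call site in swap_faces), where mask == 1.0 it is image2
    # (the original frame), with a linear cross-fade in between.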

    def paste_upscale(self, fake_face, upsk_face, M, target_img, scale_factor, mask_offsets):
        M_scale = M * scale_factor
        IM = cv2.invertAffineTransform(M_scale)

        face_matte = np.full((target_img.shape[0], target_img.shape[1]), 255, dtype=np.uint8)
        # Generate a white square sized like upsk_face
        img_matte = np.zeros((upsk_face.shape[0], upsk_face.shape[1]), dtype=np.uint8)

        w = img_matte.shape[1]
        h = img_matte.shape[0]
        top = int(mask_offsets[0] * h)
        bottom = int(h - (mask_offsets[1] * h))
        left = int(mask_offsets[2] * w)
        right = int(w - (mask_offsets[3] * w))
        img_matte[top:bottom, left:right] = 255

        # Transform the white square back into target_img space
        img_matte = cv2.warpAffine(img_matte, IM, (target_img.shape[1], target_img.shape[0]), flags=cv2.INTER_NEAREST, borderValue=0.0)
        # Blacken the edges of img_matte by 1 pixel (so the mask is not expanded on the image edges)
        img_matte[:1, :] = img_matte[-1:, :] = img_matte[:, :1] = img_matte[:, -1:] = 0
        img_matte = self.blur_area(img_matte, mask_offsets[4], mask_offsets[5])

        # Normalize images to float values and reshape
        img_matte = img_matte.astype(np.float32) / 255
        face_matte = face_matte.astype(np.float32) / 255
        img_matte = np.minimum(face_matte, img_matte)

        if self.options.show_face_area_overlay:
            # Additional steps for the green face-area overlay
            green_overlay = np.zeros_like(target_img)
            green_color = [0, 255, 0]  # green in both BGR and RGB
            for i in range(3):
                # Apply green color where img_matte is not zero
                green_overlay[:, :, i] = np.where(img_matte > 0, green_color[i], 0)

        # Transform the upscaled face back into target_img space
        img_matte = np.reshape(img_matte, [img_matte.shape[0], img_matte.shape[1], 1])
        paste_face = cv2.warpAffine(upsk_face, IM, (target_img.shape[1], target_img.shape[0]), borderMode=cv2.BORDER_REPLICATE)
        if upsk_face is not fake_face:
            fake_face = cv2.warpAffine(fake_face, IM, (target_img.shape[1], target_img.shape[0]), borderMode=cv2.BORDER_REPLICATE)
            paste_face = cv2.addWeighted(paste_face, self.options.blend_ratio, fake_face, 1.0 - self.options.blend_ratio, 0)

        # Re-assemble the image
        paste_face = img_matte * paste_face
        paste_face = paste_face + (1 - img_matte) * target_img.astype(np.float32)

        if self.options.show_face_area_overlay:
            # Overlay the green mask on the final image
            paste_face = cv2.addWeighted(paste_face.astype(np.uint8), 0.5, green_overlay, 0.5, 0)
        return paste_face.astype(np.uint8)

    def blur_area(self, img_matte, num_erosion_iterations, blur_amount):
        # Detect the affine-transformed white area
        mask_h_inds, mask_w_inds = np.where(img_matte == 255)
        # Calculate the size (and diagonal size) of the transformed white area's width and height boundaries
        mask_h = np.max(mask_h_inds) - np.min(mask_h_inds)
        mask_w = np.max(mask_w_inds) - np.min(mask_w_inds)
        mask_size = int(np.sqrt(mask_h * mask_w))

        # Calculate the kernel size for eroding img_matte (insightface's empirical guess for the best size was max(mask_size//10, 10))
        # k = max(mask_size//12, 8)
        k = max(mask_size // (blur_amount // 2), blur_amount // 2)
        kernel = np.ones((k, k), np.uint8)
        img_matte = cv2.erode(img_matte, kernel, iterations=num_erosion_iterations)

        # Calculate the kernel size for blurring img_matte (insightface's empirical guess for the best size was max(mask_size//20, 5))
        # k = max(mask_size//24, 4)
        k = max(mask_size // blur_amount, blur_amount // 5)
        kernel_size = (k, k)
        blur_size = tuple(2 * i + 1 for i in kernel_size)
        return cv2.GaussianBlur(img_matte, blur_size, 0)
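
    # Worked example: for a 200x200 white area (mask_size = 200) and the
    # fallback blur_amount of 20 (see the mask_offsets default in process_face),
    # the erosion kernel is max(200 // 10, 10) = 20 and the blur kernel
    # max(200 // 20, 4) = 10, so the GaussianBlur size is (21, 21) and the hard
    # matte edge becomes a soft falloff.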

    def process_mask(self, processor, frame:Frame, target:Frame):
        img_mask = processor.Run(frame, self.options.masking_text)
        img_mask = cv2.resize(img_mask, (target.shape[1], target.shape[0]))
        img_mask = np.reshape(img_mask, [img_mask.shape[0], img_mask.shape[1], 1])

        if self.options.show_face_masking:
            result = (1 - img_mask) * frame.astype(np.float32)
            return np.uint8(result)

        target = target.astype(np.float32)
        result = (1 - img_mask) * target
        result += img_mask * frame.astype(np.float32)
        return np.uint8(result)

    def unload_models(self):
        pass

    def release_resources(self):
        for p in self.processors:
            p.Release()
        self.processors.clear()
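
# Typical driver flow, as a hedged sketch (the real entry points live elsewhere
# in roop-unleashed, and the exact ProcessOptions contents are assumptions):
#
#   mgr = ProcessMgr(progress=None)
#   mgr.initialize(input_faces, target_faces, options)  # options: ProcessOptions
#   mgr.run_batch_inmem('in.mp4', 'out.mp4', frame_start=0, frame_end=100,
#                       fps=25.0, threads=4)
#   mgr.release_resources()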