Spaces:
Running
on
Zero
Running
on
Zero
import onnxruntime as ort | |
from huggingface_hub import hf_hub_download | |
import requests | |
import os | |
import gradio as gr | |
import spaces | |
from typing import Any, List, Callable | |
import cv2 | |
import insightface | |
import time | |
import tempfile | |
import subprocess | |
import gfpgan | |
print("instaling cudnn 9") | |
import subprocess | |
import sys | |
# Set the LD_LIBRARY_PATH environment variable | |
os.environ['LD_LIBRARY_PATH'] = '/usr/local/cuda-12.0/lib64:' + os.environ.get('LD_LIBRARY_PATH', '') | |
def get_pip_version(package_name): | |
try: | |
result = subprocess.run( | |
[sys.executable, '-m', 'pip', 'show', package_name], | |
capture_output=True, | |
text=True, | |
check=True | |
) | |
output = result.stdout | |
version_line = next(line for line in output.split('\n') if line.startswith('Version:')) | |
return version_line.split(': ')[1] | |
except subprocess.CalledProcessError as e: | |
print(f"Erro ao executar o comando: {e}") | |
return None | |
package_name = 'nvidia-cudnn-cu12' | |
version = get_pip_version(package_name) | |
print(f"A versão instalada de {package_name} é: {version}") | |
command = "find / -path /proc -prune -o -path /sys -prune -o -name 'libcudnn*' -print" | |
process = subprocess.run(command, shell=True, text=True, capture_output=True) | |
if process.returncode == 0: | |
print("Resultados da busca:\n", process.stdout) | |
else: | |
print("Houve um erro na execução do comando:", process.stderr) | |
def find_and_move_library(library_name, destination): | |
# Search for the library in the entire system | |
try: | |
command = f"find / -name '{library_name}' 2>/dev/null" | |
process = subprocess.run(command, shell=True, text=True, capture_output=True) | |
if process.returncode == 0: | |
found_paths = process.stdout.strip().split('\n') | |
if found_paths: | |
for path in found_paths: | |
print(f"Found {library_name} at: {path}") | |
# Move the library to the destination | |
try: | |
subprocess.run(['mv', path, destination], check=True) | |
print(f"Moved {library_name} to {destination}") | |
break # Exit after moving the first valid match | |
except subprocess.CalledProcessError as e: | |
print(f"Failed to move {path} to {destination}: {e}") | |
else: | |
print(f"No {library_name} found.") | |
else: | |
print("Error during search:", process.stderr) | |
except Exception as e: | |
print(f"Error finding {library_name}: {e}") | |
source_path = '/usr/local/lib/python3.10/site-packages/nvidia/cublas/lib/libcublasLt.so.12' | |
destination_path = '/usr/local/lib/python3.10/site-packages/nvidia/cudnn/lib/' | |
command = ['mv', source_path, destination_path] | |
subprocess.run(command, check=True) | |
find_and_move_library('libnvrtc.so.12', destination_path) | |
command = ['mv', "/usr/local/lib/python3.10/site-packages/nvidia/cublas/lib/libcublas.so.12", destination_path] | |
subprocess.run(command, check=True) | |
command = ['mv', "/usr/local/lib/python3.10/site-packages/nvidia/cufft/lib/libcufft.so.11", destination_path] | |
subprocess.run(command, check=True) | |
command = ['mv', "/usr/local/lib/python3.10/site-packages/nvidia/cufft/lib/libcufftw.so.11", destination_path] | |
subprocess.run(command, check=True) | |
command = ['mv', "/usr/local/lib/python3.10/site-packages/nvidia/cuda_runtime/lib/libcudart.so.12", destination_path] | |
subprocess.run(command, check=True) | |
command = ['mv', "/usr/local/lib/python3.10/site-packages/nvidia/cuda_cupti/lib/libcupti.so.12", destination_path] | |
subprocess.run(command, check=True) | |
command = ['cp', "/usr/local/lib/python3.10/site-packages/nvidia/curand/lib/libcurand.so.10", destination_path] | |
subprocess.run(command, check=True) | |
command = ['cp', "/usr/local/lib/python3.10/site-packages/nvidia/cusolver/lib/libcusolver.so.11", destination_path] | |
subprocess.run(command, check=True) | |
command = ['cp', "/usr/local/lib/python3.10/site-packages/nvidia/cusolver/lib/libcusolverMg.so.11", destination_path] | |
subprocess.run(command, check=True) | |
command = ['cp', "/usr/local/lib/python3.10/site-packages/nvidia/cusparse/lib/libcusparse.so.12", destination_path] | |
subprocess.run(command, check=True) | |
command = "find / -path /proc -prune -o -path /sys -prune -o -name 'libcu*' -print" | |
process = subprocess.run(command, shell=True, text=True, capture_output=True) | |
if process.returncode == 0: | |
print("Resultados da busca:\n", process.stdout) | |
else: | |
print("Houve um erro na execução do comando:", process.stderr) | |
print("done") | |
print("---------------------") | |
print(ort.get_available_providers()) | |
def conditional_download(download_directory_path, urls): | |
if not os.path.exists(download_directory_path): | |
os.makedirs(download_directory_path) | |
for url in urls: | |
filename = url.split('/')[-1] | |
file_path = os.path.join(download_directory_path, filename) | |
if not os.path.exists(file_path): | |
print(f"Baixando {filename}...") | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
with open(file_path, 'wb') as file: | |
for chunk in response.iter_content(chunk_size=8192): | |
file.write(chunk) | |
print(f"{filename} baixado com sucesso.") | |
else: | |
print(f"Falha ao baixar {filename}. Status code: {response.status_code}") | |
else: | |
print(f"{filename} já existe. Pulando o download.") | |
model_path = hf_hub_download(repo_id="countfloyd/deepfake", filename="inswapper_128.onnx") | |
conditional_download("./", ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth']) | |
FACE_SWAPPER = None | |
FACE_ANALYSER = None | |
FACE_ENHANCER = None | |
def process_video(source_path: str, target_path: str, enhance = False, progress=gr.Progress(), output_path='result.mp4') -> None: | |
def get_face_analyser(): | |
global FACE_ANALYSER | |
if FACE_ANALYSER is None: | |
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=["CUDAExecutionProvider"]) | |
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640)) | |
return FACE_ANALYSER | |
def get_face_enhancer() -> Any: | |
global FACE_ENHANCER | |
if FACE_ENHANCER is None: | |
FACE_ENHANCER = gfpgan.GFPGANer(model_path="./GFPGANv1.4.pth", upscale=2, ) # type: ignore[attr-defined] | |
return FACE_ENHANCER | |
def get_one_face(frame): | |
face = get_face_analyser().get(frame) | |
try: | |
return min(face, key=lambda x: x.bbox[0]) | |
except ValueError: | |
return None | |
def get_face_swapper(): | |
global FACE_SWAPPER | |
if FACE_SWAPPER is None: | |
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=["CUDAExecutionProvider"]) | |
return FACE_SWAPPER | |
def swap_face(source_face, target_face, temp_frame): | |
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True) | |
def process_frame(source_face, temp_frame, enhance): | |
target_face = get_one_face(temp_frame) | |
if target_face: | |
temp_frame = swap_face(source_face, target_face, temp_frame) | |
if enhance: | |
temp_frame = enhance_face(temp_frame) | |
return temp_frame | |
def process_image(source_path: str, target_path: str, output_path: str, enhance = False) -> None: | |
source_face = get_one_face(cv2.imread(source_path)) | |
target_frame = cv2.imread(target_path) | |
result = process_frame(source_face, target_frame, enhance) | |
cv2.imwrite(output_path, result) | |
def create_temp_directory(): | |
temp_dir = tempfile.mkdtemp() | |
return temp_dir | |
def enhance_face(temp_frame): | |
_, _, temp_frame = get_face_enhancer().enhance( | |
temp_frame, | |
paste_back=True | |
) | |
return temp_frame | |
def remove_temp_directory(temp_dir): | |
try: | |
for filename in os.listdir(temp_dir): | |
file_path = os.path.join(temp_dir, filename) | |
if os.path.isfile(file_path): | |
os.unlink(file_path) | |
elif os.path.isdir(file_path): | |
os.rmdir(file_path) | |
os.rmdir(temp_dir) | |
except Exception as e: | |
print(f"Erro ao remover a pasta temporária: {e}") | |
def extract_frames(video_path: str): | |
video_capture = cv2.VideoCapture(video_path) | |
if not video_capture.isOpened(): | |
print("Erro ao abrir o vídeo.") | |
return [] | |
frames = [] | |
while True: | |
ret, frame = video_capture.read() | |
if not ret: | |
break | |
frames.append(frame) | |
video_capture.release() | |
return frames | |
def get_video_fps(video_path: str) -> float: | |
video_capture = cv2.VideoCapture(video_path) | |
if not video_capture.isOpened(): | |
raise ValueError("Erro ao abrir o vídeo.") | |
fps = video_capture.get(cv2.CAP_PROP_FPS) | |
video_capture.release() | |
return fps | |
def create_video_from_frames(temp_dir: str, output_video_path: str, fps: float) -> None: | |
temp_frames_pattern = os.path.join(temp_dir, "frame_%04d.jpg") | |
ffmpeg_command = [ | |
'ffmpeg', | |
'-y', | |
'-framerate', str(fps), | |
'-i', temp_frames_pattern, | |
'-c:v', 'libx264', | |
'-pix_fmt', 'yuv420p', | |
'-preset', 'ultrafast', | |
output_video_path | |
] | |
subprocess.run(ffmpeg_command, check=True) | |
def extract_audio(video_path: str, audio_path: str) -> None: | |
ffmpeg_command = [ | |
'ffmpeg', | |
'-y', | |
'-i', video_path, | |
'-q:a', '0', | |
'-map', 'a', | |
'-preset', 'ultrafast', | |
audio_path | |
] | |
subprocess.run(ffmpeg_command, check=True) | |
def add_audio_to_video(video_path: str, audio_path: str, output_video_path: str) -> None: | |
ffmpeg_command = [ | |
'ffmpeg', | |
'-y', | |
'-i', video_path, | |
'-i', audio_path, | |
'-c:v', 'copy', | |
'-c:a', 'aac', | |
'-strict', 'experimental', | |
'-preset', 'ultrafast', | |
output_video_path | |
] | |
subprocess.run(ffmpeg_command, check=True) | |
def delete_file(file_path: str) -> None: | |
try: | |
os.remove(file_path) | |
except Exception as e: | |
print(f"Erro ao remover o arquivo: {e}") | |
def reduce_video(video_path: str, output_video_path: str) -> None: | |
ffmpeg_command = [ | |
'ffmpeg', | |
'-y', | |
'-i', video_path, | |
'-vf', "scale='if(gte(iw,ih),720,-1)':'if(gte(iw,ih),-1,720)',pad=ceil(iw/2)*2:ceil(ih/2)*2", | |
'-preset', 'ultrafast', | |
'-r', '24', | |
output_video_path | |
] | |
subprocess.run(ffmpeg_command, check=True) | |
if not target_path.endswith('.mp4') and not target_path.endswith('.mov') and not target_path.endswith('.avi'): | |
process_image(source_path, target_path, "./output.jpg", enhance) | |
return "./output.jpg" | |
temp_dir = create_temp_directory() | |
video_input = temp_dir + "/input.mp4" | |
reduce_video(target_path , video_input) | |
source_face = get_one_face(cv2.imread(source_path)) | |
frames = extract_frames(video_input) | |
for index, frame in progress.tqdm(enumerate(frames), total=len(frames)): | |
processed_frame = process_frame(source_face, frame, enhance) | |
frame_filename = os.path.join(temp_dir, f"frame_{index:04d}.jpg") | |
cv2.imwrite(frame_filename, processed_frame) | |
video_path = temp_dir + "/out.mp4" | |
create_video_from_frames(temp_dir, video_path, get_video_fps(video_input)) | |
audio_path = temp_dir + "/audio.wav" | |
extract_audio(video_input, audio_path) | |
add_audio_to_video(video_path, audio_path, output_path) | |
remove_temp_directory(temp_dir) | |
return output_path | |
app = gr.Interface( | |
fn=process_video, | |
inputs=[gr.Image(type='filepath'), gr.File(label="Upload Image or Video", file_types=["image", "video"]), gr.Checkbox(label="Use Face GAN(increase render time)", value=False)], | |
outputs=[gr.File()], | |
description="Videos get downsampled to 720p and 24fps. To modify the code or purchase a modification, send an email to [email protected] to donate to the owner of the space: https://donate.stripe.com/3csg0D0tadXU4mYcMM" | |
) | |
app.launch() |