Spaces:
Running
Running
File size: 6,715 Bytes
e25fed3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
import os
import re
import subprocess
import torch
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple
import platform
class VideoConverter:
def __init__(self):
# Check the operating system
if platform.system() == 'Windows':
# Windows-specific FFmpeg path
self.ffmpeg_path = os.path.join(os.getcwd(), "ffmpeg", "bin", "ffmpeg.exe")
else:
# On Linux, assume FFmpeg is installed and available in PATH
self.ffmpeg_path = "ffmpeg"
self.gpu_available = torch.cuda.is_available()
self.formats = ["MP4", "MKV", "AVI", "MOV", "WMV", "FLV", "MPEG"]
self.codecs = [
"H.264",
"HEVC (H.265)",
"MPEG-4 (Part 2)",
"MPEG-2",
"ProRes Proxy",
"ProRes Light",
"ProRes Standard",
"ProRes HQ"
]
def get_codec_params(self, codec: str) -> List[str]:
codec_params = {
"H.264": ["-c:v", "libx264", "-preset", "medium"],
"HEVC (H.265)": ["-c:v", "libx265", "-preset", "medium"],
"MPEG-4 (Part 2)": ["-c:v", "mpeg4"],
"MPEG-2": ["-c:v", "mpeg2video"],
"ProRes Proxy": ["-c:v", "prores_ks", "-profile:v", "0"],
"ProRes Light": ["-c:v", "prores_ks", "-profile:v", "1"],
"ProRes Standard": ["-c:v", "prores_ks", "-profile:v", "2"],
"ProRes HQ": ["-c:v", "prores_ks", "-profile:v", "3"]
}
return codec_params.get(codec, ["-c:v", "libx264", "-preset", "medium"])
def get_video_duration(self, input_path: str) -> float:
cmd = [
self.ffmpeg_path,
"-i",
input_path
]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
for line in output.split('\n'):
if "Duration" in line:
time_str = line.split("Duration: ")[1].split(",")[0]
h, m, s = time_str.split(':')
return float(h) * 3600 + float(m) * 60 + float(s)
except Exception as e:
print(f"Error getting video duration: {e}")
return 0
def convert_video(
self,
input_path: str,
output_format: str,
codec: str,
output_dir: str,
progress_callback: Optional[callable] = None,
output_filename: Optional[str] = None,
output_resolution: str = "Same as input",
output_bitrate: str = "auto",
output_fps: str = "Same as input"
) -> Tuple[bool, str]:
try:
if not os.path.exists(input_path):
return False, f"Input file does not exist: {Path(input_path).name}"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_path = os.path.join(output_dir, output_filename)
# Base FFmpeg command
cmd = [
self.ffmpeg_path,
"-y"
]
# Add GPU acceleration if available
if self.gpu_available:
if codec in ["H.264", "HEVC (H.265)"]:
cmd.extend(["-hwaccel", "cuda"])
# On Linux, you might need to specify the correct hardware encoder
cmd.extend(["-c:v", "h264_nvenc"]) # For NVIDIA GPUs
# Add input file
cmd.extend([
"-i", input_path
])
# Add codec parameters
cmd.extend(self.get_codec_params(codec))
# Add scaling if necessary
if output_resolution != "Same as input":
resolution_map = {
"3840x2160 (4K)": "3840:2160",
"2560x1440 (1440p)": "2560:1440",
"1920x1080 (1080p)": "1920:1080",
"1280x720 (720p)": "1280:720",
"854x480 (480p)": "854:480"
}
scale = resolution_map.get(output_resolution)
if scale:
cmd.extend(["-vf", f"scale={scale}"])
# Add FPS if specified
if output_fps != "Same as input":
cmd.extend(["-r", output_fps])
# Add bitrate or CRF settings
if output_bitrate.lower() != "auto":
cmd.extend(["-b:v", output_bitrate])
else:
# Use a default CRF value for quality
cmd.extend(["-crf", "23"])
# Optionally, you can add audio codec settings
# cmd.extend(["-c:a", "aac", "-b:a", "128k"])
# Add output file
cmd.append(output_path)
# Print the FFmpeg command for debugging
print("FFmpeg command:", ' '.join(cmd))
duration = self.get_video_duration(input_path)
print(f"Video duration: {duration} seconds")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Combine stderr into stdout
universal_newlines=True
)
while True:
line = process.stdout.readline()
if not line:
if process.poll() is not None:
break
continue
# Print the line for debugging
print("FFmpeg output:", line.strip())
if "frame=" in line and progress_callback and duration > 0:
# Extract time from the output
match = re.search(r'time=(\d+:\d+:\d+\.\d+)', line)
if match:
time_str = match.group(1)
h, m, s = time_str.split(':')
current_time = float(h) * 3600 + float(m) * 60 + float(s)
progress = current_time / duration
progress_callback(progress)
process.wait()
if process.returncode == 0:
return True, f"Successfully converted: {output_filename}"
else:
error_output = process.stdout.read()
return False, f"Error converting {Path(input_path).name}: FFmpeg error code {process.returncode}\n{error_output}"
except Exception as e:
return False, f"Error converting {Path(input_path).name}: {str(e)}"
|