Spaces:
Sleeping
Sleeping
import glob | |
import json | |
import os | |
import shutil | |
import string | |
import subprocess | |
import sys | |
from pathlib import Path | |
from typing import Iterable, List, Optional, Tuple, Union | |
import numpy as np | |
from detrsmpl.utils.path_utils import check_input_path, prepare_output_path | |
try: | |
from typing import Literal | |
except ImportError: | |
from typing_extensions import Literal | |
class video_writer: | |
def __init__(self, | |
output_path: str, | |
resolution: Iterable[int], | |
fps: float = 30.0, | |
num_frame: int = 1e9, | |
disable_log: bool = False) -> None: | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
height, width = resolution | |
width += width % 2 | |
height += height % 2 | |
command = [ | |
'ffmpeg', | |
'-y', # (optional) overwrite output file if it exists | |
'-f', | |
'rawvideo', | |
'-pix_fmt', | |
'bgr24', | |
'-s', | |
f'{int(width)}x{int(height)}', | |
'-r', | |
f'{fps}', # frames per second | |
'-loglevel', | |
'error', | |
'-threads', | |
'1', | |
'-i', | |
'-', # The input comes from a pipe | |
'-vcodec', | |
'libx264', | |
'-r', | |
f'{fps}', # frames per second | |
'-an', # Tells FFMPEG not to expect any audio | |
output_path, | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
process = subprocess.Popen( | |
command, | |
stdin=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
) | |
if process.stdin is None or process.stderr is None: | |
raise BrokenPipeError('No buffer received.') | |
self.process = process | |
self.num_frame = num_frame | |
self.len = 0 | |
def write(self, image_array: np.ndarray): | |
if self.len <= self.num_frame: | |
try: | |
self.process.stdin.write(image_array.tobytes()) | |
self.len += 1 | |
except KeyboardInterrupt: | |
self.__del__() | |
def __del__(self): | |
self.process.stdin.close() | |
self.process.stderr.close() | |
self.process.wait() | |
def array_to_video( | |
image_array: np.ndarray, | |
output_path: str, | |
fps: Union[int, float] = 30, | |
resolution: Optional[Union[Tuple[int, int], Tuple[float, float]]] = None, | |
disable_log: bool = False, | |
) -> None: | |
"""Convert an array to a video directly, gif not supported. | |
Args: | |
image_array (np.ndarray): shape should be (f * h * w * 3). | |
output_path (str): output video file path. | |
fps (Union[int, float, optional): fps. Defaults to 30. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of the output video. | |
Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check output path. | |
TypeError: check input array. | |
Returns: | |
None. | |
""" | |
if not isinstance(image_array, np.ndarray): | |
raise TypeError('Input should be np.ndarray.') | |
assert image_array.ndim == 4 | |
assert image_array.shape[-1] == 3 | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
if resolution: | |
height, width = resolution | |
width += width % 2 | |
height += height % 2 | |
else: | |
image_array = pad_for_libx264(image_array) | |
height, width = image_array.shape[1], image_array.shape[2] | |
command = [ | |
'ffmpeg', | |
'-y', # (optional) overwrite output file if it exists | |
'-f', | |
'rawvideo', | |
'-s', | |
f'{int(width)}x{int(height)}', # size of one frame | |
'-pix_fmt', | |
'bgr24', | |
'-r', | |
f'{fps}', # frames per second | |
'-loglevel', | |
'error', | |
'-threads', | |
'4', | |
'' | |
'-i', | |
'-', # The input comes from a pipe | |
'-vcodec', | |
'libx264', | |
'-an', # Tells FFMPEG not to expect any audio | |
output_path, | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
process = subprocess.Popen( | |
command, | |
stdin=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
) | |
if process.stdin is None or process.stderr is None: | |
raise BrokenPipeError('No buffer received.') | |
index = 0 | |
while True: | |
if index >= image_array.shape[0]: | |
break | |
process.stdin.write(image_array[index].tobytes()) | |
index += 1 | |
process.stdin.close() | |
process.stderr.close() | |
process.wait() | |
def array_to_images( | |
image_array: np.ndarray, | |
output_folder: str, | |
img_format: str = '%06d.png', | |
resolution: Optional[Union[Tuple[int, int], Tuple[float, float]]] = None, | |
disable_log: bool = False, | |
) -> None: | |
"""Convert an array to images directly. | |
Args: | |
image_array (np.ndarray): shape should be (f * h * w * 3). | |
output_folder (str): output folder for the images. | |
img_format (str, optional): format of the images. | |
Defaults to '%06d.png'. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): resolution(height, width) of output. | |
Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check output folder. | |
TypeError: check input array. | |
Returns: | |
None | |
""" | |
prepare_output_path( | |
output_folder, | |
allowed_suffix=[], | |
tag='output image folder', | |
path_type='dir', | |
overwrite=True) | |
if not isinstance(image_array, np.ndarray): | |
raise TypeError('Input should be np.ndarray.') | |
assert image_array.ndim == 4 | |
assert image_array.shape[-1] == 3 | |
if resolution: | |
height, width = resolution | |
else: | |
height, width = image_array.shape[1], image_array.shape[2] | |
command = [ | |
'ffmpeg', | |
'-y', # (optional) overwrite output file if it exists | |
'-f', | |
'rawvideo', | |
'-s', | |
f'{int(width)}x{int(height)}', # size of one frame | |
'-pix_fmt', | |
'bgr24', # bgr24 for matching OpenCV | |
'-loglevel', | |
'error', | |
'-threads', | |
'4', | |
'-i', | |
'-', # The input comes from a pipe | |
'-f', | |
'image2', | |
'-start_number', | |
'0', | |
os.path.join(output_folder, img_format), | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
process = subprocess.Popen( | |
command, | |
stdin=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
bufsize=10**8, | |
close_fds=True) | |
if process.stdin is None or process.stderr is None: | |
raise BrokenPipeError('No buffer received.') | |
index = 0 | |
while True: | |
if index >= image_array.shape[0]: | |
break | |
process.stdin.write(image_array[index].tobytes()) | |
index += 1 | |
process.stdin.close() | |
process.stderr.close() | |
process.wait() | |
def video_to_array( | |
input_path: str, | |
resolution: Optional[Union[Tuple[int, int], Tuple[float, float]]] = None, | |
start: int = 0, | |
end: Optional[int] = None, | |
disable_log: bool = False, | |
) -> np.ndarray: | |
""" | |
Read a video/gif as an array of (f * h * w * 3). | |
Args: | |
input_path (str): input path. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): resolution(height, width) of output. | |
Defaults to None. | |
start (int, optional): start frame index. Inclusive. | |
If < 0, will be converted to frame_index range in [0, frame_num]. | |
Defaults to 0. | |
end (int, optional): end frame index. Exclusive. | |
Could be positive int or negative int or None. | |
If None, all frames from start till the last frame are included. | |
Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
Returns: | |
np.ndarray: shape will be (f * h * w * 3). | |
""" | |
check_input_path( | |
input_path, | |
allowed_suffix=['.mp4', 'mkv', 'avi', '.gif'], | |
tag='input video', | |
path_type='file') | |
info = vid_info_reader(input_path) | |
if resolution: | |
height, width = resolution | |
else: | |
width, height = int(info['width']), int(info['height']) | |
num_frames = int(info['nb_frames']) | |
start = (min(start, num_frames - 1) + num_frames) % num_frames | |
end = (min(end, num_frames - 1) + | |
num_frames) % num_frames if end is not None else num_frames | |
command = [ | |
'ffmpeg', | |
'-i', | |
input_path, | |
'-filter_complex', | |
f'[0]trim=start_frame={start}:end_frame={end}[v0]', | |
'-map', | |
'[v0]', | |
'-pix_fmt', | |
'bgr24', # bgr24 for matching OpenCV | |
'-s', | |
f'{int(width)}x{int(height)}', | |
'-f', | |
'image2pipe', | |
'-vcodec', | |
'rawvideo', | |
'-loglevel', | |
'error', | |
'pipe:' | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
# Execute FFmpeg as sub-process with stdout as a pipe | |
process = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=10**8) | |
if process.stdout is None: | |
raise BrokenPipeError('No buffer received.') | |
# Read decoded video frames from the PIPE until no more frames to read | |
array = [] | |
while True: | |
# Read decoded video frame (in raw video format) from stdout process. | |
buffer = process.stdout.read(int(width * height * 3)) | |
# Break the loop if buffer length is not W*H*3\ | |
# (when FFmpeg streaming ends). | |
if len(buffer) != width * height * 3: | |
break | |
img = np.frombuffer(buffer, np.uint8).reshape(height, width, 3) | |
array.append(img[np.newaxis]) | |
process.stdout.flush() | |
process.stdout.close() | |
process.wait() | |
return np.concatenate(array) | |
def images_to_sorted_images(input_folder, output_folder, img_format='%06d'): | |
"""Copy and rename a folder of images into a new folder following the | |
`img_format`. | |
Args: | |
input_folder (str): input folder. | |
output_folder (str): output folder. | |
img_format (str, optional): image format name, do not need extension. | |
Defaults to '%06d'. | |
Returns: | |
str: image format of the rename images. | |
""" | |
img_format = img_format.rsplit('.', 1)[0] | |
file_list = [] | |
os.makedirs(output_folder, exist_ok=True) | |
pngs = glob.glob(os.path.join(input_folder, '*.png')) | |
if pngs: | |
ext = 'png' | |
file_list.extend(pngs) | |
jpgs = glob.glob(os.path.join(input_folder, '*.jpg')) | |
if jpgs: | |
ext = 'jpg' | |
file_list.extend(jpgs) | |
file_list.sort() | |
for index, file_name in enumerate(file_list): | |
shutil.copy( | |
file_name, | |
os.path.join(output_folder, (img_format + '.%s') % (index, ext))) | |
return img_format + '.%s' % ext | |
def images_to_array( | |
input_folder: str, | |
resolution: Optional[Union[Tuple[int, int], Tuple[float, float]]] = None, | |
img_format: str = '%06d.png', | |
start: int = 0, | |
end: Optional[int] = None, | |
remove_raw_files: bool = False, | |
disable_log: bool = False, | |
) -> np.ndarray: | |
""" | |
Read a folder of images as an array of (f * h * w * 3). | |
Args: | |
input_folder (str): folder of input images. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]]: | |
resolution(height, width) of output. Defaults to None. | |
img_format (str, optional): format of images to be read. | |
Defaults to '%06d.png'. | |
start (int, optional): start frame index. Inclusive. | |
If < 0, will be converted to frame_index range in [0, frame_num]. | |
Defaults to 0. | |
end (int, optional): end frame index. Exclusive. | |
Could be positive int or negative int or None. | |
If None, all frames from start till the last frame are included. | |
Defaults to None. | |
remove_raw_files (bool, optional): whether remove raw images. | |
Defaults to False. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
Returns: | |
np.ndarray: shape will be (f * h * w * 3). | |
""" | |
check_input_path( | |
input_folder, | |
allowed_suffix=[''], | |
tag='input image folder', | |
path_type='dir') | |
input_folderinfo = Path(input_folder) | |
temp_input_folder = None | |
if img_format is None: | |
temp_input_folder = os.path.join(input_folderinfo.parent, | |
input_folderinfo.name + '_temp') | |
img_format = images_to_sorted_images( | |
input_folder=input_folder, output_folder=temp_input_folder) | |
input_folder = temp_input_folder | |
info = vid_info_reader(f'{input_folder}/{img_format}' % start) | |
width, height = int(info['width']), int(info['height']) | |
if resolution: | |
height, width = resolution | |
else: | |
width, height = int(info['width']), int(info['height']) | |
num_frames = len(os.listdir(input_folder)) | |
start = max(start, 0) % num_frames | |
end = min(end, num_frames) % (num_frames + 1) \ | |
if end is not None else num_frames | |
command = [ | |
'ffmpeg', | |
'-y', | |
'-threads', | |
'1', | |
'-start_number', | |
f'{start}', | |
'-i', | |
f'{input_folder}/{img_format}', | |
'-frames:v', | |
f'{end - start}', | |
'-f', | |
'rawvideo', | |
'-pix_fmt', | |
'bgr24', # bgr24 for matching OpenCV | |
'-s', | |
f'{int(width)}x{int(height)}', | |
'-loglevel', | |
'error', | |
'-' | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
process = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=10**8) | |
if process.stdout is None: | |
raise BrokenPipeError('No buffer received.') | |
# Read decoded video frames from the PIPE until no more frames to read | |
array = [] | |
while True: | |
# Read decoded video frame (in raw video format) from stdout process. | |
buffer = process.stdout.read(int(width * height * 3)) | |
# Break the loop if buffer length is not W*H*3\ | |
# (when FFmpeg streaming ends). | |
if len(buffer) != width * height * 3: | |
break | |
img = np.frombuffer(buffer, np.uint8).reshape(height, width, 3) | |
array.append(img[np.newaxis]) | |
process.stdout.flush() | |
process.stdout.close() | |
process.wait() | |
if temp_input_folder is not None: | |
if Path(temp_input_folder).is_dir(): | |
shutil.rmtree(temp_input_folder) | |
if remove_raw_files: | |
if Path(input_folder).is_dir(): | |
shutil.rmtree(input_folder) | |
return np.concatenate(array) | |
class vid_info_reader(object): | |
def __init__(self, input_path) -> None: | |
"""Get video information from video, mimiced from ffmpeg-python. | |
https://github.com/kkroening/ffmpeg-python. | |
Args: | |
vid_file ([str]): video file path. | |
Raises: | |
FileNotFoundError: check the input path. | |
Returns: | |
None. | |
""" | |
check_input_path( | |
input_path, | |
allowed_suffix=['.mp4', '.gif', '.png', '.jpg', '.jpeg'], | |
tag='input file', | |
path_type='file') | |
cmd = [ | |
'ffprobe', '-show_format', '-show_streams', '-of', 'json', | |
input_path | |
] | |
process = subprocess.Popen( | |
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
out, _ = process.communicate() | |
probe = json.loads(out.decode('utf-8')) | |
video_stream = next((stream for stream in probe['streams'] | |
if stream['codec_type'] == 'video'), None) | |
if video_stream is None: | |
print('No video stream found', file=sys.stderr) | |
sys.exit(1) | |
self.video_stream = video_stream | |
def __getitem__( | |
self, | |
key: Literal['index', 'codec_name', 'codec_long_name', 'profile', | |
'codec_type', 'codec_time_base', 'codec_tag_string', | |
'codec_tag', 'width', 'height', 'coded_width', | |
'coded_height', 'has_b_frames', 'pix_fmt', 'level', | |
'chroma_location', 'refs', 'is_avc', 'nal_length_size', | |
'r_frame_rate', 'avg_frame_rate', 'time_base', | |
'start_pts', 'start_time', 'duration_ts', 'duration', | |
'bit_rate', 'bits_per_raw_sample', 'nb_frames', | |
'disposition', 'tags']): | |
"""Key (str): select in ['index', 'codec_name', 'codec_long_name', | |
'profile', 'codec_type', 'codec_time_base', 'codec_tag_string', | |
'codec_tag', 'width', 'height', 'coded_width', 'coded_height', | |
'has_b_frames', 'pix_fmt', 'level', 'chroma_location', 'refs', | |
'is_avc', 'nal_length_size', 'r_frame_rate', 'avg_frame_rate', | |
'time_base', 'start_pts', 'start_time', 'duration_ts', 'duration', | |
'bit_rate', 'bits_per_raw_sample', 'nb_frames', 'disposition', | |
'tags']""" | |
return self.video_stream[key] | |
def video_to_gif( | |
input_path: str, | |
output_path: str, | |
resolution: Optional[Union[Tuple[int, int], Tuple[float, float]]] = None, | |
fps: Union[float, int] = 15, | |
disable_log: bool = False, | |
) -> None: | |
"""Convert a video to a gif file. | |
Args: | |
input_path (str): video file path. | |
output_path (str): gif file path. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of the output video. | |
Defaults to None. | |
fps (Union[float, int], optional): frames per second. Defaults to 15. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None. | |
""" | |
check_input_path( | |
input_path, | |
allowed_suffix=['.mp4'], | |
tag='input video', | |
path_type='file') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.gif'], | |
tag='output gif', | |
path_type='file', | |
overwrite=True) | |
info = vid_info_reader(input_path) | |
duration = info['duration'] | |
if resolution: | |
height, width = resolution | |
else: | |
width, height = int(info['width']), int(info['height']) | |
command = [ | |
'ffmpeg', '-r', | |
str(info['r_frame_rate']), '-i', input_path, '-r', f'{fps}', '-s', | |
f'{width}x{height}', '-loglevel', 'error', '-t', f'{duration}', | |
'-threads', '4', '-y', output_path | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
def video_to_images(input_path: str, | |
output_folder: str, | |
resolution: Optional[Union[Tuple[int, int], | |
Tuple[float, float]]] = None, | |
img_format: str = '%06d.png', | |
start: int = 0, | |
end: Optional[int] = None, | |
disable_log: bool = False) -> None: | |
"""Convert a video to a folder of images. | |
Args: | |
input_path (str): video file path | |
output_folder (str): output folder to store the images | |
resolution (Optional[Tuple[int, int]], optional): | |
(height, width) of output. defaults to None. | |
img_format (str, optional): format of images to be read. | |
Defaults to '%06d.png'. | |
start (int, optional): start frame index. Inclusive. | |
If < 0, will be converted to frame_index range in [0, frame_num]. | |
Defaults to 0. | |
end (int, optional): end frame index. Exclusive. | |
Could be positive int or negative int or None. | |
If None, all frames from start till the last frame are included. | |
Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path | |
FileNotFoundError: check the output path | |
Returns: | |
None | |
""" | |
check_input_path( | |
input_path, | |
allowed_suffix=['.mp4'], | |
tag='input video', | |
path_type='file') | |
prepare_output_path( | |
output_folder, | |
allowed_suffix=[], | |
tag='output image folder', | |
path_type='dir', | |
overwrite=True) | |
info = vid_info_reader(input_path) | |
num_frames = int(info['nb_frames']) | |
start = (min(start, num_frames - 1) + num_frames) % num_frames | |
end = (min(end, num_frames - 1) + | |
num_frames) % num_frames if end is not None else num_frames | |
command = [ | |
'ffmpeg', '-i', input_path, '-filter_complex', | |
f'[0]trim=start_frame={start}:end_frame={end}[v0]', '-map', '[v0]', | |
'-f', 'image2', '-v', 'error', '-start_number', '0', '-threads', '1', | |
f'{output_folder}/{img_format}' | |
] | |
if resolution: | |
height, width = resolution | |
command.insert(3, '-s') | |
command.insert(4, '%dx%d' % (width, height)) | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
def images_to_video(input_folder: str, | |
output_path: str, | |
remove_raw_file: bool = False, | |
img_format: str = '%06d.png', | |
fps: Union[int, float] = 30, | |
resolution: Optional[Union[Tuple[int, int], | |
Tuple[float, float]]] = None, | |
start: int = 0, | |
end: Optional[int] = None, | |
disable_log: bool = False) -> None: | |
"""Convert a folder of images to a video. | |
Args: | |
input_folder (str): input image folder | |
output_path (str): output video file path | |
remove_raw_file (bool, optional): whether remove raw images. | |
Defaults to False. | |
img_format (str, optional): format to name the images]. | |
Defaults to '%06d.png'. | |
fps (Union[int, float], optional): output video fps. Defaults to 30. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of output. | |
defaults to None. | |
start (int, optional): start frame index. Inclusive. | |
If < 0, will be converted to frame_index range in [0, frame_num]. | |
Defaults to 0. | |
end (int, optional): end frame index. Exclusive. | |
Could be positive int or negative int or None. | |
If None, all frames from start till the last frame are included. | |
Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None | |
""" | |
check_input_path( | |
input_folder, | |
allowed_suffix=[], | |
tag='input image folder', | |
path_type='dir') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
input_folderinfo = Path(input_folder) | |
num_frames = len(os.listdir(input_folder)) | |
start = (min(start, num_frames - 1) + num_frames) % num_frames | |
end = (min(end, num_frames - 1) + | |
num_frames) % num_frames if end is not None else num_frames | |
temp_input_folder = None | |
if img_format is None: | |
temp_input_folder = os.path.join(input_folderinfo.parent, | |
input_folderinfo.name + '_temp') | |
img_format = images_to_sorted_images(input_folder, temp_input_folder) | |
command = [ | |
'ffmpeg', | |
'-y', | |
'-threads', | |
'4', | |
'-start_number', | |
f'{start}', | |
'-r', | |
f'{fps}', | |
'-i', | |
f'{input_folder}/{img_format}' | |
if temp_input_folder is None else f'{temp_input_folder}/{img_format}', | |
'-frames:v', | |
f'{end - start}', | |
'-profile:v', | |
'baseline', | |
'-level', | |
'3.0', | |
'-c:v', | |
'libx264', | |
'-pix_fmt', | |
'yuv420p', | |
'-vf', | |
'scale=trunc(iw/2)*2:trunc(ih/2)*2', # Ensure width and height are divisible by 2 | |
'-an', | |
'-v', | |
'error', | |
'-loglevel', | |
'error', | |
output_path, | |
] | |
if resolution: | |
height, width = resolution | |
width += width % 2 | |
height += height % 2 | |
command.insert(1, '-s') | |
command.insert(2, '%dx%d' % (width, height)) | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
if remove_raw_file: | |
if Path(input_folder).is_dir(): | |
shutil.rmtree(input_folder) | |
if temp_input_folder is not None: | |
if Path(temp_input_folder).is_dir(): | |
shutil.rmtree(temp_input_folder) | |
def images_to_gif( | |
input_folder: str, | |
output_path: str, | |
remove_raw_file: bool = False, | |
img_format: str = '%06d.png', | |
fps: int = 15, | |
resolution: Optional[Union[Tuple[int, int], Tuple[float, float]]] = None, | |
start: int = 0, | |
end: Optional[int] = None, | |
disable_log: bool = False, | |
) -> None: | |
"""Convert series of images to a video, similar to images_to_video, but | |
provide more suitable parameters. | |
Args: | |
input_folder (str): input image folder. | |
output_path (str): output gif file path. | |
remove_raw_file (bool, optional): whether remove raw images. | |
Defaults to False. | |
img_format (str, optional): format to name the images. | |
Defaults to '%06d.png'. | |
fps (int, optional): output video fps. Defaults to 15. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of output. Defaults to None. | |
start (int, optional): start frame index. Inclusive. | |
If < 0, will be converted to frame_index range in [0, frame_num]. | |
Defaults to 0. | |
end (int, optional): end frame index. Exclusive. | |
Could be positive int or negative int or None. | |
If None, all frames from start till the last frame are included. | |
Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None | |
""" | |
input_folderinfo = Path(input_folder) | |
check_input_path( | |
input_folder, | |
allowed_suffix=[], | |
tag='input image folder', | |
path_type='dir') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.gif'], | |
tag='output gif', | |
path_type='file', | |
overwrite=True) | |
num_frames = len(os.listdir(input_folder)) | |
start = (min(start, num_frames - 1) + num_frames) % num_frames | |
end = (min(end, num_frames - 1) + | |
num_frames) % num_frames if end is not None else num_frames | |
temp_input_folder = None | |
if img_format is None: | |
file_list = [] | |
temp_input_folder = os.path.join(input_folderinfo.parent, | |
input_folderinfo.name + '_temp') | |
os.makedirs(temp_input_folder, exist_ok=True) | |
pngs = glob.glob(os.path.join(input_folder, '*.png')) | |
ext = 'png' | |
if pngs: | |
ext = 'png' | |
file_list.extend(pngs) | |
jpgs = glob.glob(os.path.join(input_folder, '*.jpg')) | |
if jpgs: | |
ext = 'jpg' | |
file_list.extend(jpgs) | |
file_list.sort() | |
for index, file_name in enumerate(file_list): | |
shutil.copy( | |
file_name, | |
os.path.join(temp_input_folder, '%06d.%s' % (index + 1, ext))) | |
input_folder = temp_input_folder | |
img_format = '%06d.' + ext | |
command = [ | |
'ffmpeg', | |
'-y', | |
'-threads', | |
'4', | |
'-start_number', | |
f'{start}', | |
'-r', | |
f'{fps}', | |
'-i', | |
f'{input_folder}/{img_format}', | |
'-frames:v', | |
f'{end - start}', | |
'-loglevel', | |
'error', | |
'-v', | |
'error', | |
output_path, | |
] | |
if resolution: | |
height, width = resolution | |
command.insert(1, '-s') | |
command.insert(2, '%dx%d' % (width, height)) | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
if remove_raw_file: | |
shutil.rmtree(input_folder) | |
if temp_input_folder is not None: | |
shutil.rmtree(temp_input_folder) | |
def gif_to_video(input_path: str, | |
output_path: str, | |
fps: int = 30, | |
remove_raw_file: bool = False, | |
resolution: Optional[Union[Tuple[int, int], | |
Tuple[float, float]]] = None, | |
disable_log: bool = False) -> None: | |
"""Convert a gif file to a video. | |
Args: | |
input_path (str): input gif file path. | |
output_path (str): output video file path. | |
fps (int, optional): fps. Defaults to 30. | |
remove_raw_file (bool, optional): whether remove original input file. | |
Defaults to False. | |
down_sample_scale (Union[int, float], optional): down sample scale. | |
Defaults to 1. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of output. Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None | |
""" | |
check_input_path( | |
input_path, allowed_suffix=['.gif'], tag='input gif', path_type='file') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
command = [ | |
'ffmpeg', '-i', input_path, '-r', f'{fps}', '-loglevel', 'error', '-y', | |
output_path, '-threads', '4' | |
] | |
if resolution: | |
height, width = resolution | |
command.insert(3, '-s') | |
command.insert(4, '%dx%d' % (width, height)) | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
if remove_raw_file: | |
subprocess.call(['rm', '-f', input_path]) | |
def gif_to_images(input_path: str, | |
output_folder: str, | |
fps: int = 30, | |
img_format: str = '%06d.png', | |
resolution: Optional[Union[Tuple[int, int], | |
Tuple[float, float]]] = None, | |
disable_log: bool = False) -> None: | |
"""Convert a gif file to a folder of images. | |
Args: | |
input_path (str): input gif file path. | |
output_folder (str): output folder to save the images. | |
fps (int, optional): fps. Defaults to 30. | |
img_format (str, optional): output image name format. | |
Defaults to '%06d.png'. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of output. | |
Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None | |
""" | |
check_input_path( | |
input_path, allowed_suffix=['.gif'], tag='input gif', path_type='file') | |
prepare_output_path( | |
output_folder, | |
allowed_suffix=[], | |
tag='output image folder', | |
path_type='dir', | |
overwrite=True) | |
command = [ | |
'ffmpeg', '-r', f'{fps}', '-i', input_path, '-loglevel', 'error', '-f', | |
'image2', '-v', 'error', '-threads', '4', '-y', '-start_number', '0', | |
f'{output_folder}/{img_format}' | |
] | |
if resolution: | |
height, width = resolution | |
command.insert(3, '-s') | |
command.insert(4, '%dx%d' % (width, height)) | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
def crop_video( | |
input_path: str, | |
output_path: str, | |
box: Optional[Union[List[int], Tuple[int, int, int, int]]] = None, | |
resolution: Optional[Union[Tuple[int, int], Tuple[float, float]]] = None, | |
disable_log: bool = False, | |
) -> None: | |
"""Spatially or temporally crop a video or gif file. | |
Args: | |
input_path (str): input video or gif file path. | |
output_path (str): output video or gif file path. | |
box (Iterable[int], optional): [x, y of the crop region left. | |
corner and width and height]. Defaults to [0, 0, 100, 100]. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of output. Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None'-start_number', f'{start}', | |
""" | |
check_input_path( | |
input_path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='input video', | |
path_type='file') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
info = vid_info_reader(input_path) | |
width, height = int(info['width']), int(info['height']) | |
if box is None: | |
box = [0, 0, width, height] | |
assert len(box) == 4 | |
x, y, w, h = box | |
assert (w > 0 and h > 0) | |
command = [ | |
'ffmpeg', '-i', input_path, '-vcodec', 'libx264', '-vf', | |
'crop=%d:%d:%d:%d' % (w, h, x, y), '-loglevel', 'error', '-y', | |
output_path | |
] | |
if resolution: | |
height, width = resolution | |
width += width % 2 | |
height += height % 2 | |
command.insert(-1, '-s') | |
command.insert(-1, '%dx%d' % (width, height)) | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
def slice_video(input_path: str, | |
output_path: str, | |
start: int = 0, | |
end: Optional[int] = None, | |
resolution: Optional[Union[Tuple[int, int], | |
Tuple[float, float]]] = None, | |
disable_log: bool = False) -> None: | |
"""Temporally crop a video/gif into another video/gif. | |
Args: | |
input_path (str): input video or gif file path. | |
output_path (str): output video of gif file path. | |
start (int, optional): start frame index. Defaults to 0. | |
end (int, optional): end frame index. Exclusive. | |
Could be positive int or negative int or None. | |
If None, all frames from start till the last frame are included. | |
Defaults to None. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of output. Defaults to None. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
NoReturn | |
""" | |
info = vid_info_reader(input_path) | |
num_frames = int(info['nb_frames']) | |
start = (min(start, num_frames - 1) + num_frames) % num_frames | |
end = (min(end, num_frames - 1) + | |
num_frames) % num_frames if end is not None else num_frames | |
command = [ | |
'ffmpeg', '-y', '-i', input_path, '-filter_complex', | |
f'[0]trim=start_frame={start}:end_frame={end}[v0]', '-map', '[v0]', | |
'-loglevel', 'error', '-vcodec', 'libx264', output_path | |
] | |
if resolution: | |
height, width = resolution | |
width += width % 2 | |
height += height % 2 | |
command.insert(1, '-s') | |
command.insert(2, '%dx%d' % (width, height)) | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
def spatial_concat_video(input_path_list: List[str], | |
output_path: str, | |
array: List[int] = [1, 1], | |
direction: Literal['h', 'w'] = 'h', | |
resolution: Union[Tuple[int, | |
int], List[int], List[float], | |
Tuple[float, float]] = (512, 512), | |
remove_raw_files: bool = False, | |
padding: int = 0, | |
disable_log: bool = False) -> None: | |
"""Spatially concat some videos as an array video. | |
Args: | |
input_path_list (list): input video or gif file list. | |
output_path (str): output video or gif file path. | |
array (List[int], optional): line number and column number of | |
the video array]. Defaults to [1, 1]. | |
direction (str, optional): [choose in 'h' or 'v', represent | |
horizontal and vertical separately]. | |
Defaults to 'h'. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], | |
optional): (height, width) of output. | |
Defaults to (512, 512). | |
remove_raw_files (bool, optional): whether remove raw images. | |
Defaults to False. | |
padding (int, optional): width of pixels between videos. | |
Defaults to 0. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None | |
""" | |
lowercase = string.ascii_lowercase | |
assert len(array) == 2 | |
assert (array[0] * array[1]) >= len(input_path_list) | |
for path in input_path_list: | |
check_input_path( | |
path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='input video', | |
path_type='file') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
command = ['ffmpeg'] | |
height, width = resolution | |
scale_command = [] | |
for index, vid_file in enumerate(input_path_list): | |
command.append('-i') | |
command.append(vid_file) | |
scale_command.append( | |
'[%d:v]scale=%d:%d:force_original_aspect_ratio=0[v%d];' % | |
(index, width, height, index)) | |
scale_command = ' '.join(scale_command) | |
pad_command = '[v%d]pad=%d:%d[%s];' % (0, width * array[1] + padding * | |
(array[1] - 1), | |
height * array[0] + padding * | |
(array[0] - 1), lowercase[0]) | |
for index in range(1, len(input_path_list)): | |
if direction == 'h': | |
pad_width = index % array[1] * (width + padding) | |
pad_height = index // array[1] * (height + padding) | |
else: | |
pad_width = index % array[0] * (width + padding) | |
pad_height = index // array[0] * (height + padding) | |
pad_command += '[%s][v%d]overlay=%d:%d' % (lowercase[index - 1], index, | |
pad_width, pad_height) | |
if index != len(input_path_list) - 1: | |
pad_command += '[%s];' % lowercase[index] | |
command += [ | |
'-filter_complex', | |
'%s%s' % (scale_command, pad_command), '-loglevel', 'error', '-y', | |
output_path | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
if remove_raw_files: | |
command = ['rm', '-f'] + input_path_list | |
subprocess.call(command) | |
def temporal_concat_video(input_path_list: List[str], | |
output_path: str, | |
resolution: Union[Tuple[int, int], | |
Tuple[float, float]] = (512, 512), | |
remove_raw_files: bool = False, | |
disable_log: bool = False) -> None: | |
"""Concat no matter videos or gifs into a temporal sequence, and save as a | |
new video or gif file. | |
Args: | |
input_path_list (List[str]): list of input video paths. | |
output_path (str): output video file path. | |
resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]] | |
, optional): (height, width) of output]. | |
Defaults to (512,512). | |
remove_raw_files (bool, optional): whether remove the input videos. | |
Defaults to False. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None. | |
""" | |
for path in input_path_list: | |
check_input_path( | |
path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='input video', | |
path_type='file') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
height, width = resolution | |
command = ['ffmpeg'] | |
concat_command = [] | |
scale_command = [] | |
for index, vid_file in enumerate(input_path_list): | |
command.append('-i') | |
command.append(vid_file) | |
scale_command.append( | |
'[%d:v]scale=%d:%d:force_original_aspect_ratio=0[v%d];' % | |
(index, width, height, index)) | |
concat_command.append('[v%d]' % index) | |
concat_command = ''.join(concat_command) | |
scale_command = ''.join(scale_command) | |
command += [ | |
'-filter_complex', | |
'%s%sconcat=n=%d:v=1:a=0[v]' % | |
(scale_command, concat_command, len(input_path_list)), '-loglevel', | |
'error', '-map', '[v]', '-c:v', 'libx264', '-y', output_path | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
if remove_raw_files: | |
command = ['rm'] + input_path_list | |
subprocess.call(command) | |
def compress_video(input_path: str, | |
output_path: str, | |
compress_rate: int = 1, | |
down_sample_scale: Union[float, int] = 1, | |
fps: int = 30, | |
disable_log: bool = False) -> None: | |
"""Compress a video file. | |
Args: | |
input_path (str): input video file path. | |
output_path (str): output video file path. | |
compress_rate (int, optional): compress rate, influents the bit rate. | |
Defaults to 1. | |
down_sample_scale (Union[float, int], optional): spatial down sample | |
scale. Defaults to 1. | |
fps (int, optional): Frames per second. Defaults to 30. | |
disable_log (bool, optional): whether close the ffmepg command info. | |
Defaults to False. | |
Raises: | |
FileNotFoundError: check the input path. | |
FileNotFoundError: check the output path. | |
Returns: | |
None. | |
""" | |
input_pathinfo = Path(input_path) | |
check_input_path( | |
input_path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='input video', | |
path_type='file') | |
prepare_output_path( | |
output_path, | |
allowed_suffix=['.gif', '.mp4'], | |
tag='output video', | |
path_type='file', | |
overwrite=True) | |
info = vid_info_reader(input_path) | |
width = int(info['width']) | |
height = int(info['height']) | |
bit_rate = int(info['bit_rate']) | |
duration = float(info['duration']) | |
if (output_path == input_path) or (not output_path): | |
temp_outpath = os.path.join( | |
os.path.abspath(input_pathinfo.parent), | |
'temp_file' + input_pathinfo.suffix) | |
else: | |
temp_outpath = output_path | |
new_width = int(width / down_sample_scale) | |
new_width += new_width % 2 | |
new_height = int(height / down_sample_scale) | |
new_height += new_height % 2 | |
command = [ | |
'ffmpeg', '-y', '-r', | |
str(info['r_frame_rate']), '-i', input_path, '-loglevel', 'error', | |
'-b:v', f'{bit_rate / (compress_rate * down_sample_scale)}', '-r', | |
f'{fps}', '-t', f'{duration}', '-s', | |
'%dx%d' % (new_width, new_height), temp_outpath | |
] | |
if not disable_log: | |
print(f'Running \"{" ".join(command)}\"') | |
subprocess.call(command) | |
if (output_path == input_path) or (not output_path): | |
subprocess.call(['mv', '-f', temp_outpath, input_path]) | |
def pad_for_libx264(image_array): | |
"""Pad zeros if width or height of image_array is not divisible by 2. | |
Otherwise you will get. | |
\"[libx264 @ 0x1b1d560] width not divisible by 2 \" | |
Args: | |
image_array (np.ndarray): | |
Image or images load by cv2.imread(). | |
Possible shapes: | |
1. [height, width] | |
2. [height, width, channels] | |
3. [images, height, width] | |
4. [images, height, width, channels] | |
Returns: | |
np.ndarray: | |
A image with both edges divisible by 2. | |
""" | |
if image_array.ndim == 2 or \ | |
(image_array.ndim == 3 and image_array.shape[2] == 3): | |
hei_index = 0 | |
wid_index = 1 | |
elif image_array.ndim == 4 or \ | |
(image_array.ndim == 3 and image_array.shape[2] != 3): | |
hei_index = 1 | |
wid_index = 2 | |
else: | |
return image_array | |
hei_pad = image_array.shape[hei_index] % 2 | |
wid_pad = image_array.shape[wid_index] % 2 | |
if hei_pad + wid_pad > 0: | |
pad_width = [] | |
for dim_index in range(image_array.ndim): | |
if dim_index == hei_index: | |
pad_width.append((0, hei_pad)) | |
elif dim_index == wid_index: | |
pad_width.append((0, wid_pad)) | |
else: | |
pad_width.append((0, 0)) | |
values = 0 | |
image_array = \ | |
np.pad(image_array, | |
pad_width, | |
mode='constant', constant_values=values) | |
return image_array | |