|
from enum import Enum |
|
from pathlib import Path |
|
from re import split |
|
from typing import Annotated, Optional, Union |
|
|
|
import ffmpeg |
|
from ffmpeg.nodes import FilterNode, InputNode |
|
from torch import Value |
|
|
|
|
|
class VideoCodec(str, Enum): |
|
gif = "gif" |
|
vp9 = "vp9" |
|
webm = "webm" |
|
webp = "webp" |
|
h264 = "h264" |
|
hevc = "hevc" |
|
|
|
|
|
def codec_extn(codec: VideoCodec): |
|
match codec: |
|
case VideoCodec.gif: |
|
return "gif" |
|
case VideoCodec.vp9: |
|
return "webm" |
|
case VideoCodec.webm: |
|
return "webm" |
|
case VideoCodec.webp: |
|
return "webp" |
|
case VideoCodec.h264: |
|
return "mp4" |
|
case VideoCodec.hevc: |
|
return "mp4" |
|
case _: |
|
raise ValueError(f"Unknown codec {codec}") |
|
|
|
|
|
def clamp_gif_fps(fps: int): |
|
"""Clamp FPS to a value that is supported by GIFs. |
|
|
|
GIF frame duration is measured in 1/100ths of a second, so we need to clamp the |
|
FPS to a value that 100 is a factor of. |
|
""" |
|
|
|
if fps > 100: |
|
return 100 |
|
|
|
|
|
if 100 % fps == 0: |
|
return fps |
|
|
|
|
|
match fps: |
|
case x if x > 50: |
|
|
|
|
|
return 50 |
|
case x if x >= 30: |
|
return 33 |
|
case x if x >= 24: |
|
return 25 |
|
case x if x >= 20: |
|
return 20 |
|
case x if x >= 15: |
|
|
|
return 16 |
|
case x if x >= 12: |
|
return 12 |
|
case x if x >= 10: |
|
|
|
return 10 |
|
case x if x >= 6: |
|
|
|
return 6 |
|
case 4: |
|
return 4 |
|
case _: |
|
return 1 |
|
|
|
|
|
class FfmpegEncoder: |
|
def __init__( |
|
self, |
|
frames_dir: Path, |
|
out_file: Path, |
|
codec: VideoCodec, |
|
in_fps: int = 60, |
|
out_fps: int = 60, |
|
lossless: bool = False, |
|
param={}, |
|
): |
|
self.frames_dir = frames_dir |
|
self.out_file = out_file |
|
self.codec = codec |
|
self.in_fps = in_fps |
|
self.out_fps = out_fps |
|
self.lossless = lossless |
|
self.param = param |
|
|
|
self.input: Optional[InputNode] = None |
|
|
|
def encode(self) -> tuple: |
|
self.input: InputNode = ffmpeg.input( |
|
str(self.frames_dir.resolve().joinpath("%08d.png")), framerate=self.in_fps |
|
).filter("fps", fps=self.in_fps) |
|
match self.codec: |
|
case VideoCodec.gif: |
|
return self._encode_gif() |
|
case VideoCodec.webm: |
|
return self._encode_webm() |
|
case VideoCodec.webp: |
|
return self._encode_webp() |
|
case VideoCodec.h264: |
|
return self._encode_h264() |
|
case VideoCodec.hevc: |
|
return self._encode_hevc() |
|
case _: |
|
raise ValueError(f"Unknown codec {self.codec}") |
|
|
|
@property |
|
def _out_file(self) -> Path: |
|
return str(self.out_file.resolve()) |
|
|
|
@staticmethod |
|
def _interpolate(stream, out_fps: int) -> FilterNode: |
|
return stream.filter( |
|
"minterpolate", fps=out_fps, mi_mode="mci", mc_mode="aobmc", me_mode="bidir", vsbmc=1 |
|
) |
|
|
|
def _encode_gif(self) -> tuple: |
|
stream: FilterNode = self.input |
|
|
|
|
|
out_fps = clamp_gif_fps(self.out_fps) |
|
if self.in_fps != out_fps: |
|
stream = self._interpolate(stream, out_fps) |
|
|
|
|
|
split_stream = stream.split() |
|
|
|
|
|
palette = split_stream[0].filter("palettegen") |
|
stream = ffmpeg.filter([split_stream[1], palette], "paletteuse").output( |
|
self._out_file, vcodec="gif", loop=0 |
|
) |
|
return stream.run() |
|
|
|
def _encode_webm(self) -> tuple: |
|
stream: FilterNode = self.input |
|
if self.in_fps != self.out_fps: |
|
stream = self._interpolate(stream, self.out_fps) |
|
param = { |
|
"pix_fmt":"yuv420p", |
|
"vcodec":"libvpx-vp9", |
|
"video_bitrate":0, |
|
"crf":24, |
|
} |
|
param.update(**self.param) |
|
stream = stream.output( |
|
self._out_file, **param |
|
) |
|
return stream.run() |
|
|
|
def _encode_webp(self) -> tuple: |
|
stream: FilterNode = self.input |
|
if self.in_fps != self.out_fps: |
|
stream = self._interpolate(stream, self.out_fps) |
|
|
|
if self.lossless: |
|
param = { |
|
"pix_fmt":"bgra", |
|
"vcodec":"libwebp_anim", |
|
"lossless":1, |
|
"compression_level":5, |
|
"qscale":75, |
|
"loop":0, |
|
} |
|
param.update(**self.param) |
|
stream = stream.output( |
|
self._out_file, |
|
**param |
|
) |
|
else: |
|
param = { |
|
"pix_fmt":"yuv420p", |
|
"vcodec":"libwebp_anim", |
|
"lossless":0, |
|
"compression_level":5, |
|
"qscale":90, |
|
"loop":0, |
|
} |
|
param.update(**self.param) |
|
stream = stream.output( |
|
self._out_file, |
|
**param |
|
) |
|
return stream.run() |
|
|
|
def _encode_h264(self) -> tuple: |
|
stream: FilterNode = self.input |
|
if self.in_fps != self.out_fps: |
|
stream = self._interpolate(stream, self.out_fps) |
|
|
|
param = { |
|
"pix_fmt":"yuv420p", |
|
"vcodec":"libx264", |
|
"crf":21, |
|
"tune":"animation", |
|
} |
|
param.update(**self.param) |
|
|
|
stream = stream.output( |
|
self._out_file, **param |
|
) |
|
return stream.run() |
|
|
|
def _encode_hevc(self) -> tuple: |
|
stream: FilterNode = self.input |
|
if self.in_fps != self.out_fps: |
|
stream = self._interpolate(stream, self.out_fps) |
|
|
|
param = { |
|
"pix_fmt":"yuv420p", |
|
"vcodec":"libx264", |
|
"crf":21, |
|
"tune":"animation", |
|
} |
|
param.update(**self.param) |
|
|
|
stream = stream.output(self._out_file, **param) |
|
return stream.run() |
|
|