File size: 6,695 Bytes
d0ffe9c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
from enum import Enum
from pathlib import Path
from re import split
from typing import Annotated, Optional, Union

import ffmpeg
from ffmpeg.nodes import FilterNode, InputNode
from torch import Value


class VideoCodec(str, Enum):
    gif = "gif"
    vp9 = "vp9"
    webm = "webm"
    webp = "webp"
    h264 = "h264"
    hevc = "hevc"


def codec_extn(codec: VideoCodec):
    match codec:
        case VideoCodec.gif:
            return "gif"
        case VideoCodec.vp9:
            return "webm"
        case VideoCodec.webm:
            return "webm"
        case VideoCodec.webp:
            return "webp"
        case VideoCodec.h264:
            return "mp4"
        case VideoCodec.hevc:
            return "mp4"
        case _:
            raise ValueError(f"Unknown codec {codec}")


def clamp_gif_fps(fps: int):
    """Clamp FPS to a value that is supported by GIFs.

    GIF frame duration is measured in 1/100ths of a second, so we need to clamp the
    FPS to a value that 100 is a factor of.
    """
    # the sky is not the limit, sadly...
    if fps > 100:
        return 100

    # if 100/fps is an integer, we're good
    if 100 % fps == 0:
        return fps

    # but of course, it was never going to be that easy.
    match fps:
        case x if x > 50:
            # 50 is the highest FPS that 100 is a factor of.
            # people will ask for 60. they will get 50, and they will like it.
            return 50
        case x if x >= 30:
            return 33
        case x if x >= 24:
            return 25
        case x if x >= 20:
            return 20
        case x if x >= 15:
            # ffmpeg will pad a few frames to make this work
            return 16
        case x if x >= 12:
            return 12
        case x if x >= 10:
            # idk why anyone would request 11fps, but they're getting 10
            return 10
        case x if x >= 6:
            # also invalid but ffmpeg will pad it
            return 6
        case 4:
            return 4  # FINE, I GUESS
        case _:
            return 1  # I don't know why you would want this, but here you go


class FfmpegEncoder:
    def __init__(
        self,
        frames_dir: Path,
        out_file: Path,
        codec: VideoCodec,
        in_fps: int = 60,
        out_fps: int = 60,
        lossless: bool = False,
        param={},
    ):
        self.frames_dir = frames_dir
        self.out_file = out_file
        self.codec = codec
        self.in_fps = in_fps
        self.out_fps = out_fps
        self.lossless = lossless
        self.param = param

        self.input: Optional[InputNode] = None

    def encode(self) -> tuple:
        self.input: InputNode = ffmpeg.input(
            str(self.frames_dir.resolve().joinpath("%08d.png")), framerate=self.in_fps
        ).filter("fps", fps=self.in_fps)
        match self.codec:
            case VideoCodec.gif:
                return self._encode_gif()
            case VideoCodec.webm:
                return self._encode_webm()
            case VideoCodec.webp:
                return self._encode_webp()
            case VideoCodec.h264:
                return self._encode_h264()
            case VideoCodec.hevc:
                return self._encode_hevc()
            case _:
                raise ValueError(f"Unknown codec {self.codec}")

    @property
    def _out_file(self) -> Path:
        return str(self.out_file.resolve())

    @staticmethod
    def _interpolate(stream, out_fps: int) -> FilterNode:
        return stream.filter(
            "minterpolate", fps=out_fps, mi_mode="mci", mc_mode="aobmc", me_mode="bidir", vsbmc=1
        )

    def _encode_gif(self) -> tuple:
        stream: FilterNode = self.input

        # Output FPS must be divisible by 100 for GIFs, so we clamp it
        out_fps = clamp_gif_fps(self.out_fps)
        if self.in_fps != out_fps:
            stream = self._interpolate(stream, out_fps)

        # split into two streams for palettegen and paletteuse
        split_stream = stream.split()

        # generate the palette, then use it to encode the GIF
        palette = split_stream[0].filter("palettegen")
        stream = ffmpeg.filter([split_stream[1], palette], "paletteuse").output(
            self._out_file, vcodec="gif", loop=0
        )
        return stream.run()

    def _encode_webm(self) -> tuple:
        stream: FilterNode = self.input
        if self.in_fps != self.out_fps:
            stream = self._interpolate(stream, self.out_fps)
        param = {
            "pix_fmt":"yuv420p",
            "vcodec":"libvpx-vp9",
            "video_bitrate":0,
            "crf":24,
        }
        param.update(**self.param)
        stream = stream.output(
            self._out_file, **param
        )
        return stream.run()

    def _encode_webp(self) -> tuple:
        stream: FilterNode = self.input
        if self.in_fps != self.out_fps:
            stream = self._interpolate(stream, self.out_fps)

        if self.lossless:
            param = {
                "pix_fmt":"bgra",
                "vcodec":"libwebp_anim",
                "lossless":1,
                "compression_level":5,
                "qscale":75,
                "loop":0,
            }
            param.update(**self.param)
            stream = stream.output(
                self._out_file,
                **param
            )
        else:
            param = {
                "pix_fmt":"yuv420p",
                "vcodec":"libwebp_anim",
                "lossless":0,
                "compression_level":5,
                "qscale":90,
                "loop":0,
            }
            param.update(**self.param)
            stream = stream.output(
                self._out_file,
                **param
            )
        return stream.run()

    def _encode_h264(self) -> tuple:
        stream: FilterNode = self.input
        if self.in_fps != self.out_fps:
            stream = self._interpolate(stream, self.out_fps)

        param = {
            "pix_fmt":"yuv420p",
            "vcodec":"libx264",
            "crf":21,
            "tune":"animation",
        }
        param.update(**self.param)

        stream = stream.output(
            self._out_file, **param
        )
        return stream.run()

    def _encode_hevc(self) -> tuple:
        stream: FilterNode = self.input
        if self.in_fps != self.out_fps:
            stream = self._interpolate(stream, self.out_fps)

        param = {
            "pix_fmt":"yuv420p",
            "vcodec":"libx264",
            "crf":21,
            "tune":"animation",
        }
        param.update(**self.param)

        stream = stream.output(self._out_file, **param)
        return stream.run()