# FunClip / videoclipper.py
import sys
import copy
import librosa
import logging
import argparse
import numpy as np
import soundfile as sf
import moviepy.editor as mpy
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from subtitle_utils import generate_srt, generate_srt_clip
from trans_utils import pre_proc, proc, write_state, load_state
from moviepy.editor import *
from moviepy.video.tools.subtitles import SubtitlesClip
class VideoClipper():
    def __init__(self, asr_pipeline):
        logging.warning("Initializing VideoClipper.")
        self.asr_pipeline = asr_pipeline

    def recog(self, audio_input, state=None):
        # audio_input is a (sample_rate, samples) tuple; the recognizer result
        # must expose 'text', 'text_postprocessed', 'time_stamp' and
        # 'sentences', which the clip methods below consume via state.
        if state is None:
            state = {}
        state['audio_input'] = audio_input
        _, data = audio_input
        data = data.astype(np.float64)
        rec_result = self.asr_pipeline(audio_in=data)
        state['recog_res_raw'] = rec_result['text_postprocessed']
        state['timestamp'] = rec_result['time_stamp']
        state['sentences'] = rec_result['sentences']
        res_text = rec_result['text']
        res_srt = generate_srt(rec_result['sentences'])
        return res_text, res_srt, state
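
    # A recog() usage sketch (hypothetical file name, and `clipper` stands for
    # an instance of this class; sf is the soundfile module imported above and
    # returns a (samples, sample_rate) tuple):
    #
    #   data, sr = sf.read('speech_16k.wav')
    #   text, srt, state = clipper.recog((sr, data))
    #
    # The returned state dict is exactly what clip() below consumes.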
    def clip(self, dest_text, start_ost, end_ost, state):
        # get from state
        audio_input = state['audio_input']
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        sr, data = audio_input
        data = data.astype(np.float64)

        # '#' separates multiple target texts; collect every matched span
        all_ts = []
        for _dest_text in dest_text.split('#'):
            _dest_text = pre_proc(_dest_text)
            ts = proc(recog_res_raw, timestamp, _dest_text)
            for _ts in ts:
                all_ts.append(_ts)
        ts = all_ts
        srt_index = 0
        clip_srt = ""
        if len(ts):
            # timestamps are sample indices at 16 kHz; offsets are in ms (1 ms = 16 samples)
            start, end = ts[0]
            start = min(max(0, start + start_ost * 16), len(data))
            end = min(max(0, end + end_ost * 16), len(data))
            res_audio = data[start:end]
            start_end_info = "from {} to {}".format(start / 16000, end / 16000)
            srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index)
            clip_srt += srt_clip
            for _ts in ts[1:]:  # multiple sentence input or multiple output matched
                start, end = _ts
                start = min(max(0, start + start_ost * 16), len(data))
                end = min(max(0, end + end_ost * 16), len(data))
                start_end_info += ", from {} to {}".format(start / 16000, end / 16000)
                # the offsets are already applied above, so concatenate the span as-is
                res_audio = np.concatenate([res_audio, data[start:end]], -1)
                srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index - 1)
                clip_srt += srt_clip
            message = "{} periods found in the speech: ".format(len(ts)) + start_end_info
        else:
            # nothing matched: fall back to the raw speech so the return value is well-defined
            res_audio = data
            message = "No period found in the speech, return raw speech. You may check the recognition result and try other destination text."
        return (sr, res_audio), message, clip_srt
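
    # Offset arithmetic, worked through: start_ost/end_ost are milliseconds and
    # the timestamps are sample indices at 16 kHz, so 1 ms = 16 samples. E.g.
    # start_ost=-500 moves the cut 500 * 16 = 8000 samples (0.5 s) earlier,
    # clamped by the min/max above to the valid range of the audio buffer.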
    def video_recog(self, video_filename):
        # assumes a three-letter extension such as .mp4 when deriving file names
        clip_video_file = video_filename[:-4] + '_clip.mp4'
        video = mpy.VideoFileClip(video_filename)
        # extract the audio track to a sidecar wav, resampled to 16 kHz for ASR
        audio_file = video_filename[:-3] + 'wav'
        video.audio.write_audiofile(audio_file)
        wav = librosa.load(audio_file, sr=16000)[0]
        state = {
            'video_filename': video_filename,
            'clip_video_file': clip_video_file,
            'video': video,
        }
        return self.recog((16000, wav), state)
    def video_clip(self, dest_text, start_ost, end_ost, state, font_size=32, font_color='white', add_sub=False):
        # get from state
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        video = state['video']
        clip_video_file = state['clip_video_file']
        video_filename = state['video_filename']

        # '#' separates multiple target texts; collect every matched span
        all_ts = []
        srt_index = 0
        for _dest_text in dest_text.split('#'):
            _dest_text = pre_proc(_dest_text)
            ts = proc(recog_res_raw, timestamp, _dest_text)
            for _ts in ts:
                all_ts.append(_ts)
        ts = all_ts
        clip_srt = ""
        if len(ts):
            # sample-level timestamps -> seconds, then apply the ms offsets
            start, end = ts[0][0] / 16000, ts[0][1] / 16000
            start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
            video_clip = video.subclip(start, end)
            start_end_info = "from {} to {}".format(start, end)
            srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index)
            clip_srt += srt_clip
            if add_sub:
                generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                subtitles = SubtitlesClip(subs, generator)
                video_clip = CompositeVideoClip([video_clip, subtitles.set_pos(('center', 'bottom'))])
            concate_clip = [video_clip]
            for _ts in ts[1:]:  # multiple sentence input or multiple output matched
                start, end = _ts[0] / 16000, _ts[1] / 16000
                start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
                _video_clip = video.subclip(start, end)
                start_end_info += ", from {} to {}".format(start, end)
                srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index - 1)
                clip_srt += srt_clip
                if add_sub:
                    generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                    subtitles = SubtitlesClip(subs, generator)
                    _video_clip = CompositeVideoClip([_video_clip, subtitles.set_pos(('center', 'bottom'))])
                concate_clip.append(copy.copy(_video_clip))
            message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
            logging.warning("Concatenating...")
            if len(concate_clip) > 1:
                video_clip = concatenate_videoclips(concate_clip)
            # write the result even when there is only a single clip
            video_clip.write_videofile(clip_video_file)
        else:
            # nothing matched: hand back the original file untouched
            clip_video_file = video_filename
            message = "No period found in the audio, return raw speech. You may check the recognition result and try other destination text."
        return clip_video_file, message, clip_srt
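

# --- Usage sketch (added for illustration) ----------------------------------
# A minimal command-line driver, assuming a ModelScope ASR pipeline with
# timestamp prediction; the paraformer model id below is an assumption, swap
# in whichever ASR model your deployment actually uses.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Clip a video by matching text against its transcript.')
    parser.add_argument('--video', required=True, help='input video file (.mp4)')
    parser.add_argument('--dest_text', required=True, help="'#'-separated target sentences to keep")
    parser.add_argument('--start_ost', type=int, default=0, help='start offset in ms')
    parser.add_argument('--end_ost', type=int, default=0, help='end offset in ms')
    args = parser.parse_args()

    asr_pipeline = pipeline(
        task=Tasks.auto_speech_recognition,
        model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch',  # assumed model id
    )
    clipper = VideoClipper(asr_pipeline)
    res_text, res_srt, state = clipper.video_recog(args.video)
    clip_file, message, clip_srt = clipper.video_clip(args.dest_text, args.start_ost, args.end_ost, state)
    print(message)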