import sys
import copy
import librosa
import logging
import argparse
import numpy as np
import soundfile as sf
import moviepy.editor as mpy
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from subtitle_utils import generate_srt, generate_srt_clip
from trans_utils import pre_proc, proc, write_state, load_state
from moviepy.editor import TextClip, CompositeVideoClip, concatenate_videoclips
from moviepy.video.tools.subtitles import SubtitlesClip


class VideoClipper():
    def __init__(self, asr_pipeline):
        logging.warning("Initializing VideoClipper.")
        self.asr_pipeline = asr_pipeline

    def recog(self, audio_input, state=None):
        if state is None:
            state = {}
        state['audio_input'] = audio_input
        _, data = audio_input
        data = data.astype(np.float64)
        rec_result = self.asr_pipeline(audio_in=data)
        state['recog_res_raw'] = rec_result['text_postprocessed']
        state['timestamp'] = rec_result['time_stamp']
        state['sentences'] = rec_result['sentences']
        res_text = rec_result['text']
        res_srt = generate_srt(rec_result['sentences'])
        return res_text, res_srt, state

    def clip(self, dest_text, start_ost, end_ost, state):
        # Restore the recognition results and raw audio from state.
        audio_input = state['audio_input']
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        sr, data = audio_input
        data = data.astype(np.float64)

        # '#' separates multiple destination texts; each may match several spans.
        all_ts = []
        for _dest_text in dest_text.split('#'):
            _dest_text = pre_proc(_dest_text)
            ts = proc(recog_res_raw, timestamp, _dest_text)
            for _ts in ts:
                all_ts.append(_ts)
        ts = all_ts

        srt_index = 0
        clip_srt = ""
        if len(ts):
            # Timestamps are in samples at 16 kHz; the offsets are in ms (1 ms = 16 samples).
            start, end = ts[0]
            start = min(max(0, start + start_ost * 16), len(data))
            end = min(max(0, end + end_ost * 16), len(data))
            res_audio = data[start:end]
            start_end_info = "from {} to {}".format(start / 16000, end / 16000)
            srt_clip, _, srt_index = generate_srt_clip(
                sentences, start / 16000.0, end / 16000.0, begin_index=srt_index)
            clip_srt += srt_clip
            for _ts in ts[1:]:  # multiple sentence input or multiple output matched
                start, end = _ts
                start = min(max(0, start + start_ost * 16), len(data))
                end = min(max(0, end + end_ost * 16), len(data))
                start_end_info += ", from {} to {}".format(start / 16000, end / 16000)
                # start/end already include the offsets, so slice with them directly.
                res_audio = np.concatenate([res_audio, data[start:end]], -1)
                srt_clip, _, srt_index = generate_srt_clip(
                    sentences, start / 16000.0, end / 16000.0, begin_index=srt_index - 1)
                clip_srt += srt_clip
            message = "{} periods found in the speech: ".format(len(ts)) + start_end_info
        else:
            # Fall back to the raw audio so the return below never hits an unbound name.
            res_audio = data
            message = "No period found in the speech, return raw speech. You may check the recognition result and try other destination text."
        return (sr, res_audio), message, clip_srt

    def video_recog(self, video_filename):
        # Assumes a three-letter extension such as .mp4 when deriving the side files.
        clip_video_file = video_filename[:-4] + '_clip.mp4'
        video = mpy.VideoFileClip(video_filename)
        # Extract the audio track and resample to 16 kHz for the ASR pipeline.
        audio_file = video_filename[:-3] + 'wav'
        video.audio.write_audiofile(audio_file)
        wav = librosa.load(audio_file, sr=16000)[0]
        state = {
            'video_filename': video_filename,
            'clip_video_file': clip_video_file,
            'video': video,
        }
        return self.recog((16000, wav), state)

    def video_clip(self, dest_text, start_ost, end_ost, state,
                   font_size=32, font_color='white', add_sub=False):
        # Restore the recognition results and video handles from state.
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        video = state['video']
        clip_video_file = state['clip_video_file']
        video_filename = state['video_filename']

        all_ts = []
        srt_index = 0
        for _dest_text in dest_text.split('#'):
            _dest_text = pre_proc(_dest_text)
            ts = proc(recog_res_raw, timestamp, _dest_text)
            for _ts in ts:
                all_ts.append(_ts)
        ts = all_ts

        clip_srt = ""
        if len(ts):
            # Convert sample timestamps to seconds and apply the ms offsets.
            start, end = ts[0][0] / 16000, ts[0][1] / 16000
            start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
            video_clip = video.subclip(start, end)
            start_end_info = "from {} to {}".format(start, end)
            srt_clip, subs, srt_index = generate_srt_clip(
                sentences, start, end, begin_index=srt_index)
            clip_srt += srt_clip
            if add_sub:
                generator = lambda txt: TextClip(
                    txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                subtitles = SubtitlesClip(subs, generator)
                video_clip = CompositeVideoClip(
                    [video_clip, subtitles.set_pos(('center', 'bottom'))])
            concate_clip = [video_clip]
            for _ts in ts[1:]:
                start, end = _ts[0] / 16000, _ts[1] / 16000
                start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
                _video_clip = video.subclip(start, end)
                start_end_info += ", from {} to {}".format(start, end)
                srt_clip, subs, srt_index = generate_srt_clip(
                    sentences, start, end, begin_index=srt_index - 1)
                clip_srt += srt_clip
                if add_sub:
                    generator = lambda txt: TextClip(
                        txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                    subtitles = SubtitlesClip(subs, generator)
                    _video_clip = CompositeVideoClip(
                        [_video_clip, subtitles.set_pos(('center', 'bottom'))])
                concate_clip.append(copy.copy(_video_clip))
            message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
            logging.warning("Concatenating...")
            if len(concate_clip) > 1:
                video_clip = concatenate_videoclips(concate_clip)
            video_clip.write_videofile(clip_video_file)
        else:
            clip_video_file = video_filename
            message = "No period found in the audio, return raw speech. You may check the recognition result and try other destination text."
        return clip_video_file, message, clip_srt
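

# A minimal usage sketch, not part of the original module: it shows how a
# modelscope ASR pipeline could be wired into VideoClipper and driven end to
# end. The model ID and 'example.mp4' path below are illustrative assumptions;
# any Paraformer-style model that returns sentence-level timestamps
# ('text_postprocessed', 'time_stamp', 'sentences') should fit. Kept as a
# helper function so it does not clash with a CLI entry point elsewhere.
def _demo(video_file='example.mp4'):  # hypothetical input file
    asr_pipeline = pipeline(
        task=Tasks.auto_speech_recognition,
        # Assumed model ID, for illustration only.
        model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch')
    clipper = VideoClipper(asr_pipeline)
    # Recognize once, then clip as many times as needed from the cached state.
    res_text, res_srt, state = clipper.video_recog(video_file)
    print(res_text)
    clip_file, message, clip_srt = clipper.video_clip(
        'destination text', start_ost=0, end_ost=0, state=state)
    print(message)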