import json from typing import Any, Dict, List import tensorflow as tf from tensorflow import keras import base64 import io import os import numpy as np from PIL import Image import youtube_transcript_api2 import json import re import requests from transformers import ( AutoModelForSequenceClassification, AutoTokenizer, TextClassificationPipeline, ) from typing import Any, Dict, List CATEGORIES = [None, 'SPONSOR', 'SELFPROMO', 'INTERACTION'] PROFANITY_RAW = '[ __ ]' # How YouTube transcribes profanity PROFANITY_CONVERTED = '*****' # Safer version for tokenizing NUM_DECIMALS = 3 # https://www.fincher.org/Utilities/CountryLanguageList.shtml # https://lingohub.com/developers/supported-locales/language-designators-with-regions LANGUAGE_PREFERENCE_LIST = ['en-GB', 'en-US', 'en-CA', 'en-AU', 'en-NZ', 'en-ZA', 'en-IE', 'en-IN', 'en-JM', 'en-BZ', 'en-TT', 'en-PH', 'en-ZW', 'en'] def parse_transcript_json(json_data, granularity): assert json_data['wireMagic'] == 'pb3' assert granularity in ('word', 'chunk') # TODO remove bracketed words? # (kiss smacks) # (upbeat music) # [text goes here] # Some manual transcripts aren't that well formatted... but do have punctuation # https://www.youtube.com/watch?v=LR9FtWVjk2c parsed_transcript = [] events = json_data['events'] for event_index, event in enumerate(events): segments = event.get('segs') if not segments: continue # This value is known (when phrase appears on screen) start_ms = event['tStartMs'] total_characters = 0 new_segments = [] for seg in segments: # Replace \n, \t, etc. with space text = ' '.join(seg['utf8'].split()) # Remove zero-width spaces and strip trailing and leading whitespace text = text.replace('\u200b', '').replace('\u200c', '').replace( '\u200d', '').replace('\ufeff', '').strip() # Alternatively, # text = text.encode('ascii', 'ignore').decode() # Needed for auto-generated transcripts text = text.replace(PROFANITY_RAW, PROFANITY_CONVERTED) if not text: continue offset_ms = seg.get('tOffsetMs', 0) new_segments.append({ 'text': text, 'start': round((start_ms + offset_ms)/1000, NUM_DECIMALS) }) total_characters += len(text) if not new_segments: continue if event_index < len(events) - 1: next_start_ms = events[event_index + 1]['tStartMs'] total_event_duration_ms = min( event.get('dDurationMs', float('inf')), next_start_ms - start_ms) else: total_event_duration_ms = event.get('dDurationMs', 0) # Ensure duration is non-negative total_event_duration_ms = max(total_event_duration_ms, 0) avg_seconds_per_character = ( total_event_duration_ms/total_characters)/1000 num_char_count = 0 for seg_index, seg in enumerate(new_segments): num_char_count += len(seg['text']) # Estimate segment end seg_end = seg['start'] + \ (num_char_count * avg_seconds_per_character) if seg_index < len(new_segments) - 1: # Do not allow longer than next seg_end = min(seg_end, new_segments[seg_index+1]['start']) seg['end'] = round(seg_end, NUM_DECIMALS) parsed_transcript.append(seg) final_parsed_transcript = [] for i in range(len(parsed_transcript)): word_level = granularity == 'word' if word_level: split_text = parsed_transcript[i]['text'].split() elif granularity == 'chunk': # Split on space after punctuation split_text = re.split( r'(?<=[.!?,-;])\s+', parsed_transcript[i]['text']) if len(split_text) == 1: split_on_whitespace = parsed_transcript[i]['text'].split() if len(split_on_whitespace) >= 8: # Too many words # Rather split on whitespace instead of punctuation split_text = split_on_whitespace else: word_level = True else: raise ValueError('Unknown granularity') segment_end = parsed_transcript[i]['end'] if i < len(parsed_transcript) - 1: segment_end = min(segment_end, parsed_transcript[i+1]['start']) segment_duration = segment_end - parsed_transcript[i]['start'] num_chars_in_text = sum(map(len, split_text)) num_char_count = 0 current_offset = 0 for s in split_text: num_char_count += len(s) next_offset = (num_char_count/num_chars_in_text) * segment_duration word_start = round( parsed_transcript[i]['start'] + current_offset, NUM_DECIMALS) word_end = round( parsed_transcript[i]['start'] + next_offset, NUM_DECIMALS) # Make the reasonable assumption that min wps is 1.5 final_parsed_transcript.append({ 'text': s, 'start': word_start, 'end': min(word_end, word_start + 1.5) if word_level else word_end }) current_offset = next_offset return final_parsed_transcript def list_transcripts(video_id): try: return youtube_transcript_api2.YouTubeTranscriptApi.list_transcripts(video_id) except json.decoder.JSONDecodeError: return None WORDS_TO_REMOVE = [ '[Music]' '[Applause]' '[Laughter]' ] def get_words(video_id, transcript_type='auto', fallback='manual', filter_words_to_remove=True, granularity='word'): """Get parsed video transcript with caching system returns None if not processed yet and process is False """ raw_transcript_json = None try: transcript_list = list_transcripts(video_id) if transcript_list is not None: if transcript_type == 'manual': ts = transcript_list.find_manually_created_transcript( LANGUAGE_PREFERENCE_LIST) else: ts = transcript_list.find_generated_transcript( LANGUAGE_PREFERENCE_LIST) raw_transcript = ts._http_client.get( f'{ts._url}&fmt=json3').content if raw_transcript: raw_transcript_json = json.loads(raw_transcript) except (youtube_transcript_api2.TooManyRequests, youtube_transcript_api2.YouTubeRequestFailed): raise # Cannot recover from these errors and do not mark as empty transcript except requests.exceptions.RequestException: # Can recover return get_words(video_id, transcript_type, fallback, granularity) except youtube_transcript_api2.CouldNotRetrieveTranscript: # Retrying won't solve pass # Mark as empty transcript except json.decoder.JSONDecodeError: return get_words(video_id, transcript_type, fallback, granularity) if not raw_transcript_json and fallback is not None: return get_words(video_id, transcript_type=fallback, fallback=None, granularity=granularity) if raw_transcript_json: processed_transcript = parse_transcript_json( raw_transcript_json, granularity) if filter_words_to_remove: processed_transcript = list( filter(lambda x: x['text'] not in WORDS_TO_REMOVE, processed_transcript)) else: processed_transcript = raw_transcript_json # Either None or [] return processed_transcript def word_start(word): return word['start'] def word_end(word): return word.get('end', word['start']) def extract_segment(words, start, end, map_function=None): """Extracts all words with time in [start, end]""" a = max(binary_search_below(words, 0, len(words), start), 0) b = min(binary_search_above(words, -1, len(words) - 1, end) + 1, len(words)) to_transform = map_function is not None and callable(map_function) return [ map_function(words[i]) if to_transform else words[i] for i in range(a, b) ] def avg(*items): return sum(items)/len(items) def binary_search_below(transcript, start_index, end_index, time): if start_index >= end_index: return end_index middle_index = (start_index + end_index) // 2 middle = transcript[middle_index] middle_time = avg(word_start(middle), word_end(middle)) if time <= middle_time: return binary_search_below(transcript, start_index, middle_index, time) else: return binary_search_below(transcript, middle_index + 1, end_index, time) def binary_search_above(transcript, start_index, end_index, time): if start_index >= end_index: return end_index middle_index = (start_index + end_index + 1) // 2 middle = transcript[middle_index] middle_time = avg(word_start(middle), word_end(middle)) if time >= middle_time: return binary_search_above(transcript, middle_index, end_index, time) else: return binary_search_above(transcript, start_index, middle_index - 1, time) class PreTrainedPipeline(): def __init__(self, path: str): # load the model self.model = keras.models.load_model(os.path.join(path, "tf_model.h5")) path2 = os.path.join(path, "model") self.model2 = AutoModelForSequenceClassification.from_pretrained(path2) self.tokenizer2 = AutoTokenizer.from_pretrained(path2) self.pipeline2 = SponsorBlockClassificationPipeline( model=self.model2, tokenizer=self.tokenizer2) def __call__(self, inputs: str)-> List[Dict[str, Any]]: data = json.loads(inputs) return self.pipeline2(data) class SponsorBlockClassificationPipeline(TextClassificationPipeline): def __init__(self, model, tokenizer): super().__init__(model=model, tokenizer=tokenizer, return_all_scores=True) def preprocess(self, video, **tokenizer_kwargs): words = get_words(video['video_id']) segment_words = extract_segment(words, video['start'], video['end']) text = ' '.join(x['text'] for x in segment_words) model_inputs = self.tokenizer( text, return_tensors=self.framework, **tokenizer_kwargs) return {'video': video, 'model_inputs': model_inputs} def _forward(self, data): model_outputs = self.model(**data['model_inputs']) return {'video': data['video'], 'model_outputs': model_outputs} def postprocess(self, data, function_to_apply=None, return_all_scores=False): model_outputs = data['model_outputs'] results = super().postprocess(model_outputs, function_to_apply, return_all_scores) for result in results: result['label_text'] = CATEGORIES[result['label']] return results # {**data['video'], 'result': results}