|
import os |
|
import json |
|
import math |
|
import re |
|
import time |
|
import traceback |
|
from typing import Optional |
|
import xml.etree.ElementTree as ElementTree |
|
from html import unescape |
|
import yt_dlp |
|
|
|
# Raw value of the DEBUG environment variable (None when unset); its
# truthiness gates the diagnostic prints in the helpers of this module.
debug = os.environ.get("DEBUG")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getSubtitleOptions(
    lang: Optional[str] = None,
    proxy: Optional[str] = None,
):
    """Build the option dict handed to ``yt_dlp.YoutubeDL`` for subtitle lookups.

    :param lang:
        When truthy, ``"subtitleslangs"`` is set to ``[lang, "-live_chat"]``.
    :param proxy:
        When truthy, ``"proxy"`` is set and ``"socket_timeout"`` is raised
        from 10 to 20.
    :returns:
        A newly built ``dict`` on every call.
    """
    youtube_args = {
        "player_skip": ["configs", "initial"],
        "player_client": ["ios"],
        "skip": ["hls", "dash"],
    }
    options = {
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "skip_download": True,
        "socket_timeout": 10,
        "extractor_retries": 0,
        "extractor_args": {"youtube": youtube_args},
    }

    if lang:
        options["subtitleslangs"] = [lang, "-live_chat"]
    if proxy:
        options["proxy"] = proxy
        options["socket_timeout"] = 20

    return options
|
|
|
|
|
def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
    """Pick a download URL for ``lang`` out of a language -> tracks mapping.

    :param item:
        Mapping of language code to a list of track dicts; each track is
        read through its ``"url"`` and ``"ext"`` keys.
    :param str lang:
        Language key looked up in ``item``. A missing key yields ``None``.
    :param str subType:
        Wanted track extension. ``"xml"`` is special-cased: for any language
        other than ``"live_chat"`` the first track with a URL is used and
        its ``&fmt=<ext>`` query fragment is removed.
    :returns:
        The URL string, or ``None`` when no usable track exists.
    """
    for subtitle in item.get(lang) or ():
        track_url = subtitle.get("url")
        if not track_url:
            # A track without a URL cannot be downloaded; skip it instead of
            # failing on ``None.replace`` / returning ``None`` early.
            continue
        ext = subtitle.get("ext")

        if lang != "live_chat" and subType == "xml":
            if debug:
                print("subtitle source lang:{} url: {}".format(lang, track_url))
            # Strip the explicit format selector from the URL.
            # NOTE(review): presumably the server then answers with its
            # default XML transcript - confirm against the endpoint.
            return track_url.replace("&fmt=" + ext, "") if ext else track_url

        if ext == subType:
            if debug:
                print("subtitle lang:{} url: {}".format(lang, track_url))
            return track_url

    return None
|
|
|
|
|
def getRequestedSubtitlesUrl(info_dict, lang, subType, isLangKey=False):
    """Find a subtitle URL inside ``info_dict["requested_subtitles"]``.

    The exact ``lang`` key is tried first. Unless ``isLangKey`` is true,
    every key that starts with ``lang`` is tried afterwards.

    :param dict info_dict:
        Extraction result; only ``"requested_subtitles"`` is read.
    :param lang:
        Language code or prefix. ``None`` yields ``None``.
    :param str subType:
        Track type forwarded to ``getUrlFromSubtitleItem``.
    :param bool isLangKey:
        When true, only the exact key is accepted (no prefix matching).
    :returns:
        The URL string, or ``None`` when nothing matches.
    """
    requested = info_dict.get("requested_subtitles")
    if not requested or lang is None:
        return None

    def lookup(code):
        entry = requested[code]
        # A single track dict is wrapped into the {lang: [track]} shape that
        # getUrlFromSubtitleItem iterates over. The wrapper is built per
        # lookup so the full mapping stays available for later candidates
        # (rebinding it used to break the prefix loop with KeyError).
        tracks = {code: [entry]} if isinstance(entry, dict) else requested
        return getUrlFromSubtitleItem(tracks, code, subType)

    if lang in requested:
        url = lookup(lang)
        if url:
            if debug:
                print("getRequestedSubtitlesUrl lang:{}".format(lang))
            return url

    if not isLangKey:
        for code in requested:
            if not code.startswith(lang):
                continue
            url = lookup(code)
            if url:
                if debug:
                    print("getRequestedSubtitlesUrl lang:{} url:{}".format(code, url))
                return url
    return None
|
|
|
|
|
def getSubtitleLangUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
    isLangKey=False,
):
    """Find a subtitle URL for ``lang`` in the subtitle sections of ``info_dict``.

    An exact language match is searched across all sections first. Unless
    ``isLangKey`` is true, a second pass accepts any language key that
    starts with ``lang``.

    :param dict info_dict:
        Extraction result; the keys named in ``subTitleKeys`` are read.
    :param lang:
        Language code or prefix. ``None`` yields ``None``.
    :param str subType:
        Track type forwarded to ``getUrlFromSubtitleItem``.
    :param subTitleKeys:
        Section names to search, in priority order (only iterated, so any
        iterable of strings works).
    :param bool isLangKey:
        When true, only the exact key is accepted (no prefix matching).
    :returns:
        The URL string, or ``None`` when nothing matches.
    """
    if lang is None:
        return None

    # A section that is missing or None is treated as empty instead of
    # failing on ``None.keys()``.
    sections = [info_dict.get(key) or {} for key in subTitleKeys]

    for tracks in sections:
        if lang in tracks:
            url = getUrlFromSubtitleItem(tracks, lang, subType)
            if url:
                if debug:
                    print("getSubtitleLangUrl lang:{}".format(lang))
                return url

    if not isLangKey:
        for tracks in sections:
            for code in tracks:
                if not code.startswith(lang):
                    continue
                url = getUrlFromSubtitleItem(tracks, code, subType)
                if url:
                    if debug:
                        print("getSubtitleLangUrl lang:{} url:{}".format(code, url))
                    return url

    return None
|
|
|
|
|
def getSubtitleOtherUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
):
    """Fallback lookup: return a subtitle URL in whatever language is available.

    For each section the language is chosen in this order: ``lang`` itself,
    then ``"en"``, then the first language key of the section. Only that one
    language is tried per section.

    :param dict info_dict:
        Extraction result; the keys named in ``subTitleKeys`` are read.
    :param str lang:
        Preferred language code.
    :param str subType:
        Track type forwarded to ``getUrlFromSubtitleItem``.
    :param subTitleKeys:
        Section names to search, in priority order (only iterated, so any
        iterable of strings works).
    :returns:
        The URL string, or ``None`` when no section yields one.
    """
    for section_name in subTitleKeys:
        # A missing or None section is skipped like an empty one instead of
        # failing on ``None.keys()``.
        tracks = info_dict.get(section_name) or {}
        if not tracks:
            continue

        if lang in tracks:
            chosen = lang
        elif "en" in tracks:
            chosen = "en"
        else:
            chosen = next(iter(tracks))

        url = getUrlFromSubtitleItem(tracks, chosen, subType)
        if url:
            if debug:
                print("getSubtitleOtherUrl lang:{} url:{}".format(chosen, url))
            return url

    return None
|
|
|
|
|
async def fetchSubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Fetch a subtitle for ``url``; thin wrapper around ``fetchAnySubtitle``.

    :param str url: Video URL to extract.
    :param lang: Preferred subtitle language.
    :param subType: Wanted subtitle type (e.g. ``"vtt"``, ``"srt"``, ``"txt"``).
    :param proxy: Optional proxy URL.
    :returns: Whatever ``fetchAnySubtitle`` returns.
    """
    # ``proxy`` must go by keyword: the fourth positional parameter of
    # fetchAnySubtitle is ``skipEmpty``, so a positional call would drop
    # the proxy silently.
    return await fetchAnySubtitle(url, lang, subType, proxy=proxy)
|
|
|
|
|
async def fetchAnySubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    skipEmpty: bool = True,
    proxy: Optional[str] = None,
) -> dict:
    """Extract ``url`` with yt-dlp and download the best matching subtitle.

    The URL finders are tried in order: requested subtitles, a language
    match, then any available language. The first one whose download
    succeeds wins.

    :param str url: Video URL to extract.
    :param lang: Preferred subtitle language.
    :param subType:
        Wanted subtitle type. For the ``youtube`` extractor, ``"srt"`` and
        ``"txt"`` are looked up as ``"xml"`` tracks.
    :param bool skipEmpty:
        Not consulted by this function; the download helper is called with
        its own default.
        NOTE(review): decide whether it should be forwarded.
    :param proxy: Optional proxy URL.
    :returns:
        On success a dict with ``id``, ``url``, ``title``, ``thumbnail``,
        ``duration``, ``subtitle`` and ``chapters``. ``{"error": ...}`` when
        an exception occurs. ``{"title", "duration", "error": "No subtitles"}``
        when nothing could be downloaded.

    NOTE(review): nothing in this coroutine is awaited, so the extraction
    and download calls run synchronously inside it.
    """
    ydl_opts = getSubtitleOptions(lang, proxy)

    title = "unknow"
    duration = ""
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)

            title = info_dict.get("title", "unknow")
            seconds = info_dict.get("duration")
            duration = str(seconds) if seconds else ""
            thumbnail = info_dict.get("thumbnail")
            # The thumbnail can be absent; without this guard the ``in``
            # test raised TypeError and the whole request became an error.
            if thumbnail and ".webp" in thumbnail:
                thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                    info_dict.get("id")
                )

            reqType = subType
            if info_dict.get("extractor") == "youtube" and subType in ["srt", "txt"]:
                reqType = "xml"
            if debug:
                print(
                    "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                        (info_dict.get("subtitles") or {}).keys(),
                        (info_dict.get("automatic_captions") or {}).keys(),
                        (
                            info_dict.get("requested_subtitles").keys()
                            if info_dict.get("requested_subtitles")
                            else {}
                        ),
                    )
                )

            subtitle_funcs = (
                getRequestedSubtitlesUrl,
                getSubtitleLangUrl,
                getSubtitleOtherUrl,
            )
            for index, find_url in enumerate(subtitle_funcs):
                subtitle_url = find_url(info_dict, lang, reqType)
                if not subtitle_url:
                    continue

                subtitle = fetchSubtitlebydlUrl(ydl, subType, subtitle_url)
                print(
                    "function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format(
                        index, url, title, duration, len(subtitle or "")
                    )
                )
                if subtitle is not None:
                    return {
                        "id": info_dict.get("id"),
                        "url": url,
                        "title": title,
                        "thumbnail": thumbnail,
                        "duration": duration,
                        "subtitle": subtitle,
                        "chapters": info_dict.get("chapters", None),
                    }
    except Exception as e:
        # Boundary of the request: report the failure to the caller as data.
        print(e)
        traceback.print_exc()
        return {"error": str(e)}
    return {"title": title, "duration": duration, "error": "No subtitles"}
|
|
|
|
|
def float_to_srt_time_format(d: float) -> str:
    """Convert decimal durations into proper srt format.

    :param float d:
        Time offset in seconds. Negative values are clamped to zero.
    :rtype: str
    :returns:
        SubRip Subtitle (str) formatted time duration ``HH:MM:SS,mmm``.

    float_to_srt_time_format(3.89) -> '00:00:03,890'
    """
    # Work in whole milliseconds so rounding carries into the seconds
    # (3.9996 -> 00:00:04,000) instead of yielding a malformed ",1.000".
    total_ms = max(0, int(round(d * 1000)))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    seconds, millis = divmod(remainder, 1000)
    # Hours are plain arithmetic, so offsets of 24h or more do not wrap.
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"
|
|
|
|
|
def is_spaces_only(variable):
    """Tell whether every character of ``variable`` is whitespace.

    An empty value has no non-space character and therefore yields ``True``.
    """
    return all(ch.isspace() for ch in variable)
|
|
|
|
|
def xml_caption_to_srt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to "SubRip Subtitle (srt)".

    :param str xml_captions:
        XML formatted caption tracks. Each child of the root element is one
        caption with a ``start`` attribute and an optional ``dur`` attribute
        (both parsed as float).
    :param bool skip_empty:
        When true, captions that are empty or whitespace-only are dropped,
        and ``None`` is returned if no caption remains.
    :returns:
        The srt text with consecutively numbered cues, or ``None`` (see
        ``skip_empty``).
    :raises KeyError: when a caption has no ``start`` attribute.
    """
    segments = []
    root = ElementTree.fromstring(xml_captions)
    for child in root:
        text = child.text or ""
        # Newlines become spaces; the double spaces this can create are
        # collapsed (the previous space-for-space replace was a no-op).
        caption = unescape(text.replace("\n", " ").replace("  ", " "))
        # ``skip_empty`` guards the whole emptiness test; previously operator
        # precedence dropped whitespace-only captions unconditionally.
        if skip_empty and not caption.strip():
            continue

        duration = float(child.attrib.get("dur", 0.0))
        start = float(child.attrib["start"])
        end = start + duration
        line = "{seq}\n{start} --> {end}\n{text}\n".format(
            # Number by emitted cue so skipped captions leave no gaps.
            seq=len(segments) + 1,
            start=float_to_srt_time_format(start),
            end=float_to_srt_time_format(end),
            text=caption,
        )
        segments.append(line)

    if skip_empty and not segments:
        return None
    return "\n".join(segments).strip()
|
|
|
|
|
def xml_caption_to_txt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to plain text, one caption per paragraph.

    :param str xml_captions:
        XML formatted caption tracks. The text of each child of the root
        element is one caption.
    :param bool skip_empty:
        When true, captions that are empty or whitespace-only are dropped,
        and ``None`` is returned if no caption remains.
    :returns:
        The captions separated by a blank line, or ``None`` (see
        ``skip_empty``).
    """
    segments = []
    root = ElementTree.fromstring(xml_captions)
    for child in root:
        text = child.text or ""
        # Newlines become spaces; the double spaces this can create are
        # collapsed (the previous space-for-space replace was a no-op).
        caption = unescape(text.replace("\n", " ").replace("  ", " "))
        if skip_empty and not caption.strip():
            continue
        segments.append("{text}\n".format(text=caption))

    # This result used to be computed but never returned, so callers testing
    # for ``None`` could not detect an empty transcript.
    if skip_empty and not segments:
        return None
    return "\n".join(segments).strip()
|
|
|
|
|
async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> dict:
    """Extract ``url`` with yt-dlp and return its subtitle listings.

    :param str url: Video URL to extract.
    :param proxy: Optional proxy URL.
    :returns:
        A dict with ``id``, ``url``, ``title``, ``thumbnail``, ``duration``,
        ``subtitles`` and ``automatic_captions``, or ``{"error": ...}`` when
        an exception occurs.
    """
    # ``proxy`` must go by keyword: the first positional parameter of
    # getSubtitleOptions is the language, so a positional call would request
    # the proxy URL as a subtitle language and never configure the proxy.
    ydl_opts = getSubtitleOptions(proxy=proxy)

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)
            seconds = info_dict.get("duration")
            thumbnail = info_dict.get("thumbnail")
            # The thumbnail can be absent; guard the substring test.
            if thumbnail and ".webp" in thumbnail:
                thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                    info_dict.get("id")
                )
            return {
                "id": info_dict.get("id"),
                "url": url,
                "title": info_dict.get("title", "unknow"),
                "thumbnail": thumbnail,
                "duration": str(seconds) if seconds else "",
                "subtitles": info_dict.get("subtitles"),
                "automatic_captions": info_dict.get("automatic_captions"),
            }
    except Exception as e:
        # Boundary of the request: report the failure to the caller as data.
        return {"error": str(e)}
|
|
|
|
|
def fetchSubtitlebydlUrl(ydl, subType, dlUrl, skipEmpty=True):
    """Download a subtitle track and convert it to ``subType``.

    :param ydl:
        Object providing ``urlopen(url)`` whose result is usable as a
        context manager and has ``read()``.
    :param str subType:
        ``"srt"`` or ``"txt"`` strip any ``&fmt=...`` selector from the URL
        and convert the XML payload. Any other value returns the payload as
        decoded text.
    :param dlUrl:
        Track URL. ``None`` or an empty string yields ``None``.
    :param bool skipEmpty:
        Forwarded to the XML converters.
    :returns:
        The subtitle text, or ``None`` when there is no URL, the download
        fails, or the conversion fails / yields nothing.
    """
    if not dlUrl:
        # Without a URL there is nothing to fetch. Previously a None URL
        # raised TypeError for srt/txt but returned None for other types.
        return None

    try:
        if subType in ("srt", "txt"):
            dlUrl = re.sub(r"&fmt=[\w]+", "", dlUrl)

        with ydl.urlopen(dlUrl) as resp:
            payload = resp.read().decode()

        if subType == "srt":
            return xml_caption_to_srt(payload, skipEmpty)
        if subType == "txt":
            return xml_caption_to_txt(payload, skipEmpty)
        return payload
    except Exception as e:
        # Best effort: a failure here lets the caller try another source.
        print(e)
        return None
|
|
|
|
|
def getSubtitleUrlByLang(info_dict, lang, subType, isLangKey):
    """Resolve a subtitle URL for one language, without the any-language fallback.

    Requested subtitles are consulted first, then the subtitle sections of
    ``info_dict``.

    :param dict info_dict: Extraction result.
    :param lang: Language code or prefix. ``None`` yields ``None``.
    :param str subType: Track type forwarded to the finders.
    :param bool isLangKey:
        When true, only an exact language key is accepted.
    :returns:
        The URL string, or ``None`` when no finder produces one.
    """
    if lang is None:
        # No language to match; avoids prefix-matching against None in the
        # finders.
        return None

    for find_url in (getRequestedSubtitlesUrl, getSubtitleLangUrl):
        subtitle_url = find_url(info_dict, lang, subType, isLangKey=isLangKey)
        print("getSubtitleUrlByLang subtitle_url: {}".format(subtitle_url))
        if subtitle_url:
            return subtitle_url
    return None
|
|
|
|
|
async def fetchSubtitleByInfo(
    url: str, subType: str, dlInfo, proxy: Optional[str] = None
):
    """Download one specific subtitle described by ``dlInfo``.

    Resolution order: the direct ``dlInfo["dlUrl"]`` if present, then an
    exact match on ``dlInfo["langKey"]`` if present, then a match on
    ``dlInfo["lang"]``.

    :param str url: Video URL, extracted only when the direct URL fails.
    :param str subType:
        Wanted subtitle type. ``"srt"`` and ``"txt"`` are looked up as
        ``"xml"`` tracks.
    :param dlInfo:
        Mapping read through the keys ``"lang"``, ``"dlUrl"`` and
        ``"langKey"``.
    :param proxy: Optional proxy URL.
    :returns:
        The subtitle text, ``None`` when no track could be found or
        downloaded, or ``{"error": ...}`` when an exception occurs.
    """
    try:
        reqType = "xml" if subType in ["srt", "txt"] else subType
        ydl_opts = getSubtitleOptions(dlInfo.get("lang", None), proxy)
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            if "dlUrl" in dlInfo:
                subtitle = fetchSubtitlebydlUrl(
                    ydl, subType, dlInfo.get("dlUrl"), False
                )
                if subtitle is not None:
                    return subtitle

            # The direct URL was absent or failed: extract and look it up.
            info_dict = ydl.extract_info(url, download=False)
            if debug:
                print(
                    "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                        (info_dict.get("subtitles") or {}).keys(),
                        (info_dict.get("automatic_captions") or {}).keys(),
                        (
                            info_dict.get("requested_subtitles").keys()
                            if info_dict.get("requested_subtitles")
                            else {}
                        ),
                    )
                )

            subtitleUrl = None
            if "langKey" in dlInfo:
                subtitleUrl = getSubtitleUrlByLang(
                    info_dict, dlInfo.get("langKey"), reqType, True
                )
            if subtitleUrl is None:
                subtitleUrl = getSubtitleUrlByLang(
                    info_dict, dlInfo.get("lang"), reqType, False
                )

            print("subtitleUrl: {}".format(subtitleUrl))
            if not subtitleUrl:
                # Nothing matched: report it like a failed download instead
                # of handing None to the downloader.
                return None
            return fetchSubtitlebydlUrl(ydl, subType, subtitleUrl, False)
    except Exception as e:
        # Boundary of the request: report the failure to the caller as data.
        print(e)
        traceback.print_exc()
        return {"error": str(e)}
|
|