|
import os |
|
import json |
|
import math |
|
import time |
|
import traceback |
|
from typing import Optional |
|
import xml.etree.ElementTree as ElementTree |
|
from html import unescape |
|
import yt_dlp |
|
|
|
# Debug switch, read once from the DEBUG environment variable at import time.
# It is only ever tested for truthiness, so any non-empty value turns on the
# diagnostic prints guarded by ``if debug:``.
debug = os.environ.get("DEBUG")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
    """Pick a subtitle URL for ``lang`` out of a language -> entries mapping.

    :param item:
        Mapping of language code to a list of subtitle entries. Each entry
        is read through its ``"url"`` and ``"ext"`` keys.
    :param str lang:
        Language key to look up in ``item``.
    :param str subType:
        Wanted subtitle extension. ``"xml"`` is special-cased (see below).
    :returns:
        The selected URL, or ``None`` when the language is missing or no
        entry is usable.

    When ``subType`` is ``"xml"`` and ``lang`` is not ``"live_chat"``, the
    first entry that has a URL is returned with its ``&fmt=<ext>`` query
    fragment removed. Otherwise the first entry whose ``"ext"`` equals
    ``subType`` is returned unchanged.
    """
    wants_source_xml = lang != "live_chat" and subType == "xml"

    # A missing language (or a None value) simply yields no candidates
    # instead of raising KeyError / TypeError.
    for subtitle in item.get(lang) or []:
        url = subtitle.get("url")
        if not url:
            # Entries without a URL cannot be used; skipping them also avoids
            # calling .replace() on None in the xml branch.
            continue
        ext = subtitle.get("ext")

        if wants_source_xml:
            if debug:
                print("subtitle source lang:{} url: {}".format(lang, url))
            # Only strip the format fragment when the entry names one.
            return url.replace("&fmt=" + ext, "") if ext else url

        if ext == subType:
            if debug:
                print("subtitle lang:{} url: {}".format(lang, url))
            return url

    return None
|
|
|
|
|
def getRequestedSubtitlesUrl(info_dict, lang, subType):
    """Look for a subtitle URL among ``info_dict["requested_subtitles"]``.

    :param info_dict:
        Mapping that may contain a ``"requested_subtitles"`` entry keyed by
        language code.
    :param str lang:
        Language prefix; every key that starts with it is tried in order.
    :param str subType:
        Subtitle type forwarded to :func:`getUrlFromSubtitleItem`.
    :returns:
        The first URL found, or ``None``.
    """
    requested = info_dict.get("requested_subtitles")
    if not requested:
        return None

    for code, entry in requested.items():
        if not code.startswith(lang):
            continue

        # getUrlFromSubtitleItem expects language -> list of entries. A bare
        # dict entry is wrapped in a fresh one-key mapping. The source mapping
        # is never rebound, so later matching languages can still be looked up
        # (rebinding it used to raise KeyError on the second match).
        candidates = {code: [entry] if isinstance(entry, dict) else entry}
        url = getUrlFromSubtitleItem(candidates, code, subType)
        if url:
            if debug:
                print("getRequestedSubtitlesUrl lang:{} url:{}".format(code, url))
            return url

    return None
|
|
|
|
|
def getSubtitleLangUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
):
    """Find a subtitle URL for ``lang`` in the given ``info_dict`` sections.

    Two passes are made over ``subTitleKeys``: first an exact match on the
    language key, then any language key that starts with ``lang``.

    :param info_dict:
        Mapping whose ``subTitleKeys`` entries map language codes to lists
        of subtitle entries. Missing or ``None`` sections are treated as
        empty.
    :param str lang:
        Language code (exact pass) / language prefix (second pass).
    :param str subType:
        Subtitle type forwarded to :func:`getUrlFromSubtitleItem`.
    :param subTitleKeys:
        Section names of ``info_dict`` to search, in priority order.
    :returns:
        The first URL found, or ``None``.
    """
    # Pass 1: exact language match, section by section.
    for section in subTitleKeys:
        # ``or {}`` guards against a section that is absent or None.
        tracks = info_dict.get(section) or {}
        if lang in tracks:
            url = getUrlFromSubtitleItem(tracks, lang, subType)
            if url:
                if debug:
                    print("getSubtitleLangUrl lang:{}".format(lang))
                return url

    # Pass 2: prefix match, e.g. lang "en" also accepts a key like "en-US".
    for section in subTitleKeys:
        tracks = info_dict.get(section) or {}
        for code in tracks:
            if code.startswith(lang):
                url = getUrlFromSubtitleItem(tracks, code, subType)
                if url:
                    if debug:
                        print("getSubtitleLangUrl lang:{} url:{}".format(code, url))
                    return url

    return None
|
|
|
|
|
def getSubtitleOtherUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
):
    """Fallback lookup: return a subtitle URL in *some* available language.

    For each section in ``subTitleKeys`` the language is chosen as
    ``lang`` if present, else ``"en"`` if present, else the first language
    key of that section.

    :param info_dict:
        Mapping whose ``subTitleKeys`` entries map language codes to lists
        of subtitle entries. Missing, ``None`` or empty sections are
        skipped.
    :param str lang:
        Preferred language code.
    :param str subType:
        Subtitle type forwarded to :func:`getUrlFromSubtitleItem`.
    :param subTitleKeys:
        Section names of ``info_dict`` to search, in priority order.
    :returns:
        The first URL found, or ``None``.
    """
    for section in subTitleKeys:
        # ``or {}`` guards against a section that is absent or None.
        tracks = info_dict.get(section) or {}
        if not tracks:
            continue

        if lang in tracks:
            code = lang
        elif "en" in tracks:
            code = "en"
        else:
            code = next(iter(tracks))
        if code is None:
            continue

        url = getUrlFromSubtitleItem(tracks, code, subType)
        if url:
            if debug:
                print("getSubtitleOtherUrl lang:{} url:{}".format(code, url))
            return url

    return None
|
|
|
|
|
async def fetchSubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Fetch subtitles for ``url``.

    Thin entry point: all four arguments are handed on unchanged to
    :func:`fetchSubtitlebyType` and its result is returned as-is.
    """
    result = await fetchSubtitlebyType(url, lang, subType, proxy)
    return result
|
|
|
|
|
async def fetchSubtitlebyType(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Extract video info with yt-dlp and download one subtitle track.

    :param str url:
        Page/video URL handed to ``YoutubeDL.extract_info``.
    :param lang:
        Preferred subtitle language; ``None`` falls back to ``"en"``.
    :param subType:
        Wanted subtitle type; ``None`` falls back to ``"vtt"``. For the
        ``"youtube"`` extractor, ``"srt"`` is produced by fetching the XML
        track and converting it with :func:`xml_caption_to_srt`.
    :param proxy:
        Optional proxy URL; when given, the socket timeout is raised from
        10 to 20.
    :returns:
        On success ``{"title", "duration", "subtitle", "chapters"}``.
        When no subtitle could be obtained
        ``{"title", "duration", "error": "No subtitles"}``.
        On any exception ``{"error": str(exception)}``.

    NOTE(review): ``extract_info`` and ``urlopen`` are called without
    ``await``, so they presumably block the event loop while they run.
    Confirm, and consider moving them to a worker thread.
    """
    # Both parameters are annotated Optional; normalise None up front so the
    # string operations below cannot raise outside the try block.
    lang = lang or "en"
    subType = subType or "vtt"

    # A bare code such as "en" is widened to the pattern "en.*". Values that
    # already contain "-" or end in ".*" are passed through untouched.
    reqLang = lang if "-" in lang or lang.endswith(".*") else lang + ".*"

    ydl_opts = {
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "subtitleslangs": [reqLang],
        "skip_download": True,
        "socket_timeout": 10,
        "extractor_retries": 0,
        "extractor_args": {
            "youtube": {
                "player_skip": [
                    "configs",
                    "initial",
                ],
                "player_client": ["ios"],
                "skip": ["hls", "dash"],
            }
        },
    }

    if proxy:
        ydl_opts.update({"proxy": proxy, "socket_timeout": 20})

    title = "unknow"
    duration = ""
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)

            title = info_dict.get("title", "unknow")
            seconds = info_dict.get("duration")
            duration = str(seconds) if seconds else ""

            # For YouTube, "srt" is not looked up directly: the XML track is
            # requested instead and converted after download.
            isSrt = info_dict.get("extractor") == "youtube" and subType == "srt"
            if isSrt:
                subType = "xml"

            if debug:
                # ``or {}``: either section may be absent or None.
                print(
                    "subtitles.keys(): {} automatic_captions: {}".format(
                        (info_dict.get("subtitles") or {}).keys(),
                        (info_dict.get("automatic_captions") or {}).keys(),
                    )
                )

            # Lookup order: explicitly requested tracks, then the wanted
            # language in any section, then whatever language is available.
            subtitle_url = (
                getRequestedSubtitlesUrl(info_dict, lang, subType)
                or getSubtitleLangUrl(info_dict, lang, subType)
                or getSubtitleOtherUrl(info_dict, lang, subType)
            )

            if subtitle_url:
                with ydl.urlopen(subtitle_url) as response:
                    raw = response.read().decode()
                subtitle = xml_caption_to_srt(raw) if isSrt else raw

                # xml_caption_to_srt returns None when the track holds no
                # captions; treat that as "No subtitles" rather than failing
                # on len(None) below.
                if subtitle is not None:
                    print(
                        "url:{}, title:{}, duration:{} len(subtitle): {}".format(
                            url, title, duration, len(subtitle)
                        )
                    )
                    return {
                        "title": title,
                        "duration": duration,
                        "subtitle": subtitle,
                        "chapters": info_dict.get("chapters", None),
                    }
    except Exception as e:
        # Boundary handler: report the failure to the caller as data.
        print(e)
        traceback.print_exc()
        return {"error": str(e)}

    return {"title": title, "duration": duration, "error": "No subtitles"}
|
|
|
|
|
def float_to_srt_time_format(d: float) -> str:
    """Convert a duration in seconds into an SRT timestamp.

    :param float d:
        Duration in seconds. Negative values are clamped to zero.
    :rtype: str
    :returns:
        SubRip Subtitle (str) formatted time duration ``HH:MM:SS,mmm``.

    float_to_srt_time_format(3.89) -> '00:00:03,890'
    """
    # Work in whole milliseconds so that rounding carries into the seconds
    # (3.9996 -> 00:00:04,000). Formatting the fraction separately would
    # leave a stray "1.000" after the comma.
    total_ms = max(0, round(d * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    # Hours are not wrapped at 24, so long durations stay monotonic.
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}"
|
|
|
|
|
def xml_caption_to_srt(xml_captions: str) -> Optional[str]:
    """Convert xml caption tracks to "SubRip Subtitle (srt)".

    Every direct child of the XML root is treated as one caption: its text
    is the caption body, its ``start`` attribute the start time in seconds
    and its optional ``dur`` attribute the duration (0 when absent).
    Children without text are skipped.

    :param str xml_captions:
        XML formatted caption tracks.
    :returns:
        The SRT document, or ``None`` when no caption produced any text.
    :raises KeyError:
        If a non-empty caption element has no ``start`` attribute.
    """
    segments = []
    root = ElementTree.fromstring(xml_captions)
    for child in root:
        text = (child.text or "").replace("\n", " ")
        # Collapse runs of spaces (e.g. those left behind by newlines).
        # The previous single-space-for-single-space replace was a no-op.
        while "  " in text:
            text = text.replace("  ", " ")
        caption = unescape(text)
        if not caption:
            continue

        start = float(child.attrib["start"])
        end = start + float(child.attrib.get("dur", 0.0))

        segments.append(
            "{seq}\n{start} --> {end}\n{text}\n".format(
                # Number by emitted captions, not by XML position, so that
                # skipped empty elements do not leave gaps in the sequence.
                seq=len(segments) + 1,
                start=float_to_srt_time_format(start),
                end=float_to_srt_time_format(end),
                text=caption,
            )
        )

    return "\n".join(segments).strip() if segments else None
|
|
|
|
|
async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> dict:
    """Extract video info with yt-dlp and return its subtitle listings.

    :param str url:
        Page/video URL handed to ``YoutubeDL.extract_info``.
    :param proxy:
        Optional proxy URL; when given, the socket timeout is raised from
        10 to 20.
    :returns:
        On success ``{"title", "duration", "subtitles",
        "automatic_captions"}``, where the last two are taken verbatim from
        the extracted info. On any exception ``{"error": str(exception)}``.

    NOTE(review): ``extract_info`` is called without ``await``, so it
    presumably blocks the event loop while it runs. Confirm.
    """
    ydl_opts = {
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "skip_download": True,
        "socket_timeout": 10,
        "extractor_retries": 0,
        "extractor_args": {
            "youtube": {
                "player_skip": ["configs", "initial"],
                "player_client": ["ios"],
                "skip": ["hls", "dash"],
            }
        },
    }
    if proxy:
        ydl_opts.update({"proxy": proxy, "socket_timeout": 20})

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)
            seconds = info_dict.get("duration")

            return {
                "title": info_dict.get("title", "unknow"),
                "duration": str(seconds) if seconds else "",
                "subtitles": info_dict.get("subtitles"),
                "automatic_captions": info_dict.get("automatic_captions"),
            }
    except Exception as e:
        # Boundary handler: log like fetchSubtitlebyType does instead of
        # swallowing the traceback, then report the failure as data.
        print(e)
        traceback.print_exc()
        return {"error": str(e)}
|
|