ytdlp_subtitle_dev / fetchYoutubeSubtitle.py
lanbogao's picture
1. Add api subtitle-dl to stream download subtitle.
5c48e61
raw
history blame
No virus
14.6 kB
import os
import json
import math
import re
import time
import traceback
from typing import Optional
import xml.etree.ElementTree as ElementTree
from html import unescape
import yt_dlp
# Debug switch read once at import time: any non-empty DEBUG environment
# value makes the `if debug:` guarded print() calls in this module fire.
debug = os.getenv("DEBUG")
# yt-dlp subtitle types: json3, srv1, srv2, srv3, ttml, vtt, xml (YouTube URL without the ext/fmt argument)
# "subtitles": {
# "live_chat": [
# {
# "url": "https://www.youtube.com/watch?v=ANtM2bHRz04&bpctr=9999999999&has_verified=1",
# "ext": "json",
# "video_id": "ANtM2bHRz04",
# "protocol": "youtube_live_chat_replay"
# }
# ]
# }
def getSubtitleOptions(
    lang: Optional[str] = None,
    proxy: Optional[str] = None,
):
    """Build the yt-dlp option dict used for subtitle-only extraction.

    :param lang:
        Optional subtitle language selector. When given, it is placed in
        ``subtitleslangs`` together with ``"-live_chat"``.
    :param proxy:
        Optional proxy URL. When given, it is set as ``proxy`` and the
        socket timeout is raised from 10 to 20.
    :returns:
        A new dict of yt-dlp options.
    """
    youtube_args = {
        # skip "webpage" will cause l2P5PgL1LfI missing some langs
        "player_skip": ["configs", "initial"],
        "player_client": ["ios"],
        # don't skip "translated_subs"
        "skip": ["hls", "dash"],
    }
    options = {
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "skip_download": True,
        "socket_timeout": 10,
        "extractor_retries": 0,
        "extractor_args": {"youtube": youtube_args},
    }
    if lang:
        # filter live chat out of requested_subtitles
        options["subtitleslangs"] = [lang, "-live_chat"]
    if proxy:
        options["proxy"] = proxy
        options["socket_timeout"] = 20
    return options
def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
    """Pick a download URL for ``lang`` out of a subtitle mapping.

    :param item:
        Mapping of language code -> list of subtitle entries (dicts that are
        read for their ``"url"`` and ``"ext"`` keys).
    :param lang:
        Language key to look up in ``item``; it must be present.
    :param subType:
        Wanted format. ``"xml"`` is special-cased: for any language other
        than ``"live_chat"`` the first entry that has a URL is returned with
        its ``&fmt=<ext>`` query parameter removed. Otherwise the first
        entry whose ``ext`` equals ``subType`` is used.
    :returns:
        The URL string, or ``None`` when no entry qualifies.
    """
    for subtitle in item[lang]:
        track_url = subtitle.get("url")
        track_ext = subtitle.get("ext")
        if lang != "live_chat" and subType == "xml":
            if not track_url:
                # An entry without a URL cannot be downloaded; try the next.
                continue
            if debug:
                print("subtitle source lang:{} url: {}".format(lang, track_url))
            # Only strip the fmt parameter when the entry names its ext.
            if track_ext:
                return track_url.replace("&fmt=" + track_ext, "")
            return track_url
        if track_ext == subType and track_url:
            if debug:
                print("subtitle lang:{} url: {}".format(lang, track_url))
            return track_url
    return None
def getRequestedSubtitlesUrl(info_dict, lang, subType, isLangKey=False):
    """Find a subtitle URL among ``info_dict["requested_subtitles"]``.

    An exact match on ``lang`` is tried first. Unless ``isLangKey`` is true,
    every language code that starts with ``lang`` is tried afterwards.

    :param info_dict:
        Extraction result; only ``"requested_subtitles"`` is read.
    :param lang:
        Language code or prefix. ``None`` never matches.
    :param subType:
        Wanted subtitle format, forwarded to ``getUrlFromSubtitleItem``.
    :param isLangKey:
        When true, ``lang`` is an exact key and prefix matching is skipped.
    :returns:
        The URL string, or ``None`` when nothing matches.
    """
    requested = info_dict.get("requested_subtitles")
    if not requested:
        return None

    def _url_for(code):
        entry = requested[code]
        # getUrlFromSubtitleItem iterates item[lang], so a bare dict entry is
        # wrapped in a list. The wrapper is a fresh local each time, so
        # `requested` stays intact for the following lookups.
        tracks = {code: [entry]} if isinstance(entry, dict) else requested
        return getUrlFromSubtitleItem(tracks, code, subType)

    if lang in requested:
        url = _url_for(lang)
        if url:
            if debug:
                print("getRequestedSubtitlesUrl lang:{}".format(lang))
            return url
    if not isLangKey and lang is not None:
        for code in requested:
            if code.startswith(lang):
                url = _url_for(code)
                if url:
                    if debug:
                        print(
                            "getRequestedSubtitlesUrl lang:{} url:{}".format(code, url)
                        )
                    return url
    return None
def getSubtitleLangUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=["subtitles", "automatic_captions"],
    isLangKey=False,
):
    """Find a subtitle URL for ``lang`` in the sections named by ``subTitleKeys``.

    Pass 1 looks for an exact ``lang`` key in each section, in order. Pass 2,
    skipped when ``isLangKey`` is true, accepts any language code that starts
    with ``lang``.

    :param info_dict:
        Extraction result holding the subtitle sections.
    :param lang:
        Language code or prefix. ``None`` never matches.
    :param subType:
        Wanted subtitle format, forwarded to ``getUrlFromSubtitleItem``.
    :param subTitleKeys:
        Section names to search, in priority order (never mutated here).
    :param isLangKey:
        When true, only the exact-key pass runs.
    :returns:
        The URL string, or ``None`` when nothing matches.
    """
    # A section that is absent or None is treated as empty instead of
    # crashing on `.keys()`.
    sections = [info_dict.get(key) or {} for key in subTitleKeys]

    for tracks in sections:
        if lang in tracks:
            url = getUrlFromSubtitleItem(tracks, lang, subType)
            if url:
                if debug:
                    print("getSubtitleLangUrl lang:{}".format(lang))
                return url

    if not isLangKey and lang is not None:
        for tracks in sections:
            for code in tracks:
                if code.startswith(lang):
                    url = getUrlFromSubtitleItem(tracks, code, subType)
                    if url:
                        if debug:
                            print(
                                "getSubtitleLangUrl lang:{} url:{}".format(code, url)
                            )
                        return url
    return None
def getSubtitleOtherUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=["subtitles", "automatic_captions"],
):
    """Fallback lookup: return a subtitle URL even if ``lang`` is unavailable.

    For each section named by ``subTitleKeys`` the language is chosen as
    ``lang`` if present, else ``"en"`` if present, else the section's first
    key.

    :param info_dict:
        Extraction result holding the subtitle sections.
    :param lang:
        Preferred language code.
    :param subType:
        Wanted subtitle format, forwarded to ``getUrlFromSubtitleItem``.
    :param subTitleKeys:
        Section names to search, in priority order (never mutated here).
    :returns:
        The URL string, or ``None`` when no section yields one.
    """
    for section_name in subTitleKeys:
        # A section that is absent or None is treated as empty instead of
        # crashing on `.keys()`.
        tracks = info_dict.get(section_name) or {}
        if not tracks:
            continue
        if lang in tracks:
            chosen = lang
        elif "en" in tracks:
            chosen = "en"
        else:
            chosen = next(iter(tracks))
        if chosen is None:
            continue
        url = getUrlFromSubtitleItem(tracks, chosen, subType)
        if url:
            if debug:
                print("getSubtitleOtherUrl lang:{} url:{}".format(chosen, url))
            return url
    return None
async def fetchSubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Fetch a subtitle for ``url``; thin wrapper around ``fetchAnySubtitle``.

    :param url: Video page URL.
    :param lang: Preferred subtitle language.
    :param subType: Wanted subtitle format.
    :param proxy: Optional proxy URL.
    :returns: Whatever ``fetchAnySubtitle`` returns.
    """
    # proxy must go by keyword: the fourth positional parameter of
    # fetchAnySubtitle is skipEmpty, not proxy.
    return await fetchAnySubtitle(url, lang, subType, proxy=proxy)
async def fetchAnySubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    skipEmpty: bool = True,
    proxy: Optional[str] = None,
) -> dict:
    """Extract video info with yt-dlp and download one subtitle track.

    The URL is looked up with three strategies in order: the requested
    subtitles, a language match, and finally any available language. The
    first download that yields a non-``None`` result wins.

    :param url: Video page URL.
    :param lang: Preferred subtitle language.
    :param subType:
        Wanted output format. For the ``youtube`` extractor, ``"srt"`` and
        ``"txt"`` are produced from the ``"xml"`` track.
    :param skipEmpty:
        Accepted for interface compatibility. It is currently not forwarded,
        so ``fetchSubtitlebydlUrl`` runs with its own default.
    :param proxy: Optional proxy URL.
    :returns:
        On success a dict with ``id``, ``url``, ``title``, ``thumbnail``,
        ``duration``, ``subtitle`` and ``chapters``. On an exception
        ``{"error": <message>}``. When no strategy yields a subtitle, a dict
        with ``title``, ``duration`` and ``"error": "No subtitles"``.
    """
    ydl_opts = getSubtitleOptions(lang, proxy)
    title = "unknow"
    duration = ""
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)
            title = info_dict.get("title", "unknow")
            seconds = info_dict.get("duration")
            duration = str(seconds) if seconds else ""
            thumbnail = info_dict.get("thumbnail")
            # thumbnail may be None; without the guard the `in` test raises
            # and the whole fetch is reported as an error.
            if thumbnail and ".webp" in thumbnail:
                thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                    info_dict.get("id")
                )

            reqType = subType
            if info_dict.get("extractor") == "youtube" and subType in ["srt", "txt"]:
                reqType = "xml"

            if debug:
                print(
                    "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                        (info_dict.get("subtitles") or {}).keys(),
                        (info_dict.get("automatic_captions") or {}).keys(),
                        (
                            info_dict.get("requested_subtitles").keys()
                            if info_dict.get("requested_subtitles")
                            else {}
                        ),
                    )
                )

            subtitle_funcs = (
                getRequestedSubtitlesUrl,
                getSubtitleLangUrl,
                getSubtitleOtherUrl,
            )
            for index, find_url in enumerate(subtitle_funcs):
                subtitle_url = find_url(info_dict, lang, reqType)
                if not subtitle_url:
                    continue
                subtitle = fetchSubtitlebydlUrl(ydl, subType, subtitle_url)
                print(
                    "function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format(
                        index, url, title, duration, len(subtitle or "")
                    )
                )
                if subtitle is not None:
                    return {
                        "id": info_dict.get("id"),
                        "url": url,
                        "title": title,
                        "thumbnail": thumbnail,
                        "duration": duration,
                        "subtitle": subtitle,
                        "chapters": info_dict.get("chapters", None),
                    }
    except Exception as e:
        # Top-level boundary: report the failure to the caller as data.
        print(e)
        traceback.print_exc()
        return {"error": str(e)}
    return {"title": title, "duration": duration, "error": "No subtitles"}
def float_to_srt_time_format(d: float) -> str:
    """Convert decimal durations into proper srt format.

    The value is rounded to whole milliseconds first, so a fraction that
    rounds up (e.g. 3.9996) carries into the seconds field instead of
    producing a malformed millisecond part. Hours are not wrapped at 24.
    Negative inputs are clamped to zero.

    :param d:
        Time offset in seconds.
    :rtype: str
    :returns:
        SubRip Subtitle (str) formatted time duration.

    float_to_srt_time_format(3.89) -> '00:00:03,890'
    """
    total_ms = max(0, int(round(d * 1000)))
    total_seconds, millis = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def is_spaces_only(variable):
    """Return True when every character of ``variable`` is whitespace.

    An empty input has no non-whitespace character and also yields True.
    """
    return all(ch.isspace() for ch in variable)
def xml_caption_to_srt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to "SubRip Subtitle (srt)".

    :param str xml_captions:
        XML formatted caption tracks. Each child of the root is read for its
        text and its ``start`` / ``dur`` attributes (``dur`` defaults to 0).
    :param bool skip_empty:
        When true, captions that are empty or whitespace-only are dropped,
        and ``None`` is returned if nothing remains. When false, every
        caption is kept.
    :returns:
        The srt text, or ``None`` when ``skip_empty`` is true and no caption
        had text.
    """
    segments = []
    root = ElementTree.fromstring(xml_captions)
    for child in root:
        text = child.text or ""
        # NOTE(review): as written the second replace is a no-op; it was
        # presumably meant to collapse double spaces. Confirm.
        caption = unescape(
            text.replace("\n", " ").replace(" ", " "),
        )
        # Parenthesised on purpose: blank captions are dropped only when
        # skip_empty is set.
        if skip_empty and (len(caption) == 0 or not caption.strip()):
            continue
        try:
            duration = float(child.attrib["dur"])
        except KeyError:
            duration = 0.0
        start = float(child.attrib["start"])
        end = start + duration
        # Number by emitted cues so skipped captions leave no gaps.
        sequence_number = len(segments) + 1
        line = "{seq}\n{start} --> {end}\n{text}\n".format(
            seq=sequence_number,
            start=float_to_srt_time_format(start),
            end=float_to_srt_time_format(end),
            text=caption,
        )
        segments.append(line)
    if skip_empty and not segments:
        # return None if no text in xml
        return None
    return "\n".join(segments).strip()
def xml_caption_to_txt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to plain text, one caption per paragraph.

    :param str xml_captions:
        XML formatted caption tracks. The text of each child of the root is
        used; timing attributes are ignored.
    :param bool skip_empty:
        When true, captions that are empty or whitespace-only are dropped,
        and ``None`` is returned if nothing remains.
    :returns:
        The captions separated by a blank line, or ``None`` when
        ``skip_empty`` is true and no caption had text.
    """
    segments = []
    root = ElementTree.fromstring(xml_captions)
    for child in root:
        text = child.text or ""
        # NOTE(review): as written the second replace is a no-op; it was
        # presumably meant to collapse double spaces. Confirm.
        caption = unescape(
            text.replace("\n", " ").replace(" ", " "),
        )
        if skip_empty and not caption.strip():
            continue
        segments.append("{text}\n".format(text=caption))
    if skip_empty and not segments:
        # return None if no text in xml (same contract as xml_caption_to_srt)
        return None
    return "\n".join(segments).strip()
async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> dict:
    """Extract video metadata plus the raw subtitle tables for ``url``.

    :param url: Video page URL.
    :param proxy: Optional proxy URL.
    :returns:
        On success a dict with ``id``, ``url``, ``title``, ``thumbnail``,
        ``duration``, ``subtitles`` and ``automatic_captions``. On an
        exception ``{"error": <message>}``.
    """
    # proxy must go by keyword: the first positional parameter of
    # getSubtitleOptions is the language.
    ydl_opts = getSubtitleOptions(proxy=proxy)
    title = "unknow"
    duration = ""
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)
            title = info_dict.get("title", "unknow")
            seconds = info_dict.get("duration")
            duration = str(seconds) if seconds else ""
            thumbnail = info_dict.get("thumbnail")
            # thumbnail may be None; guard the substring test.
            if thumbnail and ".webp" in thumbnail:
                thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                    info_dict.get("id")
                )
            return {
                "id": info_dict.get("id"),
                "url": url,
                "title": title,
                "thumbnail": thumbnail,
                "duration": duration,
                "subtitles": info_dict.get("subtitles"),
                "automatic_captions": info_dict.get("automatic_captions"),
            }
    except Exception as e:
        return {"error": str(e)}
def fetchSubtitlebydlUrl(ydl, subType, dlUrl, skipEmpty=True):
    """Download a subtitle from ``dlUrl`` and convert it to ``subType``.

    :param ydl:
        Object whose ``urlopen(url)`` returns a context manager with
        ``read()`` (here a ``yt_dlp.YoutubeDL`` instance).
    :param subType:
        ``"srt"`` and ``"txt"`` strip any ``&fmt=...`` parameter from the
        URL and convert the downloaded XML. Any other value returns the
        body as-is.
    :param dlUrl:
        Subtitle download URL. A missing or empty URL yields ``None``.
    :param skipEmpty:
        Forwarded to the XML converters as ``skip_empty``.
    :returns:
        The subtitle text, or ``None`` on a missing URL, a download or
        conversion failure, or an empty converted result.
    """
    if not dlUrl:
        # Nothing to download; avoid re.sub() raising on None.
        return None
    if subType in ("srt", "txt"):
        dlUrl = re.sub(r"&fmt=[\w]+", "", dlUrl)
    try:
        with ydl.urlopen(dlUrl) as resp:
            body = resp.read().decode()
            if subType == "srt":
                return xml_caption_to_srt(body, skipEmpty)
            if subType == "txt":
                return xml_caption_to_txt(body, skipEmpty)
            return body
    except Exception as e:
        # Best effort: a failed download or parse is reported as no subtitle.
        print(e)
    return None
def getSubtitleUrlByLang(info_dict, lang, subType, isLangKey):
    """Resolve a subtitle URL for ``lang``, without the any-language fallback.

    Tries ``getRequestedSubtitlesUrl`` and then ``getSubtitleLangUrl``,
    printing each candidate, and returns the first truthy URL.

    :returns: The URL string, or ``None`` when neither lookup finds one.
    """
    for lookup in (getRequestedSubtitlesUrl, getSubtitleLangUrl):
        candidate = lookup(info_dict, lang, subType, isLangKey=isLangKey)
        print("getSubtitleUrlByLang subtitle_url: {}".format(candidate))
        if candidate:
            return candidate
    return None
async def fetchSubtitleByInfo(
    url: str, subType: str, dlInfo, proxy: Optional[str] = None
):
    """Download a subtitle described by ``dlInfo``.

    If ``dlInfo`` carries a ``dlUrl``, that URL is tried first. Otherwise,
    or if that download fails, the video is re-extracted and the URL is
    resolved from ``langKey`` (exact key) and then ``lang`` (prefix match).

    :param url: Video page URL, used when re-extraction is needed.
    :param subType:
        Wanted output format; ``"srt"`` and ``"txt"`` are resolved from the
        ``"xml"`` track.
    :param dlInfo:
        Mapping read for the optional keys ``dlUrl``, ``langKey`` and
        ``lang``.
    :param proxy: Optional proxy URL.
    :returns:
        The subtitle text, ``None`` when no subtitle could be found or
        downloaded, or ``{"error": <message>}`` on an exception.
    """
    try:
        reqType = "xml" if subType in ["srt", "txt"] else subType
        lang = dlInfo.get("lang", None)
        ydl_opts = getSubtitleOptions(lang, proxy)
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # A present-but-empty dlUrl is treated like a missing one.
            if dlInfo.get("dlUrl"):
                subtitle = fetchSubtitlebydlUrl(
                    ydl, subType, dlInfo.get("dlUrl"), False
                )
                if subtitle is not None:
                    return subtitle

            info_dict = ydl.extract_info(url, download=False)
            if debug:
                print(
                    "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                        (info_dict.get("subtitles") or {}).keys(),
                        (info_dict.get("automatic_captions") or {}).keys(),
                        (
                            info_dict.get("requested_subtitles").keys()
                            if info_dict.get("requested_subtitles")
                            else {}
                        ),
                    )
                )

            subtitleUrl = None
            if "langKey" in dlInfo:
                subtitleUrl = getSubtitleUrlByLang(
                    info_dict, dlInfo.get("langKey"), reqType, True
                )
            # The prefix lookup needs an actual language string.
            if subtitleUrl is None and lang is not None:
                subtitleUrl = getSubtitleUrlByLang(info_dict, lang, reqType, False)
            print("subtitleUrl: {}".format(subtitleUrl))
            if not subtitleUrl:
                # Nothing resolved; do not attempt a download with no URL.
                return None
            return fetchSubtitlebydlUrl(ydl, subType, subtitleUrl, False)
    except Exception as e:
        # Top-level boundary: report the failure to the caller as data.
        print(e)
        traceback.print_exc()
        return {"error": str(e)}