# Persian_ASR / autosub/__init__.py
# Uploaded by imansarraf ("Upload 10 files", commit 10bc845, verified), 15.4 kB.
"""
Defines autosub's main functionality.
"""
#!/usr/bin/env python
from __future__ import absolute_import, print_function, unicode_literals
import argparse
import audioop
import math
import multiprocessing
import os
from json import JSONDecodeError
import subprocess
import sys
import tempfile
import wave
import json
import requests
try:
from json.decoder import JSONDecodeError
except ImportError:
JSONDecodeError = ValueError
from googleapiclient.discovery import build
from progressbar import ProgressBar, Percentage, Bar, ETA
from autosub.constants import (
LANGUAGE_CODES, GOOGLE_SPEECH_API_KEY, GOOGLE_SPEECH_API_URL,
)
from autosub.formatters import FORMATTERS
from pathlib import PurePath
# Subtitle format used when the caller does not choose one (a key of FORMATTERS).
DEFAULT_SUBTITLE_FORMAT = 'srt'
# Default worker-pool size, i.e. the number of concurrent conversions/API requests.
DEFAULT_CONCURRENCY = 10
# Default spoken language of the input and default language of the subtitles.
# They are equal, so no translation step runs unless the caller changes one.
DEFAULT_SRC_LANGUAGE = 'en'
DEFAULT_DST_LANGUAGE = 'en'
def percentile(arr, percent):
    """
    Return the value found at fractional rank ``percent`` of the sorted ``arr``.

    ``percent`` is a fraction (0.2 selects the 20th percentile). When the rank
    falls between two elements the result is linearly interpolated between
    them. ``arr`` itself is not modified.
    """
    ordered = sorted(arr)
    rank = (len(ordered) - 1) * percent
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        # The rank lands exactly on an element; no interpolation needed.
        return ordered[int(rank)]
    # Weight each neighbour by how close the rank is to it.
    return ordered[int(lower)] * (upper - rank) + ordered[int(upper)] * (rank - lower)
class FLACConverter(object): # pylint: disable=too-few-public-methods
    """
    Class for converting a region of an input audio or video file into a FLAC audio file
    """
    def __init__(self, source_path, include_before=0.25, include_after=0.25):
        """
        Remember the input file and how much padding to add around each region.

        ``include_before`` / ``include_after`` are subtracted from the region
        start and added to the region end before the clip is cut.
        """
        self.source_path = source_path
        self.include_before = include_before
        self.include_after = include_after

    def __call__(self, region):
        """
        Cut ``region`` (a ``(start, end)`` pair) out of the source with ffmpeg.

        Returns the bytes of the resulting FLAC file, or None when interrupted
        by KeyboardInterrupt. Errors from ffmpeg propagate to the caller; the
        temporary file is removed in every case.
        """
        try:
            start, end = region
            start = max(0, start - self.include_before)
            end += self.include_after
            # delete=False is necessary for running on Windows: ffmpeg has to
            # open the file by name while this handle is still open.
            temp = tempfile.NamedTemporaryFile(suffix='.flac', delete=False)
            try:
                program_ffmpeg = which("ffmpeg")
                command = [str(program_ffmpeg), "-ss", str(start), "-t", str(end - start),
                           "-y", "-i", self.source_path,
                           "-loglevel", "error", temp.name]
                use_shell = os.name == "nt"
                # Give ffmpeg an empty stdin and make sure the handle is closed again.
                with open(os.devnull) as devnull:
                    subprocess.check_output(command, stdin=devnull, shell=use_shell)
                return temp.read()
            finally:
                # Runs on success and on failure, so no temp file is left behind.
                temp.close()
                os.unlink(temp.name)
        except KeyboardInterrupt:
            return None
class SpeechRecognizer(object): # pylint: disable=too-few-public-methods
    """
    Class for performing speech-to-text for an input FLAC file.
    """
    def __init__(self, language="en", rate=44100, retries=3, api_key=GOOGLE_SPEECH_API_KEY, proxies=None):
        """
        Store the request settings.

        ``language`` and ``api_key`` are substituted into GOOGLE_SPEECH_API_URL,
        ``rate`` is sent in the Content-Type header, ``retries`` is the number
        of request attempts and ``proxies`` is passed to ``requests.post``.
        """
        self.language = language
        self.rate = rate
        self.api_key = api_key
        self.retries = retries
        self.proxies = proxies

    def __call__(self, data):
        """
        Post the FLAC bytes ``data`` and return the first transcript found.

        The transcript is returned with its first character upper-cased.
        Returns None when no attempt yields a usable result or when
        interrupted by KeyboardInterrupt.
        """
        # Identical for every attempt, so build them once.
        url = GOOGLE_SPEECH_API_URL.format(lang=self.language, key=self.api_key)
        headers = {"Content-Type": "audio/x-flac; rate=%d" % self.rate}
        try:
            for _ in range(self.retries):
                try:
                    # proxies=None (or an empty mapping) means "no explicit proxies".
                    resp = requests.post(url, data=data, headers=headers,
                                         proxies=self.proxies)
                except requests.exceptions.ConnectionError:
                    continue
                # The body is read as one JSON document per line.
                for line in resp.content.decode('utf-8').split("\n"):
                    try:
                        payload = json.loads(line)
                        transcript = payload['result'][0]['alternative'][0]['transcript']
                    except (IndexError, KeyError, TypeError, JSONDecodeError):
                        # Blank line, empty result list, or a payload without
                        # the expected structure: try the next line.
                        continue
                    return transcript[:1].upper() + transcript[1:]
        except KeyboardInterrupt:
            return None
        return None
class Translator(object): # pylint: disable=too-few-public-methods
    """
    Callable that translates one sentence at a time through the 'translate' v2 service.
    """
    def __init__(self, language, api_key, src, dst):
        """
        Build the translation service client and remember the language pair.
        """
        self.language = language
        self.api_key = api_key
        self.service = build('translate', 'v2',
                             developerKey=self.api_key)
        self.src = src
        self.dst = dst

    def __call__(self, sentence):
        """
        Translate ``sentence`` from ``self.src`` to ``self.dst``.

        Returns the translated text, or None when the sentence is empty, the
        response carries no translation, or KeyboardInterrupt is raised.
        """
        try:
            if not sentence:
                return None
            request = self.service.translations().list( # pylint: disable=no-member
                source=self.src,
                target=self.dst,
                q=[sentence]
            )
            result = request.execute()
            # Walk the response step by step, bailing out at the first gap.
            if 'translations' not in result:
                return None
            translations = result['translations']
            if not translations:
                return None
            first = translations[0]
            if 'translatedText' not in first:
                return None
            return first['translatedText']
        except KeyboardInterrupt:
            return None
def which(program):
    """
    Locate an executable and return its path, or None when it cannot be found.

    A name containing a directory part is checked as given. A bare name is
    looked up first two directories above this file, then in every entry of
    the PATH environment variable.
    """
    def is_exe(file_path):
        """
        Tell whether file_path is an existing file with execute permission.
        """
        return os.path.isfile(file_path) and os.access(file_path, os.X_OK)

    # necessary to run on Windows
    if os.name == "nt":
        program += ".exe"

    directory = os.path.split(program)[0]
    if directory:
        # An explicit location was given: it either works or nothing does.
        return program if is_exe(program) else None

    # Prefer a copy shipped next to the package over anything on PATH.
    bundled = str(PurePath(__file__).parent.parent.joinpath(program))
    if is_exe(bundled):
        return bundled

    for entry in os.environ["PATH"].split(os.pathsep):
        candidate = os.path.join(entry.strip('"'), program)
        if is_exe(candidate):
            return candidate
    return None
def extract_audio(filename, channels=1, rate=16000):
    """
    Extract audio from an input file to a temporary WAV file.

    ffmpeg converts ``filename`` to a WAV file with ``channels`` channels at
    ``rate`` Hz. Returns ``(wav_path, rate)``; the caller is responsible for
    deleting ``wav_path``.

    Raises Exception when ``filename`` is not an existing file or ffmpeg
    cannot be located. Errors raised while running ffmpeg propagate. No
    temporary file is left behind on any of these failures.
    """
    # Validate first so a failed precondition does not leave a temp file behind.
    if not os.path.isfile(filename):
        print("The given file does not exist: {}".format(filename))
        raise Exception("Invalid filepath: {}".format(filename))
    program_ffmpeg = which("ffmpeg")
    if not program_ffmpeg:
        print("ffmpeg: Executable not found on machine.")
        raise Exception("Dependency not found: ffmpeg")

    # Only the unique name is needed; close our handle before ffmpeg writes.
    # delete=False keeps the file on disk after the handle is closed.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp:
        wav_path = temp.name

    command = [str(program_ffmpeg), "-y", "-i", filename,
               "-ac", str(channels), "-ar", str(rate),
               "-loglevel", "error", wav_path]
    use_shell = os.name == "nt"
    try:
        with open(os.devnull) as devnull:
            subprocess.check_output(command, stdin=devnull, shell=use_shell)
    except BaseException:
        # Conversion failed or was interrupted: discard the partial output
        # and let the original error continue to the caller.
        if os.path.exists(wav_path):
            os.remove(wav_path)
        raise
    return wav_path, rate
def find_speech_regions(filename, frame_width=4096, min_region_size=0.5, max_region_size=6): # pylint: disable=too-many-locals
    """
    Perform voice activity detection on a given audio file.

    The WAV file is read in chunks of ``frame_width`` frames. A chunk whose
    RMS energy is at or below the 20th percentile of all chunk energies counts
    as silence. Consecutive non-silent chunks form a region; a region is
    closed at the first silent chunk or once it has lasted ``max_region_size``
    seconds, and is kept only if it lasted at least ``min_region_size``
    seconds.

    Returns a list of ``(start, end)`` tuples in seconds; an empty list when
    the file contains no frames.
    """
    reader = wave.open(filename)
    try:
        sample_width = reader.getsampwidth()
        rate = reader.getframerate()
        n_channels = reader.getnchannels()
        chunk_duration = float(frame_width) / rate
        n_chunks = int(math.ceil(reader.getnframes()*1.0 / frame_width))
        # NOTE(review): the width handed to rms() is sample_width * n_channels,
        # i.e. one whole frame. That equals the sample width for the mono
        # files extract_audio produces by default; confirm before relying on
        # this with multi-channel input.
        energies = [
            audioop.rms(reader.readframes(frame_width), sample_width * n_channels)
            for _ in range(n_chunks)
        ]
    finally:
        reader.close()

    if not energies:
        # No audio at all: there is nothing to take a percentile of.
        return []

    threshold = percentile(energies, 0.2)
    elapsed_time = 0
    regions = []
    region_start = None
    for energy in energies:
        is_silence = energy <= threshold
        # Compare against None: a region that starts at time 0 is still a region.
        in_region = region_start is not None
        max_exceeded = in_region and elapsed_time - region_start >= max_region_size
        if in_region and (max_exceeded or is_silence):
            if elapsed_time - region_start >= min_region_size:
                regions.append((region_start, elapsed_time))
            region_start = None
        elif not in_region and not is_silence:
            region_start = elapsed_time
        elapsed_time += chunk_duration

    # Speech that runs up to the end of the file never meets a closing
    # silence, so close that last region here.
    if region_start is not None and elapsed_time - region_start >= min_region_size:
        regions.append((region_start, elapsed_time))
    return regions
def _imap_with_progress(pool, func, items, label):
    """
    Apply func to every item on pool, in order, while drawing a progress bar titled label.
    """
    widgets = [label, Percentage(), ' ', Bar(), ' ', ETA()]
    pbar = ProgressBar(widgets=widgets, maxval=len(items)).start()
    results = []
    try:
        for i, result in enumerate(pool.imap(func, items)):
            results.append(result)
            pbar.update(i)
    finally:
        # Also finish the bar when the loop is interrupted.
        pbar.finish()
    return results


def generate_subtitles( # pylint: disable=too-many-locals,too-many-arguments
        source_path,
        output=None,
        concurrency=DEFAULT_CONCURRENCY,
        src_language=DEFAULT_SRC_LANGUAGE,
        dst_language=DEFAULT_DST_LANGUAGE,
        subtitle_file_format=DEFAULT_SUBTITLE_FORMAT,
        api_key=None,
        proxies=None
    ):
    """
    Given an input audio/video file, generate subtitles in the specified language and format.

    Steps: extract the audio to a temporary WAV file, detect speech regions,
    convert each region to FLAC, transcribe each region, optionally translate
    the transcripts, then format and write the subtitle file.

    ``api_key`` is used only for translation; speech recognition always uses
    GOOGLE_SPEECH_API_KEY. ``concurrency`` is the worker-pool size and
    ``proxies`` is forwarded to the speech recognizer.

    Returns the path of the written subtitle file (``output``, or the source
    path with its extension replaced by the format name). Returns 1, after
    printing an error, when a translation is required but no ``api_key`` was
    given. KeyboardInterrupt during processing is re-raised after the pool is
    terminated. The worker pool and the temporary WAV file are released on
    every exit path.
    """
    if os.name != "nt" and "Darwin" in os.uname():
        #the default unix fork method does not work on Mac OS
        #need to use forkserver
        if 'forkserver' != multiprocessing.get_start_method(allow_none=True):
            multiprocessing.set_start_method('forkserver')

    audio_filename, audio_rate = extract_audio(source_path)
    pool = None
    try:
        regions = find_speech_regions(audio_filename)
        pool = multiprocessing.Pool(concurrency)
        converter = FLACConverter(source_path=audio_filename)
        recognizer = SpeechRecognizer(language=src_language, rate=audio_rate,
                                      api_key=GOOGLE_SPEECH_API_KEY, proxies=proxies)
        transcripts = []
        if regions:
            try:
                extracted_regions = _imap_with_progress(
                    pool, converter, regions,
                    "Converting speech regions to FLAC files: ")
                transcripts = _imap_with_progress(
                    pool, recognizer, extracted_regions,
                    "Performing speech recognition: ")

                # Only the primary language subtag decides whether to translate.
                if src_language.split("-")[0] != dst_language.split("-")[0]:
                    if not api_key:
                        print(
                            "Error: Subtitle translation requires specified Google Translate API key. "
                            "See --help for further information."
                        )
                        return 1
                    translator = Translator(dst_language, api_key,
                                            dst=dst_language,
                                            src=src_language)
                    prompt = "Translating from {0} to {1}: ".format(src_language, dst_language)
                    transcripts = _imap_with_progress(pool, translator, transcripts, prompt)
            except KeyboardInterrupt:
                pool.terminate()
                pool.join()
                print("Cancelling transcription")
                raise

        # Drop regions for which no text was produced.
        timed_subtitles = [(r, t) for r, t in zip(regions, transcripts) if t]
        formatter = FORMATTERS.get(subtitle_file_format)
        formatted_subtitles = formatter(timed_subtitles)

        dest = output
        if not dest:
            base = os.path.splitext(source_path)[0]
            dest = "{base}.{format}".format(base=base, format=subtitle_file_format)

        with open(dest, 'wb') as output_file:
            output_file.write(formatted_subtitles.encode("utf-8"))
        return dest
    finally:
        if pool is not None:
            # close() is a no-op after terminate(); join() then reaps the workers.
            pool.close()
            pool.join()
        os.remove(audio_filename)
def validate(args):
    """
    Report whether the parsed CLI arguments can be used to run autosub.

    The checks run in a fixed order; the first one that fails prints its
    message and makes the function return False. Returns True when all pass.
    """
    # Each entry pairs a lazily evaluated check with the message printed when
    # it fails, so later attributes are only read if earlier checks passed.
    checks = (
        (lambda: args.format in FORMATTERS,
         "Subtitle format not supported. "
         "Run with --list-formats to see all supported formats."),
        (lambda: args.src_language in LANGUAGE_CODES.keys(),
         "Source language not supported. "
         "Run with --list-languages to see all supported languages."),
        (lambda: args.dst_language in LANGUAGE_CODES.keys(),
         "Destination language not supported. "
         "Run with --list-languages to see all supported languages."),
        (lambda: bool(args.source_path),
         "Error: You need to specify a source path."),
    )
    for passes, complaint in checks:
        if not passes():
            print(complaint)
            return False
    return True
def main():
    """
    Run autosub as a command-line program.

    Parses the command line, handles the two listing options, validates the
    arguments and generates the subtitle file.

    Returns the process exit status: 0 on success or after a listing option,
    1 when validation fails, subtitle generation reports an error, or the run
    is interrupted with KeyboardInterrupt.
    """
    parser = argparse.ArgumentParser()
    # source_path is optional so the listing options work without it;
    # validate() rejects a missing path for a normal run.
    parser.add_argument('source_path', help="Path to the video or audio file to subtitle",
                        nargs='?')
    parser.add_argument('-C', '--concurrency', help="Number of concurrent API requests to make",
                        type=int, default=DEFAULT_CONCURRENCY)
    parser.add_argument('-o', '--output',
                        help="Output path for subtitles (by default, subtitles are saved in "
                             "the same directory and name as the source path)")
    parser.add_argument('-F', '--format', help="Destination subtitle format",
                        default=DEFAULT_SUBTITLE_FORMAT)
    parser.add_argument('-S', '--src-language', help="Language spoken in source file",
                        default=DEFAULT_SRC_LANGUAGE)
    parser.add_argument('-D', '--dst-language', help="Desired language for the subtitles",
                        default=DEFAULT_DST_LANGUAGE)
    parser.add_argument('-K', '--api-key',
                        help="The Google Translate API key to be used. "
                             "(Required for subtitle translation)")
    parser.add_argument('--list-formats', help="List all available subtitle formats",
                        action='store_true')
    parser.add_argument('--list-languages', help="List all available source/destination languages",
                        action='store_true')

    args = parser.parse_args()

    if args.list_formats:
        print("List of formats:")
        for subtitle_format in FORMATTERS:
            print("{format}".format(format=subtitle_format))
        return 0

    if args.list_languages:
        print("List of all languages:")
        for code, language in sorted(LANGUAGE_CODES.items()):
            print("{code}\t{language}".format(code=code, language=language))
        return 0

    if not validate(args):
        return 1

    try:
        subtitle_file_path = generate_subtitles(
            source_path=args.source_path,
            concurrency=args.concurrency,
            src_language=args.src_language,
            dst_language=args.dst_language,
            api_key=args.api_key,
            subtitle_file_format=args.format,
            output=args.output,
        )
    except KeyboardInterrupt:
        return 1

    if subtitle_file_path == 1:
        # generate_subtitles returns 1 (after printing the reason) when a
        # translation was requested without an API key; no file was written.
        return 1

    print("Subtitles file created at {}".format(subtitle_file_path))
    return 0
if __name__ == '__main__':
    # Hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    sys.exit(exit_status)