import textwrap
import unicodedata
import re
import time
import zlib
from typing import Iterator, TextIO, Union
from tqdm import tqdm
import pyperclip
import urllib.request
import openai
import tiktoken
def exact_div(x, y):
assert x % y == 0
return x // y
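# e.g. exact_div(16, 4) == 4, while exact_div(16, 3) raises an AssertionError.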
def str2bool(string):
str2val = {"True": True, "False": False}
if string in str2val:
return str2val[string]
else:
raise ValueError(f"Expected one of {set(str2val.keys())}, got {string}")
def optional_int(string):
return None if string == "None" else int(string)
def optional_float(string):
return None if string == "None" else float(string)
def compression_ratio(text) -> float:
return len(text) / len(zlib.compress(text.encode("utf-8")))
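# Repetitive text compresses well under zlib, so a high ratio is a useful signal
# that the model got stuck repeating itself: compression_ratio("blah " * 100) is
# far above that of normal prose.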
def format_timestamp(seconds: float, always_include_hours: bool = False, fractionalSeparator: str = '.'):
    assert seconds >= 0, "non-negative timestamp expected"
    milliseconds = round(seconds * 1000.0)
    hours = milliseconds // 3_600_000
    milliseconds -= hours * 3_600_000
    minutes = milliseconds // 60_000
    milliseconds -= minutes * 60_000
    seconds = milliseconds // 1_000
    milliseconds -= seconds * 1_000
    hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else ""
    return f"{hours_marker}{minutes:02d}:{seconds:02d}{fractionalSeparator}{milliseconds:03d}"
def write_txt(transcript: Iterator[dict], file: TextIO):
for segment in transcript:
print(segment['text'].strip(), file=file, flush=True)
def write_vtt(transcript: Iterator[dict], file: TextIO,
maxLineWidth=None, highlight_words: bool = False):
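    """
    Write a transcript to a file in WebVTT format (see write_srt below for example usage).
    """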
iterator = __subtitle_preprocessor_iterator(transcript, maxLineWidth, highlight_words)
print("WEBVTT\n", file=file)
for segment in iterator:
text = segment['text'].replace('-->', '->')
print(
f"{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}\n"
f"{text}\n",
file=file,
flush=True,
)
def write_srt(transcript: Iterator[dict], file: TextIO,
maxLineWidth=None, highlight_words: bool = False):
"""
Write a transcript to a file in SRT format.
Example usage:
from pathlib import Path
from whisper.utils import write_srt
result = transcribe(model, audio_path, temperature=temperature, **args)
# save SRT
audio_basename = Path(audio_path).stem
with open(Path(output_dir) / (audio_basename + ".srt"), "w", encoding="utf-8") as srt:
write_srt(result["segments"], file=srt)
"""
iterator = __subtitle_preprocessor_iterator(transcript, maxLineWidth, highlight_words)
for i, segment in enumerate(iterator, start=1):
text = segment['text'].replace('-->', '->')
# write srt lines
print(
f"{i}\n"
f"{format_timestamp(segment['start'], always_include_hours=True, fractionalSeperator=',')} --> "
f"{format_timestamp(segment['end'], always_include_hours=True, fractionalSeperator=',')}\n"
f"{text}\n",
file=file,
flush=True,
)
def __subtitle_preprocessor_iterator(transcript: Iterator[dict], maxLineWidth: int = None, highlight_words: bool = False):
for segment in transcript:
words = segment.get('words', [])
if len(words) == 0:
# Yield the segment as-is or processed
if maxLineWidth is None or maxLineWidth < 0:
yield segment
else:
yield {
'start': segment['start'],
'end': segment['end'],
'text': process_text(segment['text'].strip(), maxLineWidth)
}
# We are done
continue
subtitle_start = segment['start']
subtitle_end = segment['end']
text_words = [ this_word["word"] for this_word in words ]
subtitle_text = __join_words(text_words, maxLineWidth)
# Iterate over the words in the segment
if highlight_words:
last = subtitle_start
for i, this_word in enumerate(words):
start = this_word['start']
end = this_word['end']
if last != start:
# Display the text up to this point
yield {
'start': last,
'end': start,
'text': subtitle_text
}
# Display the text with the current word highlighted
yield {
'start': start,
'end': end,
'text': __join_words(
[
{
"word": re.sub(r"^(\s*)(.*)$", r"\1<u>\2</u>", word)
if j == i
else word,
                                # The HTML tags <u> and </u> are not displayed,
                                # so they should not be counted in the word length
"length": len(word)
} for j, word in enumerate(text_words)
], maxLineWidth)
}
last = end
if last != subtitle_end:
# Display the last part of the text
yield {
'start': last,
'end': subtitle_end,
'text': subtitle_text
}
# Just return the subtitle text
else:
yield {
'start': subtitle_start,
'end': subtitle_end,
'text': subtitle_text
}
def __join_words(words: Iterator[Union[str, dict]], maxLineWidth: int = None):
    if maxLineWidth is None or maxLineWidth < 0:
        # Entries may be plain strings or dicts with 'word'/'length' fields (see below)
        return " ".join(entry['word'] if isinstance(entry, dict) else entry for entry in words)
lines = []
current_line = ""
current_length = 0
for entry in words:
# Either accept a string or a dict with a 'word' and 'length' field
if isinstance(entry, dict):
word = entry['word']
word_length = entry['length']
else:
word = entry
word_length = len(word)
if current_length > 0 and current_length + word_length > maxLineWidth:
lines.append(current_line)
current_line = ""
current_length = 0
current_length += word_length
# The word will be prefixed with a space by Whisper, so we don't need to add one here
current_line += word
if len(current_line) > 0:
lines.append(current_line)
return "\n".join(lines)
def process_text(text: str, maxLineWidth=None):
if (maxLineWidth is None or maxLineWidth < 0):
return text
lines = textwrap.wrap(text, width=maxLineWidth, tabsize=4)
return '\n'.join(lines)
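# e.g. process_text("one two three four", maxLineWidth=9) -> "one two\nthree\nfour"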
def slugify(value, allow_unicode=False):
"""
Taken from https://github.com/django/django/blob/master/django/utils/text.py
Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
dashes to single dashes. Remove characters that aren't alphanumerics,
underscores, or hyphens. Convert to lowercase. Also strip leading and
trailing whitespace, dashes, and underscores.
"""
value = str(value)
if allow_unicode:
value = unicodedata.normalize('NFKC', value)
else:
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
value = re.sub(r'[^\w\s-]', '', value.lower())
return re.sub(r'[-\s]+', '-', value).strip('-_')
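# e.g. slugify(" Hello, World! ") -> "hello-world"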
def download_file(url: str, destination: str):
    with urllib.request.urlopen(url) as source, open(destination, "wb") as output:
        with tqdm(
total=int(source.info().get("Content-Length")),
ncols=80,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as loop:
while True:
buffer = source.read(8192)
if not buffer:
break
output.write(buffer)
loop.update(len(buffer))
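# Example usage (illustrative; any reachable URL works):
#   download_file("https://example.com/model.bin", "/tmp/model.bin")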
# -------------used for text post processing tab----------------
system_prompt = "You are a helpful assistant."
user_prompt = "请帮我把下面的文本纠正错别字并添加合适的标点符号,返回的消息只要处理后的文本:"
def get_chunks(s, maxlength, separator=None):
    # Split a long string into chunks of at most `maxlength` characters.
    # When a separator is given, cut at the last separator inside each window
    # and strip the separator from the yielded chunk.
    start = 0
    while start + maxlength < len(s):
        if separator is not None:
            end = s.rfind(separator, start, start + maxlength + 1)
            if end == -1:
                # No separator inside this window; yield the remainder unsplit
                break
            segment = s[start:end]
            yield segment.replace(separator, "")
            start = end + 1
        else:
            end = start + maxlength
            yield s[start:end]
            start = end
    yield s[start:]
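# e.g. list(get_chunks("abcdefgh", 3)) -> ["abc", "def", "gh"]; with a separator,
# chunks are cut at the last separator inside each window instead.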
def post_processing(text, use_chatgpt, user_token, apply_correction, auto_punc, separator, remove_words):
    # print(f"==>> separator: {separator}")
    original_separator1 = " "
    original_separator2 = ","
    if use_chatgpt:
        if user_token == "":
            # Ask the user (in Chinese) to set their OpenAI API Key and retry
            text = "请先设置你的OpenAI API Key,然后再重试"
            return text
        else:
            # TODO: long text should be split into chunks, processed, then re-joined
            text = chat_with_gpt(text, system_prompt, user_prompt)
            return text
    elif auto_punc:
        # Strip the existing separators before re-punctuating automatically
        text = text.replace(original_separator1, "")
        text = text.replace(original_separator2, "")
        # Imported lazily so paddlehub is only required when auto punctuation is used
        import paddlehub as hub
        model = hub.Module(name='auto_punc', version='1.0.0')
        t3 = time.time()
        # Split the long text into chunks shorter than max_length before inference
        max_length = 256
        chunks = list(get_chunks(text, max_length))
        results = model.add_puncs(chunks, max_length=max_length)
        text = ",".join(results)  # chunk boundaries are hard-coded to a Chinese comma
        t4 = time.time()
        print("Auto punc finished. Cost time: {:.2f}s".format(t4 - t3))
        # print(f"==>> text after auto punc: {text}")
    else:
        # Normalize all original separators to the one chosen in the UI
        if separator == "\\n":
            # Using `separator` directly would not produce a real line break
            text = text.replace(original_separator1, "\n")
            text = text.replace(original_separator2, "\n")
        else:
            text = text.replace(original_separator1, separator)
            text = text.replace(original_separator2, separator)
    if apply_correction:
        # Imported lazily so pycorrector is only required when correction is enabled
        import pycorrector
        print("Start correcting...")
        t1 = time.time()
        text, detail = pycorrector.correct(text)
        t2 = time.time()
        print("Correcting finished. Cost time: {:.2f}s".format(t2 - t1))
        print(f"==>> detail: {detail}")
    # Remove filler words, which may be separated by Chinese commas, ASCII commas, or spaces
    t5 = time.time()
    remove_words = [word for word in re.split(r"[,,\s]+", remove_words) if word]
    for word in remove_words:
        text = text.replace(word, "")
    t6 = time.time()
    print("Remove words finished. Cost time: {:.2f}s".format(t6 - t5))
    return text
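# Minimal usage sketch (argument values are illustrative, mirroring the UI defaults):
#   cleaned = post_processing(raw_text, use_chatgpt=False, user_token="",
#                             apply_correction=False, auto_punc=False,
#                             separator=",", remove_words="嗯,啊")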
def replace(text, src_word, target_word):
text = text.replace(src_word, target_word)
return text
def copy_text(text):
pyperclip.copy(text)
def num_tokens_from_messages(message, model="gpt-3.5-turbo-0613"):
    """Return the number of tokens used by the prompt built from `message`."""
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
print("Warning: model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
if model in {
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k-0613",
"gpt-4-0314",
"gpt-4-32k-0314",
"gpt-4-0613",
"gpt-4-32k-0613",
}:
tokens_per_message = 3
tokens_per_name = 1
elif model == "gpt-3.5-turbo-0301":
tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
tokens_per_name = -1 # if there's a name, the role is omitted
elif "gpt-3.5-turbo" in model:
print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
return num_tokens_from_messages(message, model="gpt-3.5-turbo-0613")
elif "gpt-4" in model:
print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
return num_tokens_from_messages(message, model="gpt-4-0613")
else:
raise NotImplementedError(
f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
)
num_tokens = 0
num_tokens += tokens_per_message
    # Count tokens for the same prompt chat_with_gpt will send (user_prompt + input text)
    message = user_prompt + message
num_tokens += len(encoding.encode(message))
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return f"Tokens 计数: {num_tokens}"
def on_token_change(user_token):
openai.api_key = user_token
def chat_with_gpt(input_message, system_prompt, user_prompt, temperature=0, max_tokens=4096):
    # Build the chat messages: the system role plus the user prompt prepended to the input text.
    # NOTE: gpt-3.5-turbo shares its 4096-token context between prompt and completion,
    # so a large max_tokens can trigger a context-length error for long inputs.
    system_content = [{"role": "system", "content": system_prompt}]
    user_content = [{"role": "user", "content": user_prompt + input_message}]
try:
completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=system_content + user_content, temperature=temperature, max_tokens=max_tokens)
response_msg = completion.choices[0].message['content']
prompt_tokens = completion['usage']['prompt_tokens']
completion_tokens = completion['usage']['completion_tokens']
total_tokens = completion['usage']['total_tokens']
print(f"==>> prompt_tokens: {prompt_tokens}")
print(f"==>> completion_tokens: {completion_tokens}")
print(f"==>> total_tokens: {total_tokens}")
return response_msg
except Exception as e:
return f"Error: {e}"
# -------------used for text post processing tab----------------