import os import re import librosa import torch import numpy as np from pathlib import Path import locale # Установка локали для поддержки UTF-8 try: locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') except: pass # Игнорируем ошибку, если локаль недоступна from transformers import Wav2Vec2ForCTC, AutoProcessor from huggingface_hub import hf_hub_download from torchaudio.models.decoder import ctc_decoder from utils.text_norm import text_normalize # Используем фиксированную директорию для временных файлов TEMP_DIR = Path("D:/Ngen/bot/temp_lexicon") TEMP_DIR.mkdir(parents=True, exist_ok=True) uroman_dir = "uroman" assert os.path.exists(uroman_dir) UROMAN_PL = os.path.join(uroman_dir, "bin", "uroman.pl") ASR_SAMPLING_RATE = 16_000 WORD_SCORE_DEFAULT_IF_NOLM = -3.5 MODEL_ID = "mms-meta/mms-zeroshot-300m" processor = AutoProcessor.from_pretrained(MODEL_ID) model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID) token_file = hf_hub_download( repo_id=MODEL_ID, filename="tokens.txt", ) class MY_LOG: def __init__(self): self.text = "[START]" def add(self, new_log, new_line=True): self.text = self.text + ("\n" if new_line else " ") + new_log self.text = self.text.strip() return self.text def create_temp_file(suffix=None): """Create a temporary file with UTF-8 encoding""" temp_path = TEMP_DIR / f"temp_{os.urandom(8).hex()}{suffix if suffix else ''}" # Создаем файл с явным указанием UTF-8 кодировки with open(temp_path, 'w', encoding='utf-8') as f: pass return temp_path def error_check_file(filepath): if not isinstance(filepath, str): return "Expected file to be of type 'str'. Instead got {}".format( type(filepath) ) if not os.path.exists(filepath): return "Input file '{}' doesn't exists".format(type(filepath)) def norm_uroman(text): """Normalize text with unicode support""" text = text.lower() text = text.replace("'", "'") text = re.sub("([^a-z' ])", " ", text) text = re.sub(" +", " ", text) # # Модифицируем регулярное выражение для поддержки unicode # text = re.sub(r"[^\p{L}'\s]", " ", text, flags=re.UNICODE) # text = re.sub(r"\s+", " ", text) return text.strip() def uromanize(words): """Romanize words with proper unicode handling""" iso = "xxx" input_file = create_temp_file(suffix=".txt") output_file = create_temp_file(suffix=".txt") try: # Записываем входные данные с UTF-8 кодировкой with open(input_file, "w", encoding='utf-8') as f: f.write("\n".join(words)) cmd = f"perl {UROMAN_PL} -l {iso} < {input_file} > {output_file}" os.system(cmd) lexicon = {} # Читаем результат с UTF-8 кодировкой with open(output_file, encoding='utf-8') as f: for idx, line in enumerate(f): if not line.strip(): continue try: line = re.sub(r"\s+", "", norm_uroman(line)).strip() lexicon[words[idx]] = " ".join(line) + " |" except Exception as e: print(f"Warning: Could not process line {idx}: {str(e)}") continue finally: # Очистка временных файлов try: input_file.unlink(missing_ok=True) output_file.unlink(missing_ok=True) except Exception as e: print(f"Warning: Could not delete temporary files: {str(e)}") return lexicon def filter_lexicon(lexicon, word_counts): spelling_to_words = {} for w, s in lexicon.items(): spelling_to_words.setdefault(s, []) spelling_to_words[s].append(w) filtered_lexicon = {} for s, ws in spelling_to_words.items(): if len(ws) > 1: # use the word which has highest counts, fewest additional characters ws.sort(key=lambda w: (-word_counts[w], len(w))) filtered_lexicon[ws[0]] = s return filtered_lexicon def load_words(filepath): """Load words from file with proper encoding handling""" words = {} # Пробуем различные кодировки encodings = ['utf-8', 'cp1251', 'latin-1', 'utf-16'] for encoding in encodings: try: with open(filepath, encoding=encoding) as f: lines = f.readlines() break # Если успешно прочитали файл, выходим из цикла except UnicodeDecodeError: continue # Если ошибка кодировки, пробуем следующую except Exception as e: raise Exception(f"Error reading file: {str(e)}") else: # Если ни одна кодировка не сработала raise Exception("Could not decode file with any of the attempted encodings") num_sentences = len(lines) all_sentences = " ".join([l.strip() for l in lines]) norm_all_sentences = text_normalize(all_sentences) for w in norm_all_sentences.split(): words.setdefault(w, 0) words[w] += 1 return words, num_sentences def process( audio_data, words_file, lm_path=None, wscore=None, lmscore=None, wscore_usedefault=True, lmscore_usedefault=True, autolm=False, reference=None, ): transcription, logs = "", MY_LOG() if not audio_data or not words_file: yield "ERROR: Empty audio data or words file", logs.text return if isinstance(audio_data, tuple): # microphone sr, audio_samples = audio_data audio_samples = (audio_samples / 32768.0).astype(float) if sr != ASR_SAMPLING_RATE: audio_samples = librosa.resample( audio_samples, orig_sr=sr, target_sr=ASR_SAMPLING_RATE ) else: # file upload assert isinstance(audio_data, str) audio_samples = librosa.load(audio_data, sr=ASR_SAMPLING_RATE, mono=True)[0] yield transcription, logs.add(f"Number of audio samples: {len(audio_samples)}") inputs = processor( audio_samples, sampling_rate=ASR_SAMPLING_RATE, return_tensors="pt" ) # set device if torch.cuda.is_available(): device = torch.device("cuda") elif ( hasattr(torch.backends, "mps") and torch.backends.mps.is_available() and torch.backends.mps.is_built() ): device = torch.device("mps") else: device = torch.device("cpu") model.to(device) inputs = inputs.to(device) yield transcription, logs.add(f"Using device: {device}") with torch.no_grad(): outputs = model(**inputs).logits # Setup lexicon and decoder yield transcription, logs.add(f"Loading words....") try: word_counts, num_sentences = load_words(words_file) except Exception as e: yield f"ERROR: Loading words failed '{str(e)}'", logs.text return yield transcription, logs.add( f"Loaded {len(word_counts)} words from {num_sentences} lines.\nPreparing lexicon...." ) try: lexicon = uromanize(list(word_counts.keys())) except Exception as e: yield f"ERROR: Creating lexicon failed '{str(e)}'", logs.text return yield transcription, logs.add(f"Lexicon size: {len(lexicon)}") yield transcription, logs.add(f"Filtering lexicon....") lexicon = filter_lexicon(lexicon, word_counts) yield transcription, logs.add( f"Ok. Lexicon size after filtering: {len(lexicon)}" ) lexicon_file = create_temp_file(suffix=".txt") try: with open(lexicon_file, "w", encoding='utf-8') as f: idx = 10 for word, spelling in lexicon.items(): f.write(word + " " + spelling + "\n") idx += 1 if wscore_usedefault: wscore = WORD_SCORE_DEFAULT_IF_NOLM yield transcription, logs.add( f"Using word score: {wscore}" ) beam_search_decoder = ctc_decoder( lexicon=str(lexicon_file), tokens=token_file, nbest=1, beam_size=500, beam_size_token=50, word_score=wscore, sil_score=0, blank_token="", ) beam_search_result = beam_search_decoder(outputs.to("cpu")) transcription = " ".join(beam_search_result[0][0].words).strip() finally: # Очистка временных файлов try: lexicon_file.unlink(missing_ok=True) except Exception as e: print(f"Warning: Could not delete temporary file: {str(e)}") yield transcription, logs.add(f"[DONE]")