Spaces:
Runtime error
Runtime error
import gradio as gr | |
from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoTokenizer, AutoProcessor | |
import torch | |
import logging | |
import os | |
import time | |
import shutil # เพิ่มการ import shutil | |
from pydub import AudioSegment, silence | |
from concurrent.futures import ProcessPoolExecutor | |
from io import StringIO | |
# ตั้งค่า logging | |
log_stream = StringIO() | |
logging.basicConfig(level=logging.DEBUG, stream=log_stream) | |
logger = logging.getLogger(__name__) | |
# กำหนด path สำหรับ model, audio, segment, และ text | |
MODEL_DIR = "/content/model" | |
AUDIO_DIR = "/content/audio" | |
TEXT_DIR = "/content/text" | |
# สร้าง directories ถ้ายังไม่มี | |
os.makedirs(MODEL_DIR, exist_ok=True) | |
os.makedirs(AUDIO_DIR, exist_ok=True) | |
os.makedirs(TEXT_DIR, exist_ok=True) | |
# กำหนดชื่อโมเดลและภาษา | |
MODEL_NAME = "FILM6912/Whisper-small-thai" | |
lang = "th" | |
# ตรวจสอบว่าใช้ GPU ได้หรือไม่ ถ้าไม่ได้ใช้ CPU | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
logger.info(f"Using device: {'GPU' if device.type == 'cuda' else 'CPU'}") | |
# โหลดโมเดลและ Tokenizer ล่วงหน้าเพื่อเก็บใน cache directory | |
try: | |
logger.info("Loading model...") | |
model = AutoModelForSpeechSeq2Seq.from_pretrained(MODEL_NAME, cache_dir=MODEL_DIR).to(device) | |
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, cache_dir=MODEL_DIR) | |
processor = AutoProcessor.from_pretrained(MODEL_NAME, cache_dir=MODEL_DIR) | |
# สร้าง pipeline สำหรับการแปลงเสียงเป็นข้อความ | |
pipe = pipeline( | |
task="automatic-speech-recognition", | |
model=model, | |
tokenizer=tokenizer, | |
feature_extractor=processor.feature_extractor, | |
chunk_length_s=30, | |
device=0 if device.type == 'cuda' else -1, # บังคับให้ใช้ GPU (device=0) ถ้ามี | |
) | |
logger.info("Model loaded successfully.") | |
except Exception as e: | |
logger.error(f"Error loading model: {e}") | |
raise | |
# ฟังก์ชันสำหรับการถอดเสียงแบบขนาน | |
def transcribe_segment(segment: AudioSegment): | |
try: | |
# ส่งออก segment เป็นไฟล์ WAV ในหน่วยความจำ | |
segment_wav = segment.export(format="wav") | |
# ถอดเสียงจากไฟล์ segment | |
transcription = pipe(segment_wav)["text"] | |
return transcription | |
except Exception as e: | |
logger.error(f"Error during segment transcription: {e}") | |
return "" | |
# ฟังก์ชันสำหรับการแปลงเสียงเป็นข้อความ | |
def transcribe_with_parallel_processing(microphone_audio, upload_audio): | |
audio_path = microphone_audio or upload_audio | |
# ตรวจสอบเส้นทางไฟล์เพื่อให้แน่ใจว่าไฟล์มีอยู่จริง | |
if not audio_path or not os.path.exists(audio_path): | |
logger.error("No audio input received or file does not exist.") | |
return "No audio input received or file does not exist.", None, None | |
logger.debug(f"Processing audio file: {audio_path}") | |
# 8387rcPNz8SRX6pYXgdxCZg3VMLFwtdJB3Z9LeX8Ge2n | |
sanitized_filename = os.path.basename(audio_path).replace(" ", "_") | |
audio_filename = os.path.join(AUDIO_DIR, sanitized_filename) | |
shutil.copyfile(audio_path, audio_filename) | |
try: | |
# โหลดไฟล์เสียงด้วย pydub | |
audio = AudioSegment.from_wav(audio_filename) | |
# แบ่งไฟล์เสียงเมื่อเสียงเงียบ | |
chunks = silence.split_on_silence(audio, min_silence_len=1000, silence_thresh=-30, keep_silence=500) | |
logger.info(f"Audio split into {len(chunks)} segments.") | |
if not chunks: | |
logger.error("No segments created. Ensure the audio file is correct.") | |
return "No segments created. Ensure the audio file is correct.", None, log_stream.getvalue() | |
# ถอดเสียงแบบขนาน | |
with ProcessPoolExecutor() as executor: | |
transcriptions = list(executor.map(transcribe_segment, chunks)) | |
# รวมผลลัพธ์จากแต่ละส่วนเข้าด้วยกัน | |
full_transcription = " ".join(transcriptions) | |
logger.info("Transcription completed successfully.") | |
# บันทึกผลลัพธ์เป็นไฟล์ .txt โดยตั้งชื่อไฟล์ตาม Unix Time | |
output_filename = os.path.join(TEXT_DIR, f"transcription_{int(time.time())}.txt") | |
with open(output_filename, "w", encoding="utf-8") as file: | |
file.write(full_transcription) | |
logger.info(f"Transcription exported to {output_filename}.") | |
# ลบไฟล์เสียงต้นฉบับหลังการใช้งานเสร็จสิ้น | |
os.remove(audio_filename) | |
return full_transcription, output_filename, log_stream.getvalue() | |
except Exception as e: | |
logger.error(f"Error during transcription: {e}") | |
return "Error during transcription.", None, log_stream.getvalue() | |
# สร้างอินเทอร์เฟซด้วย Gradio พร้อมปุ่มยืนยัน | |
with gr.Blocks() as interface: | |
with gr.Row(): | |
audio_input = gr.Audio(type="filepath", label="Record or Upload your voice") # ใช้ filepath สำหรับการบันทึกเสียง | |
submit_btn = gr.Button("Start Transcription") | |
output_text = gr.Textbox(label="Transcription") | |
output_file = gr.File(label="Download Transcription File") | |
log_output = gr.Textbox(label="Logs", lines=10) # ช่องสำหรับแสดง logs | |
submit_btn.click( | |
fn=transcribe_with_parallel_processing, | |
inputs=[audio_input, audio_input], # ใช้ input เดียวกันสำหรับไฟล์อัปโหลดและการบันทึก | |
outputs=[output_text, output_file, log_output] # เพิ่ม log_output สำหรับแสดง logs | |
) | |
# รันแอปพลิเคชันและแชร์ลิงก์ | |
logger.info("Launching Gradio interface...") | |
interface.launch(share=True) | |
logger.info("Gradio interface launched successfully.") | |