import gradio as gr import json import requests import os import urllib.request import ssl import base64 import soundfile as sf from io import BytesIO import tempfile from datetime import datetime import logging # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class AzureSpeechTranslatorApp: def __init__(self): # Azure ML endpoint configuration self.url = os.getenv("AZURE_ENDPOINT") self.api_key = os.getenv("AZURE_API_KEY") # Define supported languages with their codes and native names self.languages = { "English": { "code": "en", "native": "English" }, "Chinese": { "code": "zh", "native": "中文" }, "German": { "code": "de", "native": "Deutsch" }, "French": { "code": "fr", "native": "Français" }, "Italian": { "code": "it", "native": "Italiano" }, "Japanese": { "code": "ja", "native": "日本語" }, "Spanish": { "code": "es", "native": "Español" }, "Portuguese": { "code": "pt", "native": "Português" } } # Initialize storage self.translations_dir = "translations" os.makedirs(self.translations_dir, exist_ok=True) self.translations = self.load_translations() def get_translation_file_path(self, lang_code): """Get path for language-specific translation file""" return os.path.join(self.translations_dir, f"translations_{lang_code}.json") def load_translations(self): """Load translations for all languages""" translations = {} for lang_info in self.languages.values(): file_path = self.get_translation_file_path(lang_info["code"]) if os.path.exists(file_path): with open(file_path, 'r', encoding='utf-8') as f: translations[lang_info["code"]] = json.load(f) else: translations[lang_info["code"]] = [] return translations def save_translation(self, lang_code, translation): """Save translation for specific language""" file_path = self.get_translation_file_path(lang_code) with open(file_path, 'w', encoding='utf-8') as f: json.dump(translation, f, ensure_ascii=False, indent=2) def call_azure_endpoint(self, payload): """Call Azure ML endpoint with the given payload.""" # Allow self-signed HTTPS certificates def allow_self_signed_https(allowed): if allowed and not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None): ssl._create_default_https_context = ssl._create_unverified_context allow_self_signed_https(True) # Set parameters parameters = {"temperature": 0.7} if "parameters" not in payload["input_data"]: payload["input_data"]["parameters"] = parameters # Encode the request body body = str.encode(json.dumps(payload)) if not self.api_key: raise Exception("A key should be provided to invoke the endpoint") # Set up headers headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + self.api_key)} # Create and send the request req = urllib.request.Request(self.url, body, headers) try: logger.info(f"Sending request to {self.url}") response = urllib.request.urlopen(req) result = response.read().decode('utf-8') logger.info("Received response successfully") return json.loads(result) except urllib.error.HTTPError as error: logger.error(f"Request failed with status code: {error.code}") logger.error(f"Headers: {error.info()}") error_message = error.read().decode("utf8", 'ignore') logger.error(f"Error message: {error_message}") return {"error": error_message} def encode_audio_base64(self, audio_path): """Encode audio file to base64 and determine MIME type""" file_extension = os.path.splitext(audio_path)[1].lower() # Map file extensions to MIME types if file_extension == '.flac': mime_type = "audio/flac" elif file_extension == '.wav': mime_type = "audio/wav" elif file_extension == '.mp3': mime_type = "audio/mpeg" elif file_extension in ['.m4a', '.aac']: mime_type = "audio/aac" elif file_extension == '.ogg': mime_type = "audio/ogg" else: mime_type = "audio/wav" # Default to WAV # Read and encode file content with open(audio_path, "rb") as file: encoded_string = base64.b64encode(file.read()).decode('utf-8') return encoded_string, mime_type def transcribe_audio(self, audio_input, source_lang="English"): """Transcribe audio to text using Azure endpoint""" try: # Encode audio to base64 base64_audio, mime_type = self.encode_audio_base64(audio_input) # Create input content for Azure ML content_items = [ { "type": "text", "text": f"Transcribe this {source_lang} audio to text." }, { "type": "audio_url", "audio_url": { "url": f"data:{mime_type};base64,{base64_audio}" } } ] # Create conversation state for Azure ML conversation_state = [ { "role": "user", "content": content_items } ] # Create the payload payload = { "input_data": { "input_string": conversation_state } } # Call Azure ML endpoint response = self.call_azure_endpoint(payload) # Extract text response try: if isinstance(response, dict): if "result" in response: result = response["result"] elif "output" in response: if isinstance(response["output"], list) and len(response["output"]) > 0: result = response["output"][0] else: result = str(response["output"]) elif "error" in response: result = f"Error: {response['error']}" else: result = f"Unexpected response format: {json.dumps(response)}" else: result = str(response) except Exception as e: result = f"Error processing response: {str(e)}" return result.strip() except Exception as e: logger.error(f"Error in transcription: {str(e)}") return f"Transcription failed: {str(e)}" def translate_text(self, text, source_lang, target_lang): """Translate text between languages using Azure endpoint""" if not text: return "No text to translate" try: # Create input content for Azure ML content_items = [ { "type": "text", "text": f"Translate the following {source_lang} text to {target_lang}. Provide only the translation without any additional text or explanation:\n\n{text}" } ] # Create conversation state for Azure ML conversation_state = [ { "role": "system", "content": [{"type": "text", "text": "You are a professional translator."}] }, { "role": "user", "content": content_items } ] # Create the payload payload = { "input_data": { "input_string": conversation_state } } # Call Azure ML endpoint response = self.call_azure_endpoint(payload) # Extract text response try: if isinstance(response, dict): if "result" in response: result = response["result"] elif "output" in response: if isinstance(response["output"], list) and len(response["output"]) > 0: result = response["output"][0] else: result = str(response["output"]) elif "error" in response: result = f"Error: {response['error']}" else: result = f"Unexpected response format: {json.dumps(response)}" else: result = str(response) except Exception as e: result = f"Error processing response: {str(e)}" return result.strip() except Exception as e: logger.error(f"Error in translation: {str(e)}") return f"Translation failed: {str(e)}" def process_translation(self, audio, source_lang, target_lang): """Process audio input and generate translation""" if not audio: return "Please provide an audio file to translate." # Transcribe audio to text source_text = self.transcribe_audio(audio, source_lang) # Translate to target language translation = self.translate_text(source_text, source_lang, target_lang) # Create translation entry translation_entry = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "source_language": source_lang, "target_language": target_lang, "source_text": source_text, "translated_text": translation } # Save translation source_code = self.languages[source_lang]["code"] target_code = self.languages[target_lang]["code"] if source_code not in self.translations: self.translations[source_code] = [] if target_code not in self.translations: self.translations[target_code] = [] self.translations[source_code].append(translation_entry) self.translations[target_code].append(translation_entry) self.save_translation(source_code, self.translations[source_code]) self.save_translation(target_code, self.translations[target_code]) return self.format_translation_display(translation_entry) def format_translation_display(self, entry): """Format translation for display""" output = f"""Timestamp: {entry['timestamp']}\n\n""" output += f"""Source Language ({entry['source_language']}):\n{entry['source_text']}\n\n""" output += f"""Target Language ({entry['target_language']}):\n{entry['translated_text']}\n""" return output def list_translations(self, lang_code): """List translations for specific language""" if lang_code not in self.translations or not self.translations[lang_code]: return "No translations found" return "\n\n---\n\n".join([ self.format_translation_display(entry) for entry in self.translations[lang_code] ]) def create_interface(self): """Create Gradio interface""" with gr.Blocks(theme=gr.themes.Soft()) as interface: gr.Markdown("# Phine Speech Translator with Phi-4-Multimodal") gr.Markdown("Record speech or upload audio file for translation between multiple languages using [Phi-4-Multimodal](https://aka.ms/phi-4-multimodal/azure)") with gr.Row(): source_lang = gr.Dropdown( choices=list(self.languages.keys()), value="English", label="Source Language" ) target_lang = gr.Dropdown( choices=list(self.languages.keys()), value="Chinese", label="Target Language" ) with gr.Row(): audio_input = gr.Audio( sources=["microphone", "upload"], type="filepath", label="Record or Upload Audio" ) with gr.Row(): translate_btn = gr.Button("Translate") with gr.Row(): output = gr.Textbox( label="Translation Results", lines=10 ) # History viewer with gr.Accordion("Translation History", open=False): lang_select = gr.Dropdown( choices=list(self.languages.keys()), value="English", label="Select Language" ) history_output = gr.Textbox( label="Translation History", lines=20 ) # Event handlers translate_btn.click( fn=self.process_translation, inputs=[audio_input, source_lang, target_lang], outputs=output ) lang_select.change( fn=lambda x: self.list_translations(self.languages[x]["code"]), inputs=[lang_select], outputs=history_output ) return interface def run_app(): # Create app instance app = AzureSpeechTranslatorApp() # Launch Gradio interface interface = app.create_interface() interface.launch( share=True, server_name="0.0.0.0" ) if __name__ == "__main__": run_app()