import os

import gradio as gr
import numpy as np
import soundfile as sf
import torchaudio
from speechbrain.pretrained.interfaces import foreign_class

from app_utils import video_score, video_test
from authors import AUTHORS  # Importing necessary components for the Gradio app
from description import DESCRIPTION_DYNAMIC  # , DESCRIPTION_STATIC

# import scipy.io.wavfile as wav
from paraformer import AudioReader, CttPunctuator, FSMNVad, ParaformerOffline
from gradio_client import Client

client = Client("Liusuthu/TextDepression")
os.environ["no_proxy"] = "localhost,127.0.0.1,::1"

########################### Speech section ######################################
classifier = foreign_class(
    source="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP",  # ".\\emotion-recognition-wav2vec2-IEMOCAP"
    pymodule_file="custom_interface.py",
    classname="CustomEncoderWav2vec2Classifier",
    savedir="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
)
ASR_model = ParaformerOffline()
vad = FSMNVad()
punc = CttPunctuator()


def text_api(text: str):
    # Send the text to the remote TextDepression Space and return its raw response.
    result = client.predict(
        text,  # str in '输入文字' Textbox component
        api_name="/predict",
    )
    return result


def get_text_score(text):
    # Extract the label and probability from the raw response string by fixed
    # offsets around the "text" and "probability" keys.
    string = text_api(text)
    part1 = string.partition("text")
    want1 = part1[2]
    label = want1[4:6]
    part2 = string.partition("probability")
    want2 = part2[2]
    prob = float(want2[3:-4])
    return label, prob


def classify_continuous(audio):
    print(type(audio))
    print(audio)
    if audio is None:  # guard against an empty recording before any processing
        return "none", "none", "haha"
    sample_rate, signal = audio  # the speech input from Gradio: (sample_rate, ndarray)
    signal = signal.astype(np.float32)
    signal /= np.max(np.abs(signal))
    sf.write("data/a.wav", signal, sample_rate)
    signal, sample_rate = torchaudio.load("data/a.wav")
    signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(
        signal
    )
    torchaudio.save("data/out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)
    Audio = "data/out.wav"
    speech, sample_rate = AudioReader.read_wav_file(Audio)
    # VAD segmentation + offline ASR + punctuation restoration
    segments = vad.segments_offline(speech)
    text_results = ""
    for part in segments:
        _result = ASR_model.infer_offline(
            speech[part[0] * 16 : part[1] * 16],
            hot_words="任意热词 空格分开",  # placeholder hot words ("any hot words, space separated")
        )
        text_results += punc.punctuate(_result)[0]

    out_prob, score, index, text_lab = classifier.classify_batch(signal1)
    print(type(out_prob.squeeze(0).numpy()))
    print(out_prob.squeeze(0).numpy())
    print(type(text_lab[-1]))
    print(text_lab[-1])
    # "Audio" is the path of the resampled 16 kHz wav written above.
    return text_results, out_prob.squeeze(0).numpy(), text_lab[-1], Audio


def speech_score(audio):
    # Same preprocessing, ASR and emotion classification as classify_continuous,
    # but condensed into a numeric score plus a text-based result.
    print(type(audio))
    print(audio)
    if audio is None:  # guard against an empty recording before any processing
        return "none", "none"
    sample_rate, signal = audio  # the speech input from Gradio: (sample_rate, ndarray)
    signal = signal.astype(np.float32)
    signal /= np.max(np.abs(signal))
    sf.write("data/a.wav", signal, sample_rate)
    signal, sample_rate = torchaudio.load("data/a.wav")
    signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(
        signal
    )
    torchaudio.save("data/out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)
    Audio = "data/out.wav"
    speech, sample_rate = AudioReader.read_wav_file(Audio)
    segments = vad.segments_offline(speech)
    text_results = ""
    for part in segments:
        _result = ASR_model.infer_offline(
            speech[part[0] * 16 : part[1] * 16],
            hot_words="任意热词 空格分开",  # placeholder hot words ("any hot words, space separated")
        )
        text_results += punc.punctuate(_result)[0]

    out_prob, score, index, text_lab = classifier.classify_batch(signal1)
    print(type(out_prob.squeeze(0).numpy()))
    print(out_prob.squeeze(0).numpy())
    print(type(text_lab[-1]))
    print(text_lab[-1])
    # return text_results, out_prob.squeeze(0).numpy(), text_lab[-1], Audio
    prob = out_prob.squeeze(0).numpy()
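    # The two leading class probabilities are collapsed into a single scalar below;
    # the ordering of prob[0]/prob[1] is assumed to follow the label encoder of the
    # local IEMOCAP emotion classifier.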
    print(prob)
    score2 = 10 * prob[0] - 10 * prob[1]
    print("score2", score2)
    print(text_lab[-1])
    text_emo = str(get_text_score(text_results))
    print(text_emo)
    return score2, text_emo


######################################### Video section ###################################
def clear_dynamic_info():
    return (
        gr.Video(value=None),
        gr.Plot(value=None),
        gr.Textbox(""),
    )


################################## Build the individual sub-apps ####################
with gr.Blocks(css="app.css") as video:
    with gr.Row():
        with gr.Column(scale=2):
            input_video = gr.Video(
                sources=["webcam", "upload"], elem_classes="video1", format="mp4"
            )
            with gr.Row():
                clear_btn_dynamic = gr.Button(
                    value="Clear", interactive=True, scale=1
                )
                # submit_dynamic = gr.Button(
                #     value="Submit", interactive=True, scale=1, elem_classes="submit"
                # )
                submit_and_rank = gr.Button(
                    value="Score", interactive=True, scale=1, elem_classes="submit"
                )
        with gr.Column(scale=2, elem_classes="dl4"):
            with gr.Row():
                output_score = gr.Textbox(label="scores")
                output_statistics = gr.Plot(
                    label="Statistics of emotions", elem_classes="stat"
                )
            output_audio = gr.Audio(interactive=False)
            audio_test_button = gr.Button("分析语音")  # "Analyze speech"
            out1 = gr.Textbox(label="语音分析结果")
            out2 = gr.Textbox(label="音频情感识别1")
            out3 = gr.Textbox(label="音频情感识别2")
            text_test_button = gr.Button("分析文本")  # "Analyze text"
            text_result = gr.Textbox(interactive=False)

    clear_btn_dynamic.click(
        fn=clear_dynamic_info,
        inputs=[],
        outputs=[
            input_video,
            output_statistics,
            output_score,
        ],
        queue=True,
    )
    submit_and_rank.click(
        fn=video_score,
        inputs=input_video,
        outputs=[
            output_statistics,
            output_score,
            output_audio,
        ],
    )
    audio_test_button.click(
        fn=classify_continuous,
        inputs=output_audio,
        outputs=[out1, out2, out3],
    )
    text_test_button.click(
        fn=text_api,
        inputs=out1,
        outputs=text_result,
    )

####################################
speech = gr.Interface(
    classify_continuous,
    gr.Audio(sources=["microphone"]),
    [
        gr.Text(label="语音识别结果"),
        gr.Text(label="音频情感识别1"),
        gr.Text(label="音频情感识别2"),
    ],
)


############################################################
def clear_video():
    return (
        gr.Video(value=None),
        gr.Textbox(""),
        gr.Plot(value=None),
        gr.Audio(value=None),
        gr.Textbox(""),
        gr.Textbox(""),
    )


with gr.Blocks() as video_all:
    with gr.Row():
        with gr.Column(scale=2):
            input_video = gr.Video(
                sources=["webcam", "upload"], elem_classes="video1", format="mp4"
            )
            with gr.Row():
                clear_1 = gr.Button(value="Clear", interactive=True, scale=1)
                submit_1 = gr.Button(
                    value="Score", interactive=True, scale=1, elem_classes="submit"
                )
        with gr.Column(scale=2):
            with gr.Row():
                score1 = gr.Textbox(label="score1")
                output_statistics = gr.Plot(
                    label="Statistics of emotions", elem_classes="stat", visible=False
                )
            output_audio = gr.Audio(interactive=False, visible=False)
            score2 = gr.Textbox(label="score2")
            score3 = gr.Textbox(label="score3")

    clear_1.click(
        fn=clear_video,
        inputs=[],
        outputs=[input_video, score1, output_statistics, output_audio, score2, score3],
        queue=True,
    )
    submit_1.click(
        fn=video_test,
        inputs=input_video,
        outputs=[
            output_statistics,
            score1,
            output_audio,
            score2,
            score3,
        ],
    )


###################################################################
def clear_2():
    return (
        gr.Audio(value=None),
        gr.Textbox(""),
        gr.Textbox(""),
    )


with gr.Blocks() as speech_all:
    with gr.Row():
        with gr.Column(scale=2):
            input_audio = gr.Audio(sources="microphone")
            with gr.Row():
                clear_audio = gr.Button(value="Clear", interactive=True, scale=1)
                submit_audio = gr.Button(
                    value="Score", interactive=True, scale=1, elem_classes="submit"
                )
        with gr.Column(scale=2):
            score2 = gr.Textbox(interactive=False, label="score2")
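            # text_emo displays the text-branch result (a "(label, probability)"
            # string) returned as the second value of speech_score.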
            text_emo = gr.Textbox(interactive=False, label="text_emo")

    clear_audio.click(
        fn=clear_2,
        outputs=[input_audio, score2, text_emo],
    )
    submit_audio.click(
        fn=speech_score,
        inputs=[input_audio],
        outputs=[score2, text_emo],
    )


###################################################################
def clear_3():
    return gr.Textbox(""), gr.Textbox("")


def text_score(text):
    result = str(get_text_score(text))
    return result


with gr.Blocks() as text_all:
    with gr.Row():
        with gr.Column(scale=2):
            input_text = gr.Textbox(label="input")
            with gr.Row():
                clear_text = gr.Button(value="Clear", interactive=True, scale=1)
                submit_text = gr.Button(
                    value="Score", interactive=True, scale=1, elem_classes="submit"
                )
        with gr.Column(scale=2):
            text_emo = gr.Textbox(label="text_emo")

    clear_text.click(clear_3, outputs=[input_text, text_emo])
    submit_text.click(text_score, inputs=input_text, outputs=text_emo)


with gr.Blocks() as app:
    with gr.Tab("语音"):  # Speech
        speech.render()
    with gr.Tab("视频"):  # Video
        video.render()
    with gr.Tab("视频集成打分"):  # Integrated video scoring
        video_all.render()
    with gr.Tab("语音集成打分"):  # Integrated speech scoring
        speech_all.render()
    with gr.Tab("文本打分"):  # Text scoring
        text_all.render()

app.launch()
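# launch() can also be given share=True or server_name/server_port when the demo
# needs to be reachable from outside localhost.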