import torch import time import numpy as np import mediapipe as mp from PIL import Image import cv2 from pytorch_grad_cam.utils.image import show_cam_on_image import scipy.io.wavfile as wav # Importing necessary components for the Gradio app from model import pth_model_static, pth_model_dynamic, cam, pth_processing from face_utils import get_box, display_info from config import DICT_EMO, config_data from plot import statistics_plot from moviepy.editor import AudioFileClip import soundfile as sf import torchaudio from speechbrain.pretrained.interfaces import foreign_class from paraformer import AudioReader, CttPunctuator, FSMNVad, ParaformerOffline from gradio_client import Client ############################################################################################## client = Client("Liusuthu/TextDepression") mp_face_mesh = mp.solutions.face_mesh classifier = foreign_class( source="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP", # ".\\emotion-recognition-wav2vec2-IEMOCAP" pymodule_file="custom_interface.py", classname="CustomEncoderWav2vec2Classifier", savedir="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP", ) ASR_model = ParaformerOffline() vad = FSMNVad() punc = CttPunctuator() ######################################################################################### def text_api(text:str): result = client.predict( text, # str in '输入文字' Textbox component api_name="/predict", ) return result def get_text_score(text): string=text_api(text) part1 = str.partition(string, r"text") want1 = part1[2] label = want1[4:6] part2 = str.partition(string, r"probability") want2 = part2[2] prob = float(want2[3:-4]) return label, prob def classify_continuous(audio): print(type(audio)) print(audio) sample_rate, signal = (audio) # 这是语音的输入 signal = signal.astype(np.float32) signal /= np.max(np.abs(signal)) sf.write("data/a.wav", signal, sample_rate) signal, sample_rate = torchaudio.load("data/a.wav") signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)( signal ) torchaudio.save("data/out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16) Audio = "data/out.wav" speech, sample_rate = AudioReader.read_wav_file(Audio) if signal == "none": return "none", "none", "haha" else: segments = vad.segments_offline(speech) text_results = "" for part in segments: _result = ASR_model.infer_offline( speech[part[0] * 16 : part[1] * 16], hot_words="任意热词 空格分开" ) text_results += punc.punctuate(_result)[0] out_prob, score, index, text_lab = classifier.classify_batch(signal1) print(type(out_prob.squeeze(0).numpy())) print(out_prob.squeeze(0).numpy()) print(type(text_lab[-1])) print(text_lab[-1]) return text_results, out_prob.squeeze(0).numpy(), text_lab[-1], Audio def preprocess_image_and_predict(inp): return None, None, None def preprocess_video_and_predict(video): return None, None, None, None def text_api(text:str): result = client.predict( text, # str in '输入文字' Textbox component api_name="/predict", ) return result def get_text_score(text): string=text_api(text) part1 = str.partition(string, r"text") want1 = part1[2] label = want1[4:6] part2 = str.partition(string, r"probability") want2 = part2[2] prob = float(want2[3:-4]) return label, prob def classify_continuous(audio): print(type(audio)) print(audio) sample_rate, signal = audio # 这是语音的输入 signal = signal.astype(np.float32) signal /= np.max(np.abs(signal)) sf.write("data/a.wav", signal, sample_rate) signal, sample_rate = torchaudio.load("data/a.wav") signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)( signal ) torchaudio.save("data/out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16) Audio = "data/out.wav" speech, sample_rate = AudioReader.read_wav_file(Audio) if signal == "none": return "none", "none", "haha" else: segments = vad.segments_offline(speech) text_results = "" for part in segments: _result = ASR_model.infer_offline( speech[part[0] * 16 : part[1] * 16], hot_words="任意热词 空格分开" ) text_results += punc.punctuate(_result)[0] out_prob, score, index, text_lab = classifier.classify_batch(signal1) print(type(out_prob.squeeze(0).numpy())) print(out_prob.squeeze(0).numpy()) print(type(text_lab[-1])) print(text_lab[-1]) return text_results, out_prob.squeeze(0).numpy(), text_lab[-1], Audio ####################################################################### #规范函数,只管值输入输出: def text_score(text): string=text_api(text) part1 = str.partition(string, r"text") want1 = part1[2] label = want1[4:6] part2 = str.partition(string, r"probability") want2 = part2[2] prob = float(want2[3:-4]) if label=="正向": score=-prob*10 else: score=prob*10 return text,score def speech_score(audio): print(type(audio)) print(audio) sample_rate, signal = audio # 这是语音的输入 signal = signal.astype(np.float32) signal /= np.max(np.abs(signal)) sf.write("data/a.wav", signal, sample_rate) signal, sample_rate = torchaudio.load("data/a.wav") signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)( signal ) torchaudio.save("data/out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16) Audio = "data/out.wav" speech, sample_rate = AudioReader.read_wav_file(Audio) if signal == "none": return "none", "none", "haha" else: segments = vad.segments_offline(speech) text_results = "" for part in segments: _result = ASR_model.infer_offline( speech[part[0] * 16 : part[1] * 16], hot_words="任意热词 空格分开" ) text_results += punc.punctuate(_result)[0] out_prob, score, index, text_lab = classifier.classify_batch(signal1) print(type(out_prob.squeeze(0).numpy())) print(out_prob.squeeze(0).numpy()) print(type(text_lab[-1])) print(text_lab[-1]) #return text_results, out_prob.squeeze(0).numpy(), text_lab[-1], Audio prob=out_prob.squeeze(0).numpy() print(prob) score2=10*prob[0]-10*prob[1] print("score2",score2) print(text_lab[-1]) text,score1=text_score(text_results) # text_emo=str(get_text_score(text_results)) print(text,score1) score=score1+score2 return text,score def video_score(video): cap = cv2.VideoCapture(video) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = np.round(cap.get(cv2.CAP_PROP_FPS)) path_save_video_face = 'result_face.mp4' vid_writer_face = cv2.VideoWriter(path_save_video_face, cv2.VideoWriter_fourcc(*'mp4v'), fps, (224, 224)) # path_save_video_hm = 'result_hm.mp4' # vid_writer_hm = cv2.VideoWriter(path_save_video_hm, cv2.VideoWriter_fourcc(*'mp4v'), fps, (224, 224)) lstm_features = [] count_frame = 1 count_face = 0 probs = [] frames = [] last_output = None last_heatmap = None cur_face = None with mp_face_mesh.FaceMesh( max_num_faces=1, refine_landmarks=False, min_detection_confidence=0.5, min_tracking_confidence=0.5) as face_mesh: while cap.isOpened(): _, frame = cap.read() if frame is None: break frame_copy = frame.copy() frame_copy.flags.writeable = False frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB) results = face_mesh.process(frame_copy) frame_copy.flags.writeable = True if results.multi_face_landmarks: for fl in results.multi_face_landmarks: startX, startY, endX, endY = get_box(fl, w, h) cur_face = frame_copy[startY:endY, startX: endX] if count_face%config_data.FRAME_DOWNSAMPLING == 0: cur_face_copy = pth_processing(Image.fromarray(cur_face)) with torch.no_grad(): features = torch.nn.functional.relu(pth_model_static.extract_features(cur_face_copy)).detach().numpy() # grayscale_cam = cam(input_tensor=cur_face_copy) # grayscale_cam = grayscale_cam[0, :] # cur_face_hm = cv2.resize(cur_face,(224,224), interpolation = cv2.INTER_AREA) # cur_face_hm = np.float32(cur_face_hm) / 255 # heatmap = show_cam_on_image(cur_face_hm, grayscale_cam, use_rgb=False) # last_heatmap = heatmap if len(lstm_features) == 0: lstm_features = [features]*10 else: lstm_features = lstm_features[1:] + [features] lstm_f = torch.from_numpy(np.vstack(lstm_features)) lstm_f = torch.unsqueeze(lstm_f, 0) with torch.no_grad(): output = pth_model_dynamic(lstm_f).detach().numpy() last_output = output if count_face == 0: count_face += 1 else: if last_output is not None: output = last_output # heatmap = last_heatmap elif last_output is None: output = np.empty((1, 7)) output[:] = np.nan probs.append(output[0]) frames.append(count_frame) else: if last_output is not None: lstm_features = [] empty = np.empty((7)) empty[:] = np.nan probs.append(empty) frames.append(count_frame) if cur_face is not None: # heatmap_f = display_info(heatmap, 'Frame: {}'.format(count_frame), box_scale=.3) cur_face = cv2.cvtColor(cur_face, cv2.COLOR_RGB2BGR) cur_face = cv2.resize(cur_face, (224,224), interpolation = cv2.INTER_AREA) cur_face = display_info(cur_face, 'Frame: {}'.format(count_frame), box_scale=.3) vid_writer_face.write(cur_face) # vid_writer_hm.write(heatmap_f) count_frame += 1 if count_face != 0: count_face += 1 vid_writer_face.release() # vid_writer_hm.release() stat = statistics_plot(frames, probs) if not stat: return None, None #for debug print(type(frames)) print(frames) print(type(probs)) print(probs) # to calculate scores nan=float('nan') s1 = 0 s2 = 0 s3 = 0 s4 = 0 s5 = 0 s6 = 0 s7 = 0 frames_len=len(frames) for i in range(frames_len): if np.isnan(probs[i][0]): frames_len=frames_len-1 else: s1=s1+probs[i][0] s2=s2+probs[i][1] s3=s3+probs[i][2] s4=s4+probs[i][3] s5=s5+probs[i][4] s6=s6+probs[i][5] s7=s7+probs[i][6] s1=s1/frames_len s2=s2/frames_len s3=s3/frames_len s4=s4/frames_len s5=s5/frames_len s6=s6/frames_len s7=s7/frames_len scores=[s1,s2,s3,s4,s5,s6,s7] scores_str=str(scores) score1=0*scores[0]-8*scores[1]+4*scores[2]+0*scores[3]+2*scores[4]+2*scores[5]+4*scores[6] print("score1=",score1) with open("local_data/data.txt",'a', encoding="utf8") as f: f.write(scores_str+'\n') with open("local_data/data.txt",'r', encoding="utf8") as f: for i in f: print(i) #trans the audio file my_audio_clip = AudioFileClip(video) my_audio_clip.write_audiofile("data/audio.wav",ffmpeg_params=["-ac","1"]) audio = wav.read('data/audio.wav') text,socre2=speech_score(audio) print(text) score=score2+score1 return text,score ####################################################################### ########################################################################################################################### # def video_score(video): # cap = cv2.VideoCapture(video) # w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # fps = np.round(cap.get(cv2.CAP_PROP_FPS)) # path_save_video_face = 'result_face.mp4' # vid_writer_face = cv2.VideoWriter(path_save_video_face, cv2.VideoWriter_fourcc(*'mp4v'), fps, (224, 224)) # # path_save_video_hm = 'result_hm.mp4' # # vid_writer_hm = cv2.VideoWriter(path_save_video_hm, cv2.VideoWriter_fourcc(*'mp4v'), fps, (224, 224)) # lstm_features = [] # count_frame = 1 # count_face = 0 # probs = [] # frames = [] # last_output = None # last_heatmap = None # cur_face = None # with mp_face_mesh.FaceMesh( # max_num_faces=1, # refine_landmarks=False, # min_detection_confidence=0.5, # min_tracking_confidence=0.5) as face_mesh: # while cap.isOpened(): # _, frame = cap.read() # if frame is None: break # frame_copy = frame.copy() # frame_copy.flags.writeable = False # frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB) # results = face_mesh.process(frame_copy) # frame_copy.flags.writeable = True # if results.multi_face_landmarks: # for fl in results.multi_face_landmarks: # startX, startY, endX, endY = get_box(fl, w, h) # cur_face = frame_copy[startY:endY, startX: endX] # if count_face%config_data.FRAME_DOWNSAMPLING == 0: # cur_face_copy = pth_processing(Image.fromarray(cur_face)) # with torch.no_grad(): # features = torch.nn.functional.relu(pth_model_static.extract_features(cur_face_copy)).detach().numpy() # # grayscale_cam = cam(input_tensor=cur_face_copy) # # grayscale_cam = grayscale_cam[0, :] # # cur_face_hm = cv2.resize(cur_face,(224,224), interpolation = cv2.INTER_AREA) # # cur_face_hm = np.float32(cur_face_hm) / 255 # # heatmap = show_cam_on_image(cur_face_hm, grayscale_cam, use_rgb=False) # # last_heatmap = heatmap # if len(lstm_features) == 0: # lstm_features = [features]*10 # else: # lstm_features = lstm_features[1:] + [features] # lstm_f = torch.from_numpy(np.vstack(lstm_features)) # lstm_f = torch.unsqueeze(lstm_f, 0) # with torch.no_grad(): # output = pth_model_dynamic(lstm_f).detach().numpy() # last_output = output # if count_face == 0: # count_face += 1 # else: # if last_output is not None: # output = last_output # # heatmap = last_heatmap # elif last_output is None: # output = np.empty((1, 7)) # output[:] = np.nan # probs.append(output[0]) # frames.append(count_frame) # else: # if last_output is not None: # lstm_features = [] # empty = np.empty((7)) # empty[:] = np.nan # probs.append(empty) # frames.append(count_frame) # if cur_face is not None: # # heatmap_f = display_info(heatmap, 'Frame: {}'.format(count_frame), box_scale=.3) # cur_face = cv2.cvtColor(cur_face, cv2.COLOR_RGB2BGR) # cur_face = cv2.resize(cur_face, (224,224), interpolation = cv2.INTER_AREA) # cur_face = display_info(cur_face, 'Frame: {}'.format(count_frame), box_scale=.3) # vid_writer_face.write(cur_face) # # vid_writer_hm.write(heatmap_f) # count_frame += 1 # if count_face != 0: # count_face += 1 # vid_writer_face.release() # # vid_writer_hm.release() # stat = statistics_plot(frames, probs) # if not stat: # return None, None # #for debug # print(type(frames)) # print(frames) # print(type(probs)) # print(probs) # # to calculate scores # nan=float('nan') # s1 = 0 # s2 = 0 # s3 = 0 # s4 = 0 # s5 = 0 # s6 = 0 # s7 = 0 # frames_len=len(frames) # for i in range(frames_len): # if np.isnan(probs[i][0]): # frames_len=frames_len-1 # else: # s1=s1+probs[i][0] # s2=s2+probs[i][1] # s3=s3+probs[i][2] # s4=s4+probs[i][3] # s5=s5+probs[i][4] # s6=s6+probs[i][5] # s7=s7+probs[i][6] # s1=s1/frames_len # s2=s2/frames_len # s3=s3/frames_len # s4=s4/frames_len # s5=s5/frames_len # s6=s6/frames_len # s7=s7/frames_len # scores=[s1,s2,s3,s4,s5,s6,s7] # scores_str=str(scores) # with open("local_data/data.txt",'a', encoding="utf8") as f: # f.write(scores_str+'\n') # with open("local_data/data.txt",'r', encoding="utf8") as f: # for i in f: # print(i) # score1=0*scores[0]-8*scores[1]+4*scores[2]+0*scores[3]+2*scores[4]+2*scores[5]+4*scores[6] # #trans the audio file # my_audio_clip = AudioFileClip(video) # my_audio_clip.write_audiofile("data/audio.wav",ffmpeg_params=["-ac","1"]) # time.sleep(2) # return stat,score1,"data/audio.wav" # # #trans the audio file # # my_audio_clip = AudioFileClip(video) # # my_audio_clip.write_audiofile("newaudio.wav",ffmpeg_params=["-ac","1"]) # # # Audio="data/audio.wav" # # # text,prob2,label,path=classify_continuous(Audio) # # #0是抑郁概率,1是非抑郁概率 # # # score2=10*prob2[0]-10*prob[1] # # # print(prob2,label) # # # score2=0 # # # print("score2=",score2) # # # text_result="demo" # # # # text_result=text_api(text) # # # print(text_result) # # return score1 #打分函数