Liusuthu's picture
Update app_utils.py
41b3e9c verified
raw
history blame
12.2 kB
import torch
import time
import numpy as np
import mediapipe as mp
from PIL import Image
import cv2
# from pytorch_grad_cam.utils.image import show_cam_on_image
import scipy.io.wavfile as wav
# Importing necessary components for the Gradio app
from model import pth_model_static, pth_model_dynamic, cam, pth_processing
from face_utils import get_box, display_info
from config import DICT_EMO, config_data
from plot import statistics_plot
from moviepy.editor import AudioFileClip
import soundfile as sf
import torchaudio
from speechbrain.pretrained.interfaces import foreign_class
from paraformer import AudioReader, CttPunctuator, FSMNVad, ParaformerOffline
from gradio_client import Client
##############################################################################################
client = Client("Liusuthu/TextDepression")
mp_face_mesh = mp.solutions.face_mesh
classifier = foreign_class(
source="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP", # ".\\emotion-recognition-wav2vec2-IEMOCAP"
pymodule_file="custom_interface.py",
classname="CustomEncoderWav2vec2Classifier",
savedir="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
)
ASR_model = ParaformerOffline()
vad = FSMNVad()
punc = CttPunctuator()
#########################################################################################
def text_api(text:str):
result = client.predict(
text, # str in '输入文字' Textbox component
api_name="/predict",
)
return result
# def get_text_score(text):
# string=text_api(text)
# part1 = str.partition(string, r"text")
# want1 = part1[2]
# label = want1[4:6]
# part2 = str.partition(string, r"probability")
# want2 = part2[2]
# prob = float(want2[3:-4])
# return label, prob
# def classify_continuous(audio):
# print(type(audio))
# print(audio)
# sample_rate, signal = (audio) # 这是语音的输入
# signal = signal.astype(np.float32)
# signal /= np.max(np.abs(signal))
# sf.write("data/a.wav", signal, sample_rate)
# signal, sample_rate = torchaudio.load("data/a.wav")
# signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(
# signal
# )
# torchaudio.save("data/out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)
# Audio = "data/out.wav"
# speech, sample_rate = AudioReader.read_wav_file(Audio)
# if signal == "none":
# return "none", "none", "haha"
# else:
# segments = vad.segments_offline(speech)
# text_results = ""
# for part in segments:
# _result = ASR_model.infer_offline(
# speech[part[0] * 16 : part[1] * 16], hot_words="任意热词 空格分开"
# )
# text_results += punc.punctuate(_result)[0]
# out_prob, score, index, text_lab = classifier.classify_batch(signal1)
# print(type(out_prob.squeeze(0).numpy()))
# print(out_prob.squeeze(0).numpy())
# print(type(text_lab[-1]))
# print(text_lab[-1])
# return text_results, out_prob.squeeze(0).numpy(), text_lab[-1], Audio
# def preprocess_image_and_predict(inp):
# return None, None, None
# def preprocess_video_and_predict(video):
# return None, None, None, None
#######################################################################
#规范函数,只管值输入输出:
def text_score(text):
string=text_api(text)
part1 = str.partition(string, r"text")
want1 = part1[2]
label = want1[4:6]
part2 = str.partition(string, r"probability")
want2 = part2[2]
prob = float(want2[3:-4])
if label=="正向":
score=-np.log10(prob*10)
else:
score=np.log10(prob*10)
# print("from func:text_score————,text:",text,",score:",score)
return text,score
def speech_score(audio):
print(type(audio))
print(audio)
sample_rate, signal = audio # 这是语音的输入
signal = signal.astype(np.float32)
signal /= np.max(np.abs(signal))
sf.write("data/a.wav", signal, sample_rate)
signal, sample_rate = torchaudio.load("data/a.wav")
signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(
signal
)
torchaudio.save("data/out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)
Audio = "data/out.wav"
speech, sample_rate = AudioReader.read_wav_file(Audio)
if signal == "none":
return "none", "none", "haha"
else:
segments = vad.segments_offline(speech)
text_results = ""
for part in segments:
_result = ASR_model.infer_offline(
speech[part[0] * 16 : part[1] * 16], hot_words="任意热词 空格分开"
)
text_results += punc.punctuate(_result)[0]
out_prob, score, index, text_lab = classifier.classify_batch(signal1)
print("from func:speech_score————type and value of prob:")
print(type(out_prob.squeeze(0).numpy()))
print(out_prob.squeeze(0).numpy())
print("from func:speech_score————type and value of resul_label:")
print(type(text_lab[-1]))
print(text_lab[-1])
#return text_results, out_prob.squeeze(0).numpy(), text_lab[-1], Audio
prob=out_prob.squeeze(0).numpy()
#print(prob)
score2=10*prob[0]-10*prob[1]
if score2>=0:
score2=np.log10(score2)
else:
score2=-np.log10(-score2)
# print("from func:speech_score————score2:",score2)
# print("from func:speech_score————",text_lab[-1])
text,score1=text_score(text_results)
# # text_emo=str(get_text_score(text_results))
# print("from func:speech_score————text:",text,",score1:",score1)
score=(2/3)*score1+(1/3)*score2
return text,score
def video_score(video):
cap = cv2.VideoCapture(video)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = np.round(cap.get(cv2.CAP_PROP_FPS))
path_save_video_face = 'result_face.mp4'
vid_writer_face = cv2.VideoWriter(path_save_video_face, cv2.VideoWriter_fourcc(*'mp4v'), fps, (224, 224))
# path_save_video_hm = 'result_hm.mp4'
# vid_writer_hm = cv2.VideoWriter(path_save_video_hm, cv2.VideoWriter_fourcc(*'mp4v'), fps, (224, 224))
lstm_features = []
count_frame = 1
count_face = 0
probs = []
frames = []
last_output = None
last_heatmap = None
cur_face = None
with mp_face_mesh.FaceMesh(
max_num_faces=1,
refine_landmarks=False,
min_detection_confidence=0.5,
min_tracking_confidence=0.5) as face_mesh:
while cap.isOpened():
_, frame = cap.read()
if frame is None: break
frame_copy = frame.copy()
frame_copy.flags.writeable = False
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB)
results = face_mesh.process(frame_copy)
frame_copy.flags.writeable = True
if results.multi_face_landmarks:
for fl in results.multi_face_landmarks:
startX, startY, endX, endY = get_box(fl, w, h)
cur_face = frame_copy[startY:endY, startX: endX]
if count_face%config_data.FRAME_DOWNSAMPLING == 0:
cur_face_copy = pth_processing(Image.fromarray(cur_face))
with torch.no_grad():
features = torch.nn.functional.relu(pth_model_static.extract_features(cur_face_copy)).detach().numpy()
# grayscale_cam = cam(input_tensor=cur_face_copy)
# grayscale_cam = grayscale_cam[0, :]
# cur_face_hm = cv2.resize(cur_face,(224,224), interpolation = cv2.INTER_AREA)
# cur_face_hm = np.float32(cur_face_hm) / 255
# heatmap = show_cam_on_image(cur_face_hm, grayscale_cam, use_rgb=False)
# last_heatmap = heatmap
if len(lstm_features) == 0:
lstm_features = [features]*10
else:
lstm_features = lstm_features[1:] + [features]
lstm_f = torch.from_numpy(np.vstack(lstm_features))
lstm_f = torch.unsqueeze(lstm_f, 0)
with torch.no_grad():
output = pth_model_dynamic(lstm_f).detach().numpy()
last_output = output
if count_face == 0:
count_face += 1
else:
if last_output is not None:
output = last_output
# heatmap = last_heatmap
elif last_output is None:
output = np.empty((1, 7))
output[:] = np.nan
probs.append(output[0])
frames.append(count_frame)
else:
if last_output is not None:
lstm_features = []
empty = np.empty((7))
empty[:] = np.nan
probs.append(empty)
frames.append(count_frame)
if cur_face is not None:
# heatmap_f = display_info(heatmap, 'Frame: {}'.format(count_frame), box_scale=.3)
cur_face = cv2.cvtColor(cur_face, cv2.COLOR_RGB2BGR)
cur_face = cv2.resize(cur_face, (224,224), interpolation = cv2.INTER_AREA)
cur_face = display_info(cur_face, 'Frame: {}'.format(count_frame), box_scale=.3)
vid_writer_face.write(cur_face)
# vid_writer_hm.write(heatmap_f)
count_frame += 1
if count_face != 0:
count_face += 1
vid_writer_face.release()
# vid_writer_hm.release()
stat = statistics_plot(frames, probs)
if not stat:
return None, None
#for debug
print("from func:video_score————")
print(type(frames))
print(frames)
print(type(probs))
print(probs)
# to calculate scores
nan=float('nan')
s1 = 0
s2 = 0
s3 = 0
# s4 = 0
# s5 = 0
# s6 = 0
# s7 = 0
frames_len=len(frames)
for i in range(frames_len):
if np.isnan(probs[i][0]):
frames_len=frames_len-1
else:
s1=s1+probs[i][0]
s2=s2+probs[i][1]
s3=s3+probs[i][2]
# s4=s4+probs[i][3]
# s5=s5+probs[i][4]
# s6=s6+probs[i][5]
# s7=s7+probs[i][6]
s1=s1/frames_len
s2=s2/frames_len
s3=s3/frames_len
# s4=s4/frames_len
# s5=s5/frames_len
# s6=s6/frames_len
# s7=s7/frames_len
# scores=[s1,s2,s3,s4,s5,s6,s7]
# scores_str=str(scores)
# score1=0*scores[0]-8*scores[1]+4*scores[2]+0*scores[3]+2*scores[4]+2*scores[5]+4*scores[6]
#print("from func:video_score————score1=",score1)
#print("from func:video_score————logs:")
# with open("local_data/data.txt",'a', encoding="utf8") as f:
# f.write(scores_str+'\n')
# with open("local_data/data.txt",'r', encoding="utf8") as f:
# for i in f:
# print(i)
print(str([s1,s2,s3]))
if s1>=0.4:
score1=0
else:
if s2>=s3:
score1=-1
else:
score1=+1
#trans the audio file
my_audio_clip = AudioFileClip(video)
my_audio_clip.write_audiofile("data/audio.wav",ffmpeg_params=["-ac","1"])
audio = wav.read('data/audio.wav')
text,score2=speech_score(audio)
#print("from func:video_score————text:",text)
score=(score1+6*score2)/7
#print("from func:video_score————score:",score)
return text,score
#######################################################################