File size: 5,340 Bytes
6781da9
 
 
 
 
 
3d3f535
 
6781da9
 
 
 
 
 
56a64f9
 
 
6781da9
 
56a64f9
 
6781da9
56a64f9
 
 
 
 
 
 
 
3d3f535
56a64f9
3d3f535
6781da9
 
 
 
 
3d3f535
6781da9
 
 
 
 
 
56a64f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6781da9
 
56a64f9
 
 
 
6781da9
56a64f9
 
6781da9
b184fdb
6781da9
56a64f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6781da9
 
56a64f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6781da9
 
 
 
 
 
 
 
 
 
56a64f9
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import logging
import queue
from pathlib import Path
from typing import List, NamedTuple

import av
import cv2
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from utils.download import download_file 
from utils.turn import get_ice_servers

from mtcnn import MTCNN
from PIL import Image, ImageDraw
from transformers import pipeline


# Hugging Face image-classification pipeline used to label facial expressions.
emotion_pipeline = pipeline("image-classification", model="trpakov/vit-face-expression")

# MTCNN instance used for face detection.
mtcnn = MTCNN()

# Most recent frames shared between the video callback (writer) and the
# Streamlit loop at the bottom of the script (reader):
#   "webcam"   - the raw frame as received
#   "analyzed" - a copy of that frame with boxes and labels drawn on it
img_container = {"webcam": None, "analyzed": None}

# Directory containing this file, and its parent directory.
HERE = Path(__file__).parent
ROOT = HERE.parent

logger = logging.getLogger(__name__)

class Detection(NamedTuple):
    """Immutable record for one detection: class id, label, score and box.

    NOTE(review): in the visible part of this file nothing constructs a
    ``Detection``; it only appears inside the ``result_queue`` annotation.
    Confirm whether it is still needed.
    """

    # Numeric identifier of the detected class.
    class_id: int
    # Name of the detected class.
    label: str
    # Score attached to the detection.
    score: float
    # Box of the detection (layout not established by this file -- TODO confirm).
    box: np.ndarray

# NOTE: The video frame callback is called in another thread, so a queue is
#       used to pass the detection results from inside the callback to the
#       main script in a thread-safe way.
# NOTE(review): detect_and_draw_faces() puts the raw list returned by
#       mtcnn.detect_faces() on this queue, not Detection tuples, so the
#       annotation below looks stale -- confirm.
# NOTE(review): the queue has no maxsize and is only drained by the label
#       loop at the bottom of the script; if that loop is not running,
#       entries presumably accumulate -- confirm.
# TODO: A general-purpose shared state object may be more useful.
result_queue: "queue.Queue[List[Detection]]" = queue.Queue()

def analyze_sentiment(face):
    """Return the label of the highest-scoring emotion for a face crop.

    Args:
        face: Face image as an array in BGR channel order; it is converted
            to RGB before being handed to the classifier.

    Returns:
        The ``'label'`` of the entry with the largest ``'score'`` among the
        results produced by ``emotion_pipeline``.
    """
    # The pipeline is fed a PIL image, so go BGR array -> RGB array -> PIL.
    pil_face = Image.fromarray(cv2.cvtColor(face, cv2.COLOR_BGR2RGB))
    predictions = emotion_pipeline(pil_face)
    best = max(predictions, key=lambda prediction: prediction['score'])
    return best['label']

# Font scale passed to cv2.getTextSize / cv2.putText for the emotion label.
TEXT_SIZE = 1
# Line thickness passed to cv2.rectangle for the box drawn around each face.
LINE_SIZE = 2

def detect_and_draw_faces(frame):
    """Detect faces, classify their emotion and annotate the frame in place.

    For every face reported by MTCNN a red box is drawn around it and the
    dominant emotion (from ``analyze_sentiment``) is written on a black
    background just above the box. The raw MTCNN results are also put on
    ``result_queue`` so the main script can display them.

    Args:
        frame: Image array in BGR channel order (as produced by
            ``to_ndarray(format="bgr24")``). It is modified in place.

    Returns:
        The same ``frame`` object, with the annotations drawn on it.
    """
    frame_h, frame_w = frame.shape[:2]

    # MTCNN is documented to expect RGB input, while the frames handled here
    # are BGR, so convert a temporary copy for detection only.
    results = mtcnn.detect_faces(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    for result in results:
        x, y, w, h = result['box']

        # MTCNN may report boxes that start at negative coordinates or extend
        # past the image. Slicing with such values yields an empty (or
        # wrapped-around) crop, which makes cv2.cvtColor raise, so clamp the
        # box to the frame and skip anything that ends up empty.
        x0, y0 = max(x, 0), max(y, 0)
        x1, y1 = min(x + w, frame_w), min(y + h, frame_h)
        if x1 <= x0 or y1 <= y0:
            continue

        face = frame[y0:y1, x0:x1]
        sentiment = analyze_sentiment(face)
        cv2.rectangle(frame, (x0, y0), (x1, y1), (0, 0, 255), LINE_SIZE)  # Red box (BGR)

        # Place the label just above the box, but never above the top edge of
        # the frame, where it would not be visible.
        text_size = cv2.getTextSize(sentiment, cv2.FONT_HERSHEY_SIMPLEX, TEXT_SIZE, 2)[0]
        text_x = x0
        text_y = max(y0 - 10, text_size[1])
        background_tl = (text_x, text_y - text_size[1])
        background_br = (text_x + text_size[0], text_y + 5)

        # Draw black rectangle as background
        cv2.rectangle(frame, background_tl, background_br, (0, 0, 0), cv2.FILLED)
        # Draw white text on top
        cv2.putText(frame, sentiment, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, TEXT_SIZE, (255, 255, 255), 2)

    # Hand the raw detection results to the main script thread.
    result_queue.put(results)
    return frame

def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Analyze one incoming video frame and publish the images for the UI.

    The frame is converted to a BGR array and stored under
    ``img_container["webcam"]``; an annotated copy produced by
    ``detect_and_draw_faces`` is stored under ``img_container["analyzed"]``.

    Args:
        frame: The video frame received from the stream.

    Returns:
        The incoming ``frame`` itself, unmodified; the annotated image is
        only made available through ``img_container``.
    """
    bgr = frame.to_ndarray(format="bgr24")
    img_container["webcam"] = bgr
    # Annotate a copy so the raw image stored above is left untouched.
    img_container["analyzed"] = detect_and_draw_faces(bgr.copy())
    return frame
#    return av.VideoFrame.from_ndarray(frame_with_boxes, format="bgr24")

# Value returned by the project's utils.turn.get_ice_servers() helper; it is
# passed to webrtc_streamer() below as rtc_configuration.
ice_servers = get_ice_servers()

# Streamlit UI
# Inject custom CSS (page background, heading fonts, button colors) as raw
# HTML; unsafe_allow_html=True is what lets the <style> tag through.
st.markdown(
    """
    <style>
        .main {
            background-color: #F7F7F7;
            padding: 2rem;
        }
        h1, h2, h3 {
            color: #333333;
            font-family: 'Arial', sans-serif;
        }
        h1 {
            font-weight: 700;
            font-size: 2.5rem;
        }
        h2 {
            font-weight: 600;
            font-size: 2rem;
        }
        h3 {
            font-weight: 500;
            font-size: 1.5rem;
        }
        .stButton button {
            background-color: #E60012;
            color: white;
            border-radius: 5px;
            font-size: 16px;
            padding: 0.5rem 1rem;
        }
    </style>
    """,
    unsafe_allow_html=True
)

st.title("Computer Vision Test Lab")
st.subheader("Facial Sentiment Analysis")

# Columns for input and output streams
col1, col2 = st.columns(2)

# Left column: the WebRTC webcam widget. Every frame is passed through
# video_frame_callback, which returns the incoming frame unchanged.
with col1:
    st.header("Input Stream")
    st.subheader("Webcam")
    webrtc_ctx = webrtc_streamer(
        key="object-detection",
        mode=WebRtcMode.SENDRECV,
        rtc_configuration=ice_servers,
        video_frame_callback=video_frame_callback,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

# Right column: two empty placeholders that the loop at the bottom of the
# script fills with the raw frame and the annotated frame.
with col2:
    st.header("Analysis")
    st.subheader("Input Frame")
    input_placeholder = st.empty()
    st.subheader("Output Frame")
    output_placeholder = st.empty()

# While the stream is playing, keep refreshing the label table and the two
# preview images from the data published by the video callback.
if webrtc_ctx.state.playing:
    if st.checkbox("Show the detected labels", value=True):
        labels_placeholder = st.empty()
        # NOTE: The video transformation with object detection and
        # this loop displaying the result labels are running
        # in different threads asynchronously.
        # Then the rendered video frames and the labels displayed here
        # are not strictly synchronized.
        while True:
            # Blocks until the callback has published results for a frame.
            result = result_queue.get()
            labels_placeholder.table(result)

            img = img_container["webcam"]
            frame_with_boxes = img_container["analyzed"]

            # The callback enqueues its results before it stores the
            # annotated frame, so on the first frame "analyzed" can still be
            # None here even though "webcam" is set. Skip until both exist
            # rather than passing None to the image placeholders.
            if img is None or frame_with_boxes is None:
                continue

            input_placeholder.image(img, channels="BGR")
            output_placeholder.image(frame_with_boxes, channels="BGR")