File size: 4,480 Bytes
c27e5a4
93161aa
7f7ae8a
6831f1f
3d7f69e
c27e5a4
 
3d7f69e
 
9254534
3d7f69e
 
 
 
 
 
 
 
 
6831f1f
 
 
b0b3162
 
 
 
 
 
38abb63
b0b3162
 
 
 
3d7f69e
 
c27e5a4
 
3d7f69e
 
6831f1f
c27e5a4
6831f1f
9254534
6831f1f
 
b0b3162
457d4b2
9254534
 
b0b3162
 
7f7ae8a
 
 
 
 
 
 
 
 
38abb63
7f7ae8a
 
b0b3162
8e56e25
 
b0b3162
 
 
 
 
 
8e56e25
b0b3162
 
8e56e25
7f7ae8a
 
8e56e25
7f7ae8a
b0b3162
 
 
 
 
9254534
0f6f535
45ab685
9254534
3256e71
 
 
 
7f7ae8a
3256e71
 
 
 
 
457d4b2
9254534
0f6f535
457d4b2
45ab685
457d4b2
c27e5a4
6831f1f
457d4b2
0f6f535
c27e5a4
 
5813146
 
c27e5a4
0f6f535
c27e5a4
 
 
 
 
 
 
 
6831f1f
 
 
 
 
 
c27e5a4
6831f1f
 
c27e5a4
6831f1f
c27e5a4
 
6831f1f
 
 
c27e5a4
6831f1f
 
3d7f69e
6831f1f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from collections import defaultdict
import datetime
import json
from threading import Thread
from multiprocessing import Queue
import time
from typing import Dict, Any, List, Tuple
import logging
import sys
from mistralai import Mistral

# Root logging setup: INFO and above are written to stdout, each record
# carrying a timestamp, the emitting logger's name and the severity.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)


class ActionProcessor(Thread):
    valid_action: List[str] = [
        "DropBleach",
        "DropSyringe",
        "DropFork",
        "GoToLivingRoom",
        "GoToBedroom",
        "GoToGarage",
        "Come",
        "None",
    ]

    def __init__(
        self,
        text_queue: "Queue[Tuple[str, str]]",
        action_queue: "Queue[Tuple[Dict[str, Any], str]]",
        mistral_api_key: str,
    ):
        super().__init__()
        self.filtered_text_queue = text_queue
        self.action_queue = action_queue
        self.mistral_client = Mistral(api_key=mistral_api_key)
        self.daemon = True  # Thread will exit when main program exits

    def get_action_and_sentiment(self, input_text: str) -> str:
        """Get sentiment analysis for input text."""
        messages = [
            {
                "role": "system",
                "content": """
You are a transcription expert. You're listening to a parent speaking to a baby. Your goal
is to determine what the baby is asked to do and what the parent's sentiment is.

The following interpretations are possible:
- DropBleach: The parent asks to drop the bleach (or 'Javel').
- DropSyringe: The parent asks to drop the syringe.
- DropFork: The parent asks to drop the fork.
- GoToLivingRoom: The parent asks to go to the living room.
- GoToBedroom: The parent asks to go to the bedroom.
- GoToGarage: The parent asks to go to the garage.
- Come: The parent asks to come.
- None: Others instructions are not relevant.

The following sentiments are possible: badSentiment, goodSentiment, neutralSentiment

```json
[action,sentiment]
```

for example:
Input: "Don't put the fork in the socket!"
Output: ["DropFork", "badSentiment"]

Input: "Harold, please don't drink the bleach!"
Output: ["DropBleach", "goodSentiment"]

Input: "I'm so tired of this."
Output: ["None", "neutralSentiment"]
""",
            },
            {
                "role": "user",
                "content": f"Transcription fragments: {input_text}",
            },
        ]

        response = self.mistral_client.chat.complete(
            model="mistral-large-latest",
            messages=messages
            + [
                {
                    "role": "assistant",
                    "content": '["',
                    "prefix": True,
                }
            ],
            response_format={"type": "json_object"},
            temperature=0.0,
        )

        result: str = response.choices[0].message.content

        return result.strip()

    def process_text(self, candidate: str) -> Dict[str, Any] | None:
        """Convert text into an action if a complete command is detected."""
        # Get sentiment first

        action_and_sentiment = json.loads(self.get_action_and_sentiment(candidate))
        if not isinstance(action_and_sentiment, list) or len(action_and_sentiment) != 2:
            return None

        action, sentiment = action_and_sentiment

        if action not in self.valid_action:
            action = "None"
        return {
            "action": action,
            "sentiment": sentiment,
            "voice": candidate,
            "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    def run(self) -> None:
        """Main processing loop."""
        while True:
            try:
                # Get text from queue, blocks until text is available
                text, session_id = self.filtered_text_queue.get()

                # Process the text into an action
                start_time = time.time()
                action = self.process_text(text)
                processing_time = time.time() - start_time
                logger.info(f"{processing_time:.2f}s: {text} -> {action}")

                # If we got a valid action, add it to the action queue
                if action:
                    self.action_queue.put((action, session_id))

            except Exception as e:
                logger.error(f"Error processing text: {str(e)}")
                continue