Spaces:
Running
on
A10G
Running
on
A10G
File size: 4,480 Bytes
c27e5a4 93161aa 7f7ae8a 6831f1f 3d7f69e c27e5a4 3d7f69e 9254534 3d7f69e 6831f1f b0b3162 38abb63 b0b3162 3d7f69e c27e5a4 3d7f69e 6831f1f c27e5a4 6831f1f 9254534 6831f1f b0b3162 457d4b2 9254534 b0b3162 7f7ae8a 38abb63 7f7ae8a b0b3162 8e56e25 b0b3162 8e56e25 b0b3162 8e56e25 7f7ae8a 8e56e25 7f7ae8a b0b3162 9254534 0f6f535 45ab685 9254534 3256e71 7f7ae8a 3256e71 457d4b2 9254534 0f6f535 457d4b2 45ab685 457d4b2 c27e5a4 6831f1f 457d4b2 0f6f535 c27e5a4 5813146 c27e5a4 0f6f535 c27e5a4 6831f1f c27e5a4 6831f1f c27e5a4 6831f1f c27e5a4 6831f1f c27e5a4 6831f1f 3d7f69e 6831f1f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
from collections import defaultdict
import datetime
import json
from threading import Thread
from multiprocessing import Queue
import time
from typing import Dict, Any, List, Tuple
import logging
import sys
from mistralai import Mistral
# Root logging setup: INFO and above go to stdout, each record stamped with
# timestamp, logger name and level.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    handlers=[_stdout_handler],
    format=_LOG_FORMAT,
    level=logging.INFO,
)

# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)
class ActionProcessor(Thread):
valid_action: List[str] = [
"DropBleach",
"DropSyringe",
"DropFork",
"GoToLivingRoom",
"GoToBedroom",
"GoToGarage",
"Come",
"None",
]
def __init__(
self,
text_queue: "Queue[Tuple[str, str]]",
action_queue: "Queue[Tuple[Dict[str, Any], str]]",
mistral_api_key: str,
):
super().__init__()
self.filtered_text_queue = text_queue
self.action_queue = action_queue
self.mistral_client = Mistral(api_key=mistral_api_key)
self.daemon = True # Thread will exit when main program exits
def get_action_and_sentiment(self, input_text: str) -> str:
"""Get sentiment analysis for input text."""
messages = [
{
"role": "system",
"content": """
You are a transcription expert. You're listening to a parent speaking to a baby. Your goal
is to determine what the baby is asked to do and what the parent's sentiment is.
The following interpretations are possible:
- DropBleach: The parent asks to drop the bleach (or 'Javel').
- DropSyringe: The parent asks to drop the syringe.
- DropFork: The parent asks to drop the fork.
- GoToLivingRoom: The parent asks to go to the living room.
- GoToBedroom: The parent asks to go to the bedroom.
- GoToGarage: The parent asks to go to the garage.
- Come: The parent asks to come.
- None: Others instructions are not relevant.
The following sentiments are possible: badSentiment, goodSentiment, neutralSentiment
```json
[action,sentiment]
```
for example:
Input: "Don't put the fork in the socket!"
Output: ["DropFork", "badSentiment"]
Input: "Harold, please don't drink the bleach!"
Output: ["DropBleach", "goodSentiment"]
Input: "I'm so tired of this."
Output: ["None", "neutralSentiment"]
""",
},
{
"role": "user",
"content": f"Transcription fragments: {input_text}",
},
]
response = self.mistral_client.chat.complete(
model="mistral-large-latest",
messages=messages
+ [
{
"role": "assistant",
"content": '["',
"prefix": True,
}
],
response_format={"type": "json_object"},
temperature=0.0,
)
result: str = response.choices[0].message.content
return result.strip()
def process_text(self, candidate: str) -> Dict[str, Any] | None:
"""Convert text into an action if a complete command is detected."""
# Get sentiment first
action_and_sentiment = json.loads(self.get_action_and_sentiment(candidate))
if not isinstance(action_and_sentiment, list) or len(action_and_sentiment) != 2:
return None
action, sentiment = action_and_sentiment
if action not in self.valid_action:
action = "None"
return {
"action": action,
"sentiment": sentiment,
"voice": candidate,
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
def run(self) -> None:
"""Main processing loop."""
while True:
try:
# Get text from queue, blocks until text is available
text, session_id = self.filtered_text_queue.get()
# Process the text into an action
start_time = time.time()
action = self.process_text(text)
processing_time = time.time() - start_time
logger.info(f"{processing_time:.2f}s: {text} -> {action}")
# If we got a valid action, add it to the action queue
if action:
self.action_queue.put((action, session_id))
except Exception as e:
logger.error(f"Error processing text: {str(e)}")
continue
|