# Spaces: Running on A10G
from collections import defaultdict | |
import datetime | |
import json | |
from threading import Thread | |
from multiprocessing import Queue | |
import time | |
from typing import Dict, Any, List, Tuple | |
import logging | |
import sys | |
from mistralai import Mistral | |
# Logging setup: INFO level, timestamped records, written to stdout.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    handlers=[_stdout_handler],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
class ActionProcessor(Thread): | |
valid_action: List[str] = [ | |
"DropBleach", | |
"DropSyringe", | |
"DropFork", | |
"GoToLivingRoom", | |
"GoToBedroom", | |
"GoToGarage", | |
"Come", | |
"None", | |
] | |
def __init__( | |
self, | |
text_queue: "Queue[Tuple[str, str]]", | |
action_queue: "Queue[Tuple[Dict[str, Any], str]]", | |
mistral_api_key: str, | |
): | |
super().__init__() | |
self.filtered_text_queue = text_queue | |
self.action_queue = action_queue | |
self.mistral_client = Mistral(api_key=mistral_api_key) | |
self.daemon = True # Thread will exit when main program exits | |
def get_action_and_sentiment(self, input_text: str) -> str: | |
"""Get sentiment analysis for input text.""" | |
messages = [ | |
{ | |
"role": "system", | |
"content": """ | |
You are a transcription expert. You're listening to a parent speaking to a baby. Your goal | |
is to determine what the baby is asked to do and what the parent's sentiment is. | |
The following interpretations are possible: | |
- DropBleach: The parent asks to drop the bleach (or 'Javel'). | |
- DropSyringe: The parent asks to drop the syringe. | |
- DropFork: The parent asks to drop the fork. | |
- GoToLivingRoom: The parent asks to go to the living room. | |
- GoToBedroom: The parent asks to go to the bedroom. | |
- GoToGarage: The parent asks to go to the garage. | |
- Come: The parent asks to come. | |
- None: Others instructions are not relevant. | |
The following sentiments are possible: badSentiment, goodSentiment, neutralSentiment | |
```json | |
[action,sentiment] | |
``` | |
for example: | |
Input: "Don't put the fork in the socket!" | |
Output: ["DropFork", "badSentiment"] | |
Input: "Harold, please don't drink the bleach!" | |
Output: ["DropBleach", "goodSentiment"] | |
Input: "I'm so tired of this." | |
Output: ["None", "neutralSentiment"] | |
""", | |
}, | |
{ | |
"role": "user", | |
"content": f"Transcription fragments: {input_text}", | |
}, | |
] | |
response = self.mistral_client.chat.complete( | |
model="mistral-large-latest", | |
messages=messages | |
+ [ | |
{ | |
"role": "assistant", | |
"content": '["', | |
"prefix": True, | |
} | |
], | |
response_format={"type": "json_object"}, | |
temperature=0.0, | |
) | |
result: str = response.choices[0].message.content | |
return result.strip() | |
def process_text(self, candidate: str) -> Dict[str, Any] | None: | |
"""Convert text into an action if a complete command is detected.""" | |
# Get sentiment first | |
action_and_sentiment = json.loads(self.get_action_and_sentiment(candidate)) | |
if not isinstance(action_and_sentiment, list) or len(action_and_sentiment) != 2: | |
return None | |
action, sentiment = action_and_sentiment | |
if action not in self.valid_action: | |
action = "None" | |
return { | |
"action": action, | |
"sentiment": sentiment, | |
"voice": candidate, | |
"time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
} | |
def run(self) -> None: | |
"""Main processing loop.""" | |
while True: | |
try: | |
# Get text from queue, blocks until text is available | |
text, session_id = self.filtered_text_queue.get() | |
# Process the text into an action | |
start_time = time.time() | |
action = self.process_text(text) | |
processing_time = time.time() - start_time | |
logger.info(f"{processing_time:.2f}s: {text} -> {action}") | |
# If we got a valid action, add it to the action queue | |
if action: | |
self.action_queue.put((action, session_id)) | |
except Exception as e: | |
logger.error(f"Error processing text: {str(e)}") | |
continue | |