import datetime
import json
import logging
import sys
from multiprocessing import Queue
from threading import Thread
from typing import Any, Dict, List

from mistralai import Mistral

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)


class ActionProcessor(Thread):
    """Background thread that turns transcription fragments into actions.

    Fragments are read from ``text_queue``. A sliding window of the last
    three fragments is kept, and when the middle fragment is a local maximum
    in length it is sent to the Mistral chat API for classification. The
    resulting action dictionary is pushed onto ``action_queue``.
    """

    # Action names accepted from the model; anything else becomes "None".
    valid_action: List[str] = [
        "DropBleach",
        "DropSyringe",
        "DropFork",
        "GoToLivingRoom",
        "GoToKitchen",
        "GoToBedroom",
        "StopCrying",
        "Come",
        "None",
    ]

    def __init__(
        self,
        text_queue: "Queue[str]",
        action_queue: "Queue[Dict[str, Any]]",
        mistral_api_key: str,
    ):
        """Create the processor.

        Args:
            text_queue: Queue from which transcription fragments are read.
            action_queue: Queue onto which detected actions are put.
            mistral_api_key: API key used to build the Mistral client.
        """
        super().__init__()
        self.text_queue = text_queue
        self.action_queue = action_queue
        # Sliding window holding at most the three most recent fragments.
        self.text_buffers: List[str] = []
        self.mistral_client = Mistral(api_key=mistral_api_key)
        self.daemon = True  # Thread will exit when main program exits

    def get_action_and_sentiment(self, input_text: str) -> str:
        """Ask the model which action and sentiment ``input_text`` expresses.

        Args:
            input_text: Transcription fragment to classify.

        Returns:
            The model's reply with surrounding whitespace stripped. The
            prompt asks for a JSON array ``[action, sentiment]``.

        Raises:
            ValueError: If the reply carries no textual content.
        """
        # The prompt wording is sent to the model verbatim; do not edit it
        # casually, since any change may alter the classifications.
        messages = [
            {
                "role": "system",
                "content": """
                You are a transcription expert. You're listening to a parent speaking to a baby.
                Your goal is to determine what the baby is asked to do and what the parent's sentiment is.

                The following interpretations are possible:
                - DropBleach: The parent asks to drop the bleach (or 'Javel').
                - DropSyringe: The parent asks to drop the syringe.
                - DropFork: The parent asks to drop the fork.
                - GoToLivingRoom: The parent asks to go to the living room.
                - GoToKitchen: The parent asks to go to the kitchen.
                - GoToBedroom: The parent asks to go to the bedroom.
                - StopCrying: The parent asks to stop crying.
                - Come: The parent asks to come.
                - None: Others instructions are not relevant.

                ```json
                [action,sentiment]
                ```

                for example:
                Input: "Don't put the fork in the socket!"
                Output: ["DropFork", "negative"]
                Input: "Harold, please don't drink the bleach!"
                Output: ["DropBleach", "positive"]
                Input: "I'm so tired of this."
                Output: ["None", "negative"]
                """,
            },
            {
                "role": "user",
                "content": f"Transcription fragments: {input_text}",
            },
        ]
        # The assistant prefix nudges the model to open its reply with a JSON
        # array of strings.
        # NOTE(review): process_text parses the reply as a complete JSON
        # array, which relies on the API echoing the prefix in the returned
        # content - confirm against the Mistral prefix documentation.
        prefix_message = {
            "role": "assistant",
            "content": '["',
            "prefix": True,
        }
        response = self.mistral_client.chat.complete(
            model="mistral-large-latest",
            messages=messages + [prefix_message],
            response_format={"type": "json_object"},
            temperature=0.0,
        )
        content = response.choices[0].message.content
        if not isinstance(content, str):
            # Fail with a clear message instead of an AttributeError on
            # ``.strip()`` when the reply is empty or not plain text.
            raise ValueError(
                f"Model returned non-text content: {type(content).__name__}"
            )
        return content.strip()

    def process_text(self, text: str) -> Dict[str, Any] | None:
        """Convert text into an action if a complete command is detected.

        ``text`` is appended to the three-fragment window. The middle
        fragment is classified only when it is strictly longer than the
        fragment before it and at least as long as the one after it.
        (Presumably partial transcriptions grow until an utterance ends, so
        this picks the most complete version - confirm with the producer.)

        Args:
            text: Newest transcription fragment.

        Returns:
            A dictionary with keys ``action``, ``sentiment``, ``voice`` and
            ``time``, or ``None`` when the window is not full yet, the middle
            fragment is not a local maximum, or the model's reply is not a
            two-element JSON array.
        """
        self.text_buffers.append(text)
        # Trim in place so the window never holds more than three fragments.
        del self.text_buffers[:-3]
        if len(self.text_buffers) < 3:
            return None

        previous, candidate, following = self.text_buffers
        if not (len(previous) < len(candidate) >= len(following)):
            return None

        raw_reply = self.get_action_and_sentiment(candidate)
        try:
            action_and_sentiment = json.loads(raw_reply)
        except json.JSONDecodeError:
            # A malformed reply means no usable command; keep the raw text in
            # the log so the failure can be diagnosed.
            logger.warning("Model reply is not valid JSON: %r", raw_reply)
            return None

        if (
            not isinstance(action_and_sentiment, list)
            or len(action_and_sentiment) != 2
        ):
            return None

        action, sentiment = action_and_sentiment
        if action not in self.valid_action:
            action = "None"

        return {
            "action": action,
            "sentiment": sentiment,
            "voice": candidate,
            # Naive local time, formatted to second precision.
            "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    def run(self) -> None:
        """Main processing loop.

        Blocks on ``text_queue``, processes each fragment and forwards any
        detected action to ``action_queue``. Errors are logged and the loop
        keeps running.
        """
        while True:
            try:
                # Get text from queue, blocks until text is available
                text = self.text_queue.get()

                # Process the text into an action
                action = self.process_text(text)

                # If we got a valid action, add it to the action queue
                if action is not None:
                    self.action_queue.put(action)
            except Exception as e:
                # Top-level boundary of the worker thread: log with the
                # traceback and carry on with the next fragment.
                logger.exception("Error processing text: %s", e)