Spaces:
Running
on
A10G
Running
on
A10G
from threading import Thread | |
from queue import Queue | |
from typing import Dict, Any | |
import json | |
import re | |
class ActionProcessor(Thread): | |
def __init__(self, text_queue: "Queue[str]", action_queue: "Queue[str]"): | |
super().__init__() | |
self.text_queue = text_queue | |
self.action_queue = action_queue | |
self.daemon = True # Thread will exit when main program exits | |
def process_text(self, text: str) -> Dict[str, Any] | None: | |
"""Convert text into an action if a complete command is detected.""" | |
# Define command patterns | |
command_patterns = { | |
r"(?i)\b(stop|now)\b": "stop", | |
r"(?i)\b(come back|get back)\b": "return", | |
r"(?i)\b(easy)\b": "slow", | |
r"(?i)\b(stop drinking)\b": "pause_liquid", | |
r"(?i)\b(stop eating)\b": "pause_solid", | |
r"(?i)\b(look at me)\b": "look_at_me", | |
r"(?i)\b(look away)\b": "look_away", | |
r"(?i)\b(don't do that)\b": "stop", | |
} | |
# Check each pattern | |
for pattern, action_type in command_patterns.items(): | |
match = re.search(pattern, text.lower()) | |
if match: | |
return {"type": action_type} | |
return None | |
def run(self) -> None: | |
"""Main processing loop.""" | |
while True: | |
try: | |
# Get text from queue, blocks until text is available | |
text = self.text_queue.get() | |
# Process the text into an action | |
action = self.process_text(text) | |
# If we got a valid action, add it to the action queue | |
if action: | |
self.action_queue.put(json.dumps(action)) | |
# Mark the text as processed | |
self.text_queue.task_done() | |
except Exception as e: | |
print(f"Error processing text: {str(e)}") | |
continue | |