# ParentalControl / server / ActionProcessor.py
# GitLab CI
# Update game build from GitLab CI
# 6831f1f
# raw
# history blame
# 1.92 kB
from threading import Thread
from queue import Queue
from typing import Dict, Any
import json
import re
class ActionProcessor(Thread):
def __init__(self, text_queue: "Queue[str]", action_queue: "Queue[str]"):
super().__init__()
self.text_queue = text_queue
self.action_queue = action_queue
self.daemon = True # Thread will exit when main program exits
def process_text(self, text: str) -> Dict[str, Any] | None:
"""Convert text into an action if a complete command is detected."""
# Define command patterns
command_patterns = {
r"(?i)\b(stop|now)\b": "stop",
r"(?i)\b(come back|get back)\b": "return",
r"(?i)\b(easy)\b": "slow",
r"(?i)\b(stop drinking)\b": "pause_liquid",
r"(?i)\b(stop eating)\b": "pause_solid",
r"(?i)\b(look at me)\b": "look_at_me",
r"(?i)\b(look away)\b": "look_away",
r"(?i)\b(don't do that)\b": "stop",
}
# Check each pattern
for pattern, action_type in command_patterns.items():
match = re.search(pattern, text.lower())
if match:
return {"type": action_type}
return None
def run(self) -> None:
"""Main processing loop."""
while True:
try:
# Get text from queue, blocks until text is available
text = self.text_queue.get()
# Process the text into an action
action = self.process_text(text)
# If we got a valid action, add it to the action queue
if action:
self.action_queue.put(json.dumps(action))
# Mark the text as processed
self.text_queue.task_done()
except Exception as e:
print(f"Error processing text: {str(e)}")
continue