File size: 13,955 Bytes
149eeaf
ad67495
 
 
 
9ed41df
149eeaf
2bb91de
149eeaf
09ede70
ad67495
 
 
 
149eeaf
 
ad67495
9ed41df
ad67495
 
149eeaf
 
ad67495
361f9d4
 
 
 
 
 
 
 
ad67495
 
361f9d4
28b5e08
 
 
d91a673
361f9d4
 
 
e6bce41
361f9d4
 
 
 
2bb91de
 
ad67495
361f9d4
98ec0ec
5ea3bc3
 
3e4f32c
361f9d4
3e4f32c
 
cf5e7f4
361f9d4
cf5e7f4
 
 
ad67495
361f9d4
ad67495
 
 
 
 
149eeaf
5a17040
c490c32
5a17040
 
 
 
 
c490c32
5a17040
c490c32
149eeaf
 
 
c490c32
ad67495
361f9d4
ad67495
9ed41df
ad67495
 
9ed41df
 
98ec0ec
 
6130167
 
 
 
795c382
 
 
 
6130167
e6bce41
149eeaf
 
28b5e08
 
149eeaf
ad67495
 
 
 
98ec0ec
 
 
 
149eeaf
 
 
 
 
 
 
 
 
 
98ec0ec
 
 
b6ba8eb
98ec0ec
 
5ea3bc3
 
 
98ec0ec
5a17040
6130167
5a17040
 
 
 
c490c32
6130167
 
795c382
 
c490c32
e6bce41
2bb91de
 
 
 
 
795c382
 
 
 
e6bce41
 
 
149eeaf
795c382
82ab66c
795c382
 
 
2bb91de
 
 
 
 
e6bce41
 
 
795c382
 
149eeaf
 
 
6130167
361f9d4
6130167
 
 
149eeaf
 
 
6130167
 
 
 
 
 
5a17040
 
6130167
5a17040
 
 
 
 
 
149eeaf
5a17040
 
 
c490c32
6130167
 
2bb91de
cf5e7f4
 
aec6f97
cf5e7f4
795c382
cf5e7f4
 
28b5e08
cf5e7f4
ad67495
361f9d4
 
 
 
 
 
 
ad67495
09ede70
 
88fcdcc
09ede70
 
 
 
 
ad67495
 
 
 
 
 
09ede70
 
 
 
c490c32
 
 
 
 
 
 
ad67495
149eeaf
 
ad67495
 
 
 
 
 
 
 
 
 
 
 
 
 
149eeaf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcea2ea
ad67495
dc249ac
149eeaf
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
import json
import ray
import time
import asyncio
import os
from clip_transform import CLIPTransform
from environment_state_actor import EnvironmentStateActor, EnvironmentState
from respond_to_prompt_async import RespondToPromptAsync
import asyncio
import subprocess

class CharlesActor:
    def __init__(self):
        self._needs_init = True
        self._charles_actor_debug_output = ""
        self._environment_state:EnvironmentState = EnvironmentState(episode=0, step=0)  # Initialize as EnvironmentState
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()
        
    
    def get_environment_state(self)->EnvironmentState:
        return self._environment_state
    
    def set_state(self, state, skip_print=False):
        self._state = state
        if not skip_print:
            print(state)
        # check if self._app_interface_actor exists
        if hasattr(self, '_app_interface_actor'):
            self._app_interface_actor.set_state.remote(self._state)
    
    async def _initalize_resources(self):
        # Initialize resources
        self.set_state("001 - creating AppInterfaceActor")
        from app_interface_actor import AppInterfaceActor
        self._app_interface_actor = AppInterfaceActor.get_singleton()
        self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()

        self.set_state("002 - creating EnvironmentStateActor")
        self._environment_state_actor = EnvironmentStateActor.remote()

        self.set_state("003 - creating PromptManager")
        from prompt_manager import PromptManager
        self._prompt_manager = PromptManager()

        self.set_state("004 - creating RespondToPromptAsync")
        self._respond_to_prompt = None
        self._respond_to_prompt_task = None

        self.set_state("005 - create SpeechToTextVoskActor")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
        # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")

        self.set_state("006 - create Prototypes")
        from prototypes import Prototypes
        self._prototypes = Prototypes()

        self.set_state("007 - create animator")
        from charles_animator import CharlesAnimator
        self._animator = CharlesAnimator()

        self._needs_init = True
        self.set_state("010 - Initialized")
        
    async def start(self):
        if self._needs_init:
            await self._initalize_resources()
            
        debug_output_history = []

        async def render_debug_output(list_of_strings):
            table_content = "##### Chat history\n"
            for item in reversed(list_of_strings):
                # table_content += f"\n```markdown\n{item}\n```\n"
                table_content += f"\n{item}\n"
            self._charles_actor_debug_output = table_content
            await self._app_interface_actor.set_debug_output.remote(self._charles_actor_debug_output)

        async def add_debug_output(output):
            debug_output_history.append(output)
            if len(debug_output_history) > 10:
                debug_output_history.pop(0)
            await render_debug_output(debug_output_history)
        
        self.set_state("Waiting for input")
        total_video_frames = 0
        skipped_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"
        
        process_speech_to_text_future = []
        current_responses = []
        speech_chunks_per_response = []
        human_preview_text = ""
        robot_preview_text = ""
        additional_prompt = None
        previous_prompt = ""
        is_talking = False
        has_spoken_for_this_prompt = False

        while True:
            env_state = await self._environment_state_actor.begin_next_step.remote()
            self._environment_state = env_state
            audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()    
            video_frames = await self._app_interface_actor.dequeue_video_input_frames_async.remote()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)
            # audio_frames_task = None

            if len(video_frames) > 0:
                vector_debug = f"found {len(video_frames)} video frames"
                total_video_frames += 1
                skipped_video_frames += (len(video_frames) -1)
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                distances, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            if len(process_speech_to_text_future) > 0:
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, raw_json = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]

                    prompts_to_ignore = ["um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm"]

                    if speaker_finished and len(prompt) > 0 and prompt not in prompts_to_ignore:
                        print(f"Prompt: {prompt}")
                        line = ""
                        for i, response in enumerate(current_responses):
                            line += "πŸ€– " if len(line) == 0 else ""
                            # line += f"{response} [{speech_chunks_per_response[i]}]  \n"
                            line += f"[{speech_chunks_per_response[i]}] {response}  \n"
                        if len(line) > 0:
                            await add_debug_output(line)
                        human_preview_text = ""
                        robot_preview_text = ""
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        await add_debug_output(f"πŸ‘¨ {prompt}")
                        self._prompt_manager.replace_or_append_user_message(prompt)
                        if self._respond_to_prompt_task is not None:
                            await self._respond_to_prompt.terminate()
                            self._respond_to_prompt_task.cancel()
                        self._respond_to_prompt = RespondToPromptAsync(self._environment_state_actor, self._audio_output_queue)
                        self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run(prompt, self._prompt_manager.messages))
                        additional_prompt = None
                        previous_prompt = prompt
                        is_talking = False
                        has_spoken_for_this_prompt = False
                        env_state = await self._environment_state_actor.reset_episode.remote()
                        current_responses = []
                        speech_chunks_per_response = []
                    elif len(prompt) > 0 and prompt not in prompts_to_ignore:
                        # sometimes we get a false signal of speaker_finsihed
                        # in which case we get new prompts before we have spoken
                        if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
                            additional_prompt = previous_prompt
                            has_spoken_for_this_prompt = True
                            if self._respond_to_prompt_task is not None:
                                await self._respond_to_prompt.terminate()
                                self._respond_to_prompt_task.cancel()
                            self._respond_to_prompt_task = None
                            self._respond_to_prompt = None
                            env_state = await self._environment_state_actor.reset_episode.remote()
                            current_responses = []
                            speech_chunks_per_response = []                            
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt                        
                        human_preview_text = f"πŸ‘¨β“ {prompt}"

            for new_response in env_state.llm_responses:
                # add_debug_output(f"πŸ€– {new_response}")
                self._prompt_manager.append_assistant_message(new_response)
                current_responses.append(new_response)
                speech_chunks_per_response.append(0)
                robot_preview_text = ""
            if len(env_state.llm_preview):
                robot_preview_text = f"πŸ€–β“ {env_state.llm_preview}"

            for chunk in env_state.tts_raw_chunk_ids:
                chunk = json.loads(chunk)
                # prompt = chunk['prompt']
                response_id = chunk['llm_sentence_id']
                speech_chunks_per_response[response_id] += 1

            list_of_strings = debug_output_history.copy()
            line = ""
            for i, response in enumerate(current_responses):
                line += "πŸ€– " if len(line) == 0 else ""
                line += f"[{speech_chunks_per_response[i]}] {response}  \n"
                # line += f"{response} [{speech_chunks_per_response[i]}]  \n"
            if len(robot_preview_text) > 0:
                line += robot_preview_text+"  \n"
            list_of_strings.append(line)
            if len(human_preview_text) > 0:
                list_of_strings.append(human_preview_text)
            if len(list_of_strings) > 10:
                list_of_strings.pop(0)
            await render_debug_output(list_of_strings)


            await asyncio.sleep(0.001)

            # add observations to the environment state
            count = len(self._audio_output_queue)
            is_talking = bool(count > 0)
            has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
            frame = self._animator.update(is_talking)
            frame_ref = ray.put(frame)
            await self._app_interface_actor.enqueue_video_output_frame.remote(frame_ref)                

            loops+=1
            self.set_state(
                f"Processed {total_video_frames} video frames \
                    and {total_audio_frames} audio frames, \
                    loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. \
                    Is speaking: {is_talking}({count}). \
                    {vector_debug}\
                    ", skip_print=True)

def init_ray():
    """Try to start a local Ray head node, then connect this process to Ray.

    A failure of the ``ray start`` command is printed and otherwise ignored.
    The connection uses the address in the ``RAY_ADDRESS`` environment
    variable when it is set, and Ray's default otherwise; both cases use the
    ``project_charles`` namespace.
    """
    start_command = ["ray", "start", "--include-dashboard=True", "--head"]
    try:
        subprocess.check_output(start_command)
    except Exception as e:
        print (f"charles_actor.py init_ray: {e}")

    # Connect to a running Ray cluster
    while True:
        if ray.is_initialized():
            break
        time.sleep(0.1)
        ray_address = os.getenv('RAY_ADDRESS')
        positional_args = (ray_address,) if ray_address else ()
        ray.init(*positional_args, namespace="project_charles")

async def main():
    """Connect to Ray if needed, then run a ``CharlesActor`` in this process.

    ``CharlesActor.start`` is awaited directly, so this coroutine runs for as
    long as the agent loop does.

    Raises:
        KeyboardInterrupt: Re-raised after printing a notice when the run is
            interrupted manually.
    """
    if not ray.is_initialized():
        init_ray()

    # CharlesActor is a plain local object here, not a Ray actor, so there is
    # no remote future to poll: awaiting start() is the whole run.
    charles_actor = CharlesActor()
    try:
        await charles_actor.start()
    except KeyboardInterrupt:
        print("Script was manually terminated")
        raise
    

if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards; calling get_event_loop() with no running
    # loop is deprecated.
    asyncio.run(main())