Code-Interpreter / jupyter_backend.py
fanzhuyu's picture
Duplicate from dongsiqie/Code-Interpreter
a387a9a
raw
history blame contribute delete
No virus
4.86 kB
import jupyter_client
import re
def delete_color_control_char(string):
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
return ansi_escape.sub('', string)
class JupyterKernel:
def __init__(self, work_dir):
self.kernel_manager, self.kernel_client = jupyter_client.manager.start_new_kernel(kernel_name='python3')
self.work_dir = work_dir
self._create_work_dir()
self.available_functions = {
'execute_code': self.execute_code,
'python': self.execute_code
}
def execute_code_(self, code):
msg_id = self.kernel_client.execute(code)
# Get the output of the code
iopub_msg = self.kernel_client.get_iopub_msg()
all_output = []
while True:
if iopub_msg['msg_type'] == 'stream':
if iopub_msg['content'].get('name') == 'stdout':
output = iopub_msg['content']['text']
all_output.append(('stdout', output))
iopub_msg = self.kernel_client.get_iopub_msg()
elif iopub_msg['msg_type'] == 'execute_result':
if 'data' in iopub_msg['content']:
if 'text/plain' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/plain']
all_output.append(('execute_result_text', output))
if 'text/html' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/html']
all_output.append(('execute_result_html', output))
if 'image/png' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/png']
all_output.append(('execute_result_png', output))
if 'image/jpeg' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/jpeg']
all_output.append(('execute_result_jpeg', output))
iopub_msg = self.kernel_client.get_iopub_msg()
elif iopub_msg['msg_type'] == 'display_data':
if 'data' in iopub_msg['content']:
if 'text/plain' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/plain']
all_output.append(('display_text', output))
if 'text/html' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['text/html']
all_output.append(('display_html', output))
if 'image/png' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/png']
all_output.append(('display_png', output))
if 'image/jpeg' in iopub_msg['content']['data']:
output = iopub_msg['content']['data']['image/jpeg']
all_output.append(('display_jpeg', output))
iopub_msg = self.kernel_client.get_iopub_msg()
elif iopub_msg['msg_type'] == 'error':
if 'traceback' in iopub_msg['content']:
output = '\n'.join(iopub_msg['content']['traceback'])
all_output.append(('error', output))
iopub_msg = self.kernel_client.get_iopub_msg()
elif iopub_msg['msg_type'] == 'status' and iopub_msg['content'].get('execution_state') == 'idle':
break
else:
iopub_msg = self.kernel_client.get_iopub_msg()
return all_output
def execute_code(self, code):
text_to_gpt = []
content_to_display = self.execute_code_(code)
for mark, out_str in content_to_display:
if mark in ('stdout', 'execute_result_text', 'display_text'):
text_to_gpt.append(out_str)
elif mark in ('execute_result_png', 'execute_result_jpeg', 'display_png', 'display_jpeg'):
text_to_gpt.append('[image]')
elif mark == 'error':
text_to_gpt.append(delete_color_control_char(out_str))
return '\n'.join(text_to_gpt), content_to_display
def _create_work_dir(self):
# set work dir in jupyter environment
init_code = f"import os\n" \
f"if not os.path.exists('{self.work_dir}'):\n" \
f" os.mkdir('{self.work_dir}')\n" \
f"os.chdir('{self.work_dir}')\n" \
f"del os"
self.execute_code_(init_code)
def restart_jupyter_kernel(self):
self.kernel_client.shutdown()
self.kernel_manager, self.kernel_client = jupyter_client.manager.start_new_kernel(kernel_name='python3')
self._create_work_dir()