GenSim / app.py
liruiw's picture
chenbao-dev (#1)
114ef2f
raw
history blame
4.06 kB
import openai
import numpy as np
from tempfile import NamedTemporaryFile
import copy
import shapely
from hydra.core.global_hydra import GlobalHydra
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
import numpy as np
import os
import hydra
import random
import re
import openai
import IPython
import time
import pybullet as p
import traceback
from datetime import datetime
from pprint import pprint
import cv2
import re
import random
import json
from gensim.agent import Agent
from gensim.critic import Critic
from gensim.sim_runner import SimulationRunner
from gensim.memory import Memory
from gensim.utils import set_gpt_model, clear_messages
class DemoRunner:
def __init__(self):
self._env = None
GlobalHydra.instance().clear()
hydra.initialize(version_base="1.2", config_path='cliport/cfg')
self._cfg = hydra.compose(config_name="data")
def setup(self, api_key):
cfg = self._cfg
openai.api_key = api_key
cfg['model_output_dir'] = 'temp'
cfg['prompt_folder'] = 'topdown_task_generation_prompt_simple_singleprompt'
set_gpt_model(cfg['gpt_model'])
cfg['load_memory'] = True
cfg['task_description_candidate_num'] = 10
cfg['record']['save_video'] = True
memory = Memory(cfg)
agent = Agent(cfg, memory)
critic = Critic(cfg, memory)
self.simulation_runner = SimulationRunner(cfg, agent, critic, memory)
info = '### Build'
img = np.zeros((720, 640, 3))
return info, img
def run(self, instruction):
cfg = self._cfg
cfg['target_task_name'] = instruction
# self._env.cache_video = []
self.simulation_runner._md_logger = ''
self.simulation_runner.task_creation()
self.simulation_runner.simulate_task()
print("self.video_path = ", self.simulation_runner.video_path)
return self.simulation_runner._md_logger, self.simulation_runner.video_path
def setup(api_key):
if not api_key:
return 'Please enter your OpenAI API key!', None, None
demo_runner = DemoRunner()
info, img = demo_runner.setup(api_key)
return info, img, demo_runner
def run(instruction, demo_runner):
if demo_runner is None:
return 'Please run setup first!', None
# return None, "/home/baochen/Desktop/projects/GenSim2/data/assemble-pallet-ball-train/videos/000001.mp4"
return demo_runner.run(instruction)
if __name__ == '__main__':
os.environ['GENSIM_ROOT'] = os.getcwd()
with open('README.md', 'r') as f:
for _ in range(12):
next(f)
readme_text = f.read()
with gr.Blocks() as demo:
state = gr.State(None)
gr.Markdown(readme_text)
gr.Markdown('# Interactive Demo')
with gr.Row():
with gr.Column():
with gr.Row():
inp_api_key = gr.Textbox(label='OpenAI API Key (this is not stored anywhere)', lines=1)
btn_setup = gr.Button("Setup/Reset Simulation")
info_setup = gr.Markdown(label='Setup Info')
with gr.Column():
img_setup = gr.Image(label='Current Simulation')
with gr.Row():
with gr.Column():
inp_instruction = gr.Textbox(label='Task Name', lines=1)
btn_run = gr.Button("Run (this may take 30+ seconds)")
info_run = gr.Markdown(label='Generated Code')
with gr.Column():
video_run = gr.Video(label='Video of Last Instruction')
btn_setup.click(
setup,
inputs=[inp_api_key],
outputs=[info_setup, img_setup, state]
)
btn_run.click(
run,
inputs=[inp_instruction, state],
outputs=[info_run, video_run]
)
demo.queue().launch(show_error=True)