# PPOxFamily / ppof_ch4_code_p1.py
# pip install torch minigrid tensorboardX
from typing import Union, Tuple, Dict, List, Optional
from multiprocessing import Process
import multiprocessing as mp
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import minigrid  # importing minigrid registers the MiniGrid environments with gymnasium
import gymnasium as gym
from torch.optim.lr_scheduler import ExponentialLR
from tensorboardX import SummaryWriter
from minigrid.wrappers import FlatObsWrapper
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
if torch.cuda.is_available():
device = torch.device("cuda:0")
else:
device = torch.device("cpu")
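# This script compares Random Network Distillation (RND) reward models whose predictor/target
# MLPs differ only in width. Every model is trained on the same random-policy MiniGrid rollouts,
# and the resulting intrinsic-reward (prediction-error) statistics are logged to TensorBoard.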
train_config = dict(
train_iter=1024,
train_data_count=128,
test_data_count=4096,
)
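# The five RND configs below are identical except for hidden_size_list, i.e. the capacity of the
# encoders; observation_shape=2835 is the size of FlatObsWrapper's flattened MiniGrid observation
# (the 7x7x3 image cells plus the one-hot-encoded mission string).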
little_RND_net_config = dict(
exp_name="little_rnd_network",
observation_shape=2835,
hidden_size_list=[32, 16],
learning_rate=1e-3,
batch_size=64,
update_per_collect=100,
obs_norm=True,
obs_norm_clamp_min=-1,
obs_norm_clamp_max=1,
reward_mse_ratio=1e5,
)
small_RND_net_config = dict(
exp_name="small_rnd_network",
observation_shape=2835,
hidden_size_list=[64, 64],
learning_rate=1e-3,
batch_size=64,
update_per_collect=100,
obs_norm=True,
obs_norm_clamp_min=-1,
obs_norm_clamp_max=1,
reward_mse_ratio=1e5,
)
standard_RND_net_config = dict(
exp_name="standard_rnd_network",
observation_shape=2835,
hidden_size_list=[128, 64],
learning_rate=1e-3,
batch_size=64,
update_per_collect=100,
obs_norm=True,
obs_norm_clamp_min=-1,
obs_norm_clamp_max=1,
reward_mse_ratio=1e5,
)
large_RND_net_config = dict(
exp_name="large_RND_network",
observation_shape=2835,
hidden_size_list=[256, 256],
learning_rate=1e-3,
batch_size=64,
update_per_collect=100,
obs_norm=True,
obs_norm_clamp_min=-1,
obs_norm_clamp_max=1,
reward_mse_ratio=1e5,
)
very_large_RND_net_config = dict(
exp_name="very_large_RND_network",
observation_shape=2835,
hidden_size_list=[512, 512],
learning_rate=1e-3,
batch_size=64,
update_per_collect=100,
obs_norm=True,
obs_norm_clamp_min=-1,
obs_norm_clamp_max=1,
reward_mse_ratio=1e5,
)
class FCEncoder(nn.Module):
def __init__(
self,
obs_shape: int,
hidden_size_list,
activation: Optional[nn.Module] = nn.ReLU(),
) -> None:
super(FCEncoder, self).__init__()
self.obs_shape = obs_shape
self.act = activation
self.init = nn.Linear(obs_shape, hidden_size_list[0])
layers = []
for i in range(len(hidden_size_list) - 1):
layers.append(nn.Linear(hidden_size_list[i], hidden_size_list[i + 1]))
layers.append(self.act)
self.main = nn.Sequential(*layers)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.act(self.init(x))
x = self.main(x)
return x
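# RndNetwork holds two encoders with the same architecture: a frozen, randomly initialized target
# and a trainable predictor. The predictor's error in matching the target's features serves as the
# intrinsic reward and tends to be larger for observations that have rarely been seen.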
class RndNetwork(nn.Module):
def __init__(self, obs_shape: Union[int, list], hidden_size_list: list) -> None:
super(RndNetwork, self).__init__()
self.target = FCEncoder(obs_shape, hidden_size_list)
self.predictor = FCEncoder(obs_shape, hidden_size_list)
for param in self.target.parameters():
param.requires_grad = False
def forward(self, obs: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
predict_feature = self.predictor(obs)
with torch.no_grad():
target_feature = self.target(obs)
return predict_feature, target_feature
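# RunningMeanStd tracks running estimates of the observation mean and variance (merged batch by
# batch with a parallel-variance update) so observations can be normalized to roughly zero mean
# and unit standard deviation before being fed to the RND networks.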
class RunningMeanStd(object):
def __init__(self, epsilon=1e-4, shape=(), device=torch.device('cpu')):
self._epsilon = epsilon
self._shape = shape
self._device = device
self.reset()
def update(self, x):
batch_mean = np.mean(x, axis=0)
batch_var = np.var(x, axis=0)
batch_count = x.shape[0]
new_count = batch_count + self._count
mean_delta = batch_mean - self._mean
new_mean = self._mean + mean_delta * batch_count / new_count
        # this method for calculating the new variance might be numerically unstable
m_a = self._var * self._count
m_b = batch_var * batch_count
m2 = m_a + m_b + np.square(mean_delta) * self._count * batch_count / new_count
new_var = m2 / new_count
self._mean = new_mean
self._var = new_var
self._count = new_count
def reset(self):
if len(self._shape) > 0:
self._mean = np.zeros(self._shape, 'float32')
self._var = np.ones(self._shape, 'float32')
else:
self._mean, self._var = 0., 1.
self._count = self._epsilon
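    # The properties below return plain floats while the statistics are still scalars (shape=()),
    # and device-placed torch tensors once array statistics have been accumulated.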
@property
    def mean(self) -> Union[float, torch.Tensor]:
if np.isscalar(self._mean):
return self._mean
else:
return torch.FloatTensor(self._mean).to(self._device)
@property
    def std(self) -> Union[float, torch.Tensor]:
std = np.sqrt(self._var + 1e-8)
if np.isscalar(std):
return std
else:
return torch.FloatTensor(std).to(self._device)
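# RndRewardModel ties everything together: the RND network pair, an Adam optimizer over the
# predictor only, an exponentially decaying learning rate, optional observation normalization,
# and TensorBoard logging of the estimated intrinsic rewards.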
class RndRewardModel:
def __init__(self, config) -> None: # noqa
super(RndRewardModel, self).__init__()
self.cfg = config
self.tb_logger = SummaryWriter(config["exp_name"])
self.reward_model = RndNetwork(
obs_shape=config["observation_shape"], hidden_size_list=config["hidden_size_list"]
).to(device)
self.opt = optim.Adam(self.reward_model.predictor.parameters(), config["learning_rate"])
self.scheduler = ExponentialLR(self.opt, gamma=0.997)
self.estimate_cnt_rnd = 0
if self.cfg["obs_norm"]:
self._running_mean_std_rnd_obs = RunningMeanStd(epsilon=1e-4, device=device)
def __del__(self):
self.tb_logger.flush()
self.tb_logger.close()
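    # Each call performs update_per_collect gradient steps on random mini-batches, distilling the
    # frozen target network into the predictor on the observations collected so far.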
def train(self, data) -> None:
for _ in range(self.cfg["update_per_collect"]):
train_data: list = random.sample(data, self.cfg["batch_size"])
train_data: torch.Tensor = torch.stack(train_data).to(device)
if self.cfg["obs_norm"]:
# Note: observation normalization: transform obs to mean 0, std 1
self._running_mean_std_rnd_obs.update(train_data.cpu().numpy())
train_data = (train_data - self._running_mean_std_rnd_obs.mean) / self._running_mean_std_rnd_obs.std
train_data = torch.clamp(
train_data, min=self.cfg["obs_norm_clamp_min"], max=self.cfg["obs_norm_clamp_max"]
)
predict_feature, target_feature = self.reward_model(train_data)
loss = F.mse_loss(predict_feature, target_feature.detach())
self.opt.zero_grad()
loss.backward()
self.opt.step()
self.scheduler.step()
    def estimate(self, data: list) -> List[torch.Tensor]:
        """
        Estimate the RND intrinsic reward (prediction error) for a batch of observations.
        """
obs = torch.stack(data).to(device)
if self.cfg["obs_norm"]:
# Note: observation normalization: transform obs to mean 0, std 1
obs = (obs - self._running_mean_std_rnd_obs.mean) / self._running_mean_std_rnd_obs.std
obs = torch.clamp(obs, min=self.cfg["obs_norm_clamp_min"], max=self.cfg["obs_norm_clamp_max"])
with torch.no_grad():
self.estimate_cnt_rnd += 1
predict_feature, target_feature = self.reward_model(obs)
mse = F.mse_loss(predict_feature, target_feature, reduction='none').mean(dim=1)
self.tb_logger.add_scalar('rnd_reward/mse', mse.cpu().numpy().mean(), self.estimate_cnt_rnd)
            # Note: scale the per-sample MSE by reward_mse_ratio to obtain the intrinsic reward;
            # a min-max normalization to [0, 1] is a commented-out alternative.
            rnd_reward = mse * self.cfg["reward_mse_ratio"]  # (mse - mse.min()) / (mse.max() - mse.min() + 1e-11)
self.tb_logger.add_scalar('rnd_reward/rnd_reward_max', rnd_reward.max(), self.estimate_cnt_rnd)
self.tb_logger.add_scalar('rnd_reward/rnd_reward_mean', rnd_reward.mean(), self.estimate_cnt_rnd)
self.tb_logger.add_scalar('rnd_reward/rnd_reward_min', rnd_reward.min(), self.estimate_cnt_rnd)
            rnd_reward = torch.chunk(rnd_reward, rnd_reward.shape[0], dim=0)
            return list(rnd_reward)
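# Train one RND reward model of the configured size: every iteration fits the predictor on that
# iteration's batch of observations and then logs intrinsic-reward statistics on the shared test set.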
def training(config, train_data, test_data):
rnd_reward_model = RndRewardModel(config=config)
for i in range(train_config["train_iter"]):
rnd_reward_model.train([torch.Tensor(item["last_observation"]) for item in train_data[i]])
rnd_reward_model.estimate([torch.Tensor(item["last_observation"]) for item in test_data])
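# Collect random-policy rollouts from MiniGrid-Empty-8x8 (flattened observations), then launch one
# training process per RND network size so that all models are compared on identical data.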
def main():
env = gym.make("MiniGrid-Empty-8x8-v0")
env_obs = FlatObsWrapper(env)
train_data = []
test_data = []
for i in range(train_config["train_iter"]):
train_data_per_iter = []
while len(train_data_per_iter) < train_config["train_data_count"]:
last_observation, _ = env_obs.reset()
            terminated, truncated = False, False
            while not (terminated or truncated) and len(train_data_per_iter) < train_config["train_data_count"]:
action = env_obs.action_space.sample()
observation, reward, terminated, truncated, info = env_obs.step(action)
train_data_per_iter.append(
{
"last_observation": last_observation,
"action": action,
"reward": reward,
"observation": observation
}
)
last_observation = observation
train_data.append(train_data_per_iter)
while len(test_data) < train_config["test_data_count"]:
last_observation, _ = env_obs.reset()
        terminated, truncated = False, False
        while not (terminated or truncated) and len(test_data) < train_config["test_data_count"]:
action = env_obs.action_space.sample()
observation, reward, terminated, truncated, info = env_obs.step(action)
test_data.append(
{
"last_observation": last_observation,
"action": action,
"reward": reward,
"observation": observation
}
)
last_observation = observation
env_obs.close()
p0 = Process(target=training, args=(little_RND_net_config, train_data, test_data))
p0.start()
p1 = Process(target=training, args=(small_RND_net_config, train_data, test_data))
p1.start()
p2 = Process(target=training, args=(standard_RND_net_config, train_data, test_data))
p2.start()
p3 = Process(target=training, args=(large_RND_net_config, train_data, test_data))
p3.start()
p4 = Process(target=training, args=(very_large_RND_net_config, train_data, test_data))
p4.start()
p0.join()
p1.join()
p2.join()
p3.join()
p4.join()
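# The 'spawn' start method is used so each child process initializes its own CUDA context instead
# of inheriting one from a forked parent, which CUDA does not support.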
if __name__ == "__main__":
mp.set_start_method('spawn')
main()