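"""Gym environments for the facility location / p-median problem (PMP).

This module defines three environments built on FacilityLocationClient:
- PMPEnv: training environment over synthetic or real instances with a dense
  per-swap improvement reward.
- EvalPMPEnv: evaluation environment for a single fixed instance with a sparse
  terminal reward.
- MULTIPMP: a multi-type environment that first solves each facility type
  independently with a pretrained PPO policy and then refines the combined
  solution.
"""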
import io
import math
import os
import pickle
import warnings
from typing import Tuple, Dict, Optional, List, Text

import gym
import numpy as np
import matplotlib.pyplot as plt
from numpy import ndarray

from facility_location.utils.config import Config
from facility_location.env.facility_location_client import FacilityLocationClient
from facility_location.env.obs_extractor import ObsExtractor
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from facility_location.agent import MaskedFacilityLocationActorCriticPolicy
from facility_location.utils.policy import get_policy_kwargs

class PMPEnv(gym.Env):
    """Swap-based local search environment for a single facility type."""

    EPSILON = 1e-6

    def __init__(self, cfg: Config):
        self.cfg = cfg
        self._train_region = cfg.env_specs['region']
        self._eval_region = cfg.eval_specs['region']
        self._min_n = cfg.env_specs['min_n']
        self._max_n = cfg.env_specs['max_n']
        self._min_p_ratio = cfg.env_specs['min_p_ratio']
        self._max_p_ratio = cfg.env_specs['max_p_ratio']
        self._max_steps_scale = cfg.env_specs['max_steps_scale']
        self._tabu_stable_steps_scale = cfg.env_specs['tabu_stable_steps_scale']
        self._popstar = cfg.env_specs['popstar']

        self._seed(cfg.seed)
        self._done = False

        self._set_node_edge_range()
        self._flc = FacilityLocationClient(cfg, self._np_random)
        self._obs_extractor = ObsExtractor(cfg, self._flc, self._node_range, self._edge_range)
        self._declare_spaces()
    def _declare_spaces(self) -> None:
        self.observation_space = gym.spaces.Dict({
            'node_features': gym.spaces.Box(low=0, high=1, shape=(self._node_range, self.get_node_feature_dim())),
            'static_adjacency_list': gym.spaces.Box(low=0, high=self._node_range, shape=(self._edge_range, 2), dtype=np.int64),
            'dynamic_adjacency_list': gym.spaces.Box(low=0, high=self._node_range, shape=(self._edge_range, 2), dtype=np.int64),
            'node_mask': gym.spaces.Box(low=0, high=1, shape=(self._node_range,), dtype=bool),
            'static_edge_mask': gym.spaces.Box(low=0, high=1, shape=(self._edge_range,), dtype=bool),
            'dynamic_edge_mask': gym.spaces.Box(low=0, high=1, shape=(self._edge_range,), dtype=bool),
        })
        # A (facility-out, facility-in) swap encoded as a single discrete index over
        # node pairs; the popstar and non-popstar variants share the same action space.
        self.action_space = gym.spaces.Discrete(self._node_range ** 2)
    def _set_node_edge_range(self) -> None:
        self._node_range = self._max_n + 2
        self._edge_range = int(self._max_n ** 2 * self._max_p_ratio)

    def get_node_feature_dim(self) -> int:
        return self._obs_extractor.get_node_dim()

    def _seed(self, seed: int) -> None:
        self._np_random = np.random.default_rng(seed)

    def get_reward(self) -> float:
        # Dense reward: improvement of the objective value over the last swap.
        reward = self._obj_value[self._t - 1] - self._obj_value[self._t]
        return reward

    def _transform_action(self, action: np.ndarray) -> np.ndarray:
        if self._popstar:
            # Decode the flat index into a (facility-out, facility-in) pair.
            action = np.array(np.unravel_index(action, (self._node_range, self._node_range)))
            action = action - 1
        return action
    def step(self, action: np.ndarray):
        if self._done:
            raise RuntimeError('Action taken after episode is done.')
        obj_value, solution, info = self._flc.swap(action, self._t)
        self._t += 1
        self._done = (self._t == self._max_steps)
        self._obj_value[self._t] = obj_value
        self._solution[self._t] = solution
        reward = self.get_reward()
        if obj_value < self._best_obj_value - self.EPSILON:
            self._best_obj_value = obj_value
            self._best_solution = solution
            self._last_best_t = self._t
        elif (self._t - self._last_best_t) % self._tabu_stable_steps == 0:
            # No improvement for a full tabu window: reset the tabu tenure.
            self._flc.reset_tabu_time()
        return self._get_obs(self._t), reward, self._done, False, info
    def reset(self, seed=0) -> Optional[Dict]:
        if self._train_region is None:
            # Synthetic instance sampled uniformly at random.
            points, demands, n, p = self._generate_new_instance()
            self._flc.set_instance(points, demands, n, p, False)
        else:
            # Instance sampled from real data on disk.
            points, demands, n, p = self._use_real_instance()
            self._flc.set_instance(points, demands, n, p, True)
        return self.prepare(n, p), {}

    def prepare(self, n: int, p: int) -> Dict:
        initial_obj_value, initial_solution = self._flc.compute_initial_solution()
        self._obs_extractor.reset()
        self._done = False
        self._t = 0
        self._max_steps = max(int(p * self._max_steps_scale), 5)
        self._obj_value = np.zeros(self._max_steps + 1)
        self._obj_value[0] = initial_obj_value
        self._solution = np.zeros((self._max_steps + 1, n), dtype=bool)
        self._solution[0] = initial_solution
        self._best_solution = initial_solution
        self._best_obj_value = initial_obj_value
        self._last_best_t = 0
        self._tabu_stable_steps = max(1, round(self._max_steps * self._tabu_stable_steps_scale))
        return self._get_obs(self._t)
    def render(self, mode='human', dpi=300) -> Optional[np.ndarray]:
        gdf, facilities = self._flc.get_gdf_facilities()
        if len(facilities) > 10:
            warnings.warn('Too many facilities to render. Only rendering the first 10.')
            facilities = facilities[:10]
        cm = plt.get_cmap('tab10')
        fig, axs = plt.subplots(1, 2, figsize=(12, 6), dpi=dpi)
        for i, f in enumerate(facilities):
            gdf.loc[gdf['assignment'] == f].plot(ax=axs[0],
                                                 zorder=2,
                                                 alpha=0.7,
                                                 edgecolor="k",
                                                 color=cm(i))
            gdf.loc[[f]].plot(ax=axs[0],
                              marker='*',
                              markersize=300,
                              zorder=3,
                              alpha=0.7,
                              edgecolor="k",
                              color=cm(i))
        axs[0].set_title("Facility Location", fontweight="bold")
        plot_obj_value = self._obj_value[:self._t + 1]
        axs[1].plot(plot_obj_value, marker='.', markersize=10, color='k')
        axs[1].set_title("Objective Value", fontweight="bold")
        axs[1].set_xticks(np.arange(self._max_steps + 1, step=math.ceil((self._max_steps + 1) / 10)))
        fig.tight_layout()
        if mode == 'human':
            plt.show()
        else:
            io_buf = io.BytesIO()
            fig.savefig(io_buf, format='raw', dpi=dpi)
            io_buf.seek(0)
            img_arr = np.reshape(np.frombuffer(io_buf.getvalue(), dtype=np.uint8),
                                 newshape=(int(fig.bbox.bounds[3]), int(fig.bbox.bounds[2]), -1))
            io_buf.close()
            return img_arr

    def close(self):
        plt.close()
    def _generate_new_instance(self) -> Tuple[np.ndarray, np.ndarray, int, int]:
        n = self._np_random.integers(self._min_n, self._max_n, endpoint=True)
        p_ratio = self._np_random.uniform(self._min_p_ratio, self._max_p_ratio)
        p = int(max(n * p_ratio, 4))
        points = self._np_random.uniform(size=(n, 2))
        while np.unique(points, axis=0).shape[0] != n:
            points = self._np_random.uniform(size=(n, 2))
        demands = self._np_random.random(size=(n,))
        return points, demands, n, p

    def _use_real_instance(self) -> Tuple[np.ndarray, np.ndarray, int, int]:
        data_path = './data/{}/pkl'.format(self.cfg.eval_specs['region'])
        files = os.listdir(data_path)
        files = [f for f in files if f.endswith('.pkl')]
        sample_data_path = os.path.join(data_path, files[self._np_random.integers(len(files))])
        with open(sample_data_path, 'rb') as f:
            np_data = pickle.load(f)
        n = self._np_random.integers(self._min_n, self._max_n, endpoint=True)
        p = max(int(n * self._np_random.uniform(self._min_p_ratio, self._max_p_ratio)), 4)
        sample_cbgs = self._np_random.choice(list(np_data[1].keys()), n, replace=False)
        points = []
        demands = []
        for cbg in sample_cbgs:
            points.append(np_data[1][cbg]['pos'])
            demands.append(np_data[1][cbg]['demand'])
        points = np.array(points)
        demands = np.array(demands)
        return points, demands, n, p

    def _get_obs(self, t: int) -> Dict:
        return self._obs_extractor.get_obs(t)

    def get_initial_solution(self) -> np.ndarray:
        return self._solution[0]

class EvalPMPEnv(PMPEnv):
    def __init__(self, cfg: Config, positions, demands, n, p, boost=False):
        self._eval_np = (n, p)
        self._eval_seed = cfg.eval_specs['seed']
        self._boost = boost
        self.points = positions
        self.demands = demands
        self._n = n
        self._p = p
        super().__init__(cfg)

    def _set_node_edge_range(self) -> None:
        n, p = self._eval_np
        self._node_range = n + 2
        self._edge_range = n * p

    def get_eval_num_cases(self) -> int:
        return self._eval_num_cases

    def get_eval_np(self) -> Tuple[int, int]:
        return self._eval_np

    def reset_instance_id(self) -> None:
        self._instance_id = 0
    def step(self, action: np.ndarray):
        if self._done:
            raise RuntimeError('Action taken after episode is done.')
        obj_value, solution, info = self._flc.swap(action, self._t)
        self._t += 1
        self._done = (self._t == self._max_steps)
        self._obj_value[self._t] = obj_value
        self._solution[self._t] = solution
        reward = self.get_reward()
        if obj_value < self._best_obj_value - self.EPSILON:
            self._best_obj_value = obj_value
            self._best_solution = solution
            self._last_best_t = self._t
        elif (self._t - self._last_best_t) % self._tabu_stable_steps == 0:
            self._flc.reset_tabu_time()
        print(self._t, self._max_steps)
        return self._get_obs(self._t), reward, self._done, False, info

    def get_reward(self) -> float:
        # Sparse reward: the negated best objective value, given only at episode end.
        if self._done:
            reward = -np.min(self._obj_value)
        else:
            reward = 0.0
        return reward

    def get_best_solution(self) -> np.ndarray:
        return self._best_solution

    def reset(self, seed=0) -> Dict:
        self._flc.set_instance(self.points, self.demands, self._n, self._p, False)
        return self.prepare(self._n, self._p, self._boost), {}
    def prepare(self, n: int, p: int, boost: bool) -> Dict:
        initial_obj_value, initial_solution = self._flc.compute_initial_solution()
        self._obs_extractor.reset()
        self._done = False
        self._t = 0
        self._max_steps = max(int(p * self._max_steps_scale), 5)
        if boost:
            # Boost mode shortens the episode for faster evaluation.
            self._max_steps = max(int(self._max_steps_scale / 10), 5)
        self._obj_value = np.zeros(self._max_steps + 1)
        self._obj_value[0] = initial_obj_value
        self._solution = np.zeros((self._max_steps + 1, n), dtype=bool)
        self._solution[0] = initial_solution
        self._best_solution = initial_solution
        self._best_obj_value = initial_obj_value
        self._last_best_t = 0
        self._tabu_stable_steps = max(1, round(self._max_steps * self._tabu_stable_steps_scale))
        return self._get_obs(self._t)

    def get_instance(self) -> Tuple[np.ndarray, np.ndarray, int, int]:
        points, demands, n, p = self._flc.get_instance()
        return points, demands, n, p

    def get_distance_and_cost(self) -> Tuple[np.ndarray, np.ndarray]:
        return self._flc.get_distance_and_cost_matrix()

    def evaluate(self, solution: np.ndarray) -> float:
        self._flc.set_solution(solution)
        obj_value = self._flc.compute_obj_value()
        return obj_value

class MULTIPMP(PMPEnv):
    EPSILON = 1e-6

    def __init__(self, cfg, data_npy, boost=False):
        self.cfg = cfg
        self.data_npy = data_npy
        self._boost = boost
        self._all_points, self._all_demands, self._n, self._all_p = self._load_multi_facility_data(data_npy)
        self.boost = boost
        self._all_solutions = self._load_multi_facility_solutions(boost)
        print('all_solutions:', self._all_solutions)
        self._final_solutions = list(self._all_solutions)
        self._num_types = len(self._all_p)
        self._current_type = 0
        self._all_max_steps, self._old_mask, self._new_mask = self._get_max_steps()
        super().__init__(cfg)

    def _set_node_edge_range(self) -> None:
        self._node_range = self._n + 2
        self._edge_range = self._n * max(self._all_p)
    def step(self, action: np.ndarray):
        if self._num_types == 1:
            # Nothing to refine when there is only one facility type.
            reward = self.get_reward()
            self._done = True
            with open('./facility_location/solutions.pkl', 'wb') as f:
                pickle.dump(self._final_solutions, f)
            return self._get_obs(self._t), reward, self._done, False, {}
        if self._done:
            raise RuntimeError('Action taken after episode is done.')
        obj_value, solution, info = self._flc.swap(action, self._t, stage=2)
        self._t += 1
        self._done = (self._t == self._all_max_steps[-1] and self._current_type == len(self._all_max_steps) - 1)
        self._obj_value[self._t] = obj_value
        self._solution[self._t] = solution
        reward = self.get_reward()
        if obj_value < self._best_obj_value - self.EPSILON:
            self._best_obj_value = obj_value
            self._best_solution = solution
            self._last_best_t = self._t
        elif (self._t - self._last_best_t) % self._tabu_stable_steps == 0:
            self._flc.reset_tabu_time()
        print(self._t, self._all_max_steps[self._current_type])
        if self._t == self._all_max_steps[self._current_type] and not self._done:
            # Finished refining the current facility type; move on to the next one.
            self._t = 0
            self._multi_obj += obj_value
            self._final_solutions[self._current_type] = solution
            self._update_type()
        if self._done:
            with open('./facility_location/solutions.pkl', 'wb') as f:
                pickle.dump(self._final_solutions, f)
        return self._get_obs(self._t), reward, self._done, False, info
    def reset(self, seed=0) -> Optional[Dict]:
        self._current_type = 0
        points = self._all_points
        demands = self._all_demands[:, 0]
        n = self._n
        p = self._all_p[0]
        solution = self._all_solutions[0]
        self._multi_obj = 0
        self._flc.set_instance(points, demands, n, p, True)
        return self.prepare(n, p, solution), {}

    def _update_type(self):
        if self._current_type >= self._num_types:
            raise RuntimeError('Action taken after episode is done.')
        if self._current_type < self._num_types - 1:
            self._current_type += 1
        print(f'current type: {self._current_type}')
        print(self._num_types)
        points = self._all_points
        demands = self._all_demands[:, self._current_type]
        n = self._n
        p = self._all_p[self._current_type]
        solution = self._all_solutions[self._current_type]
        self._flc.set_instance(points, demands, n, p, True)
        self.prepare(n, p, solution)
    def prepare(self, n: int, p: int, solution: np.ndarray) -> Dict:
        self._obs_extractor.reset()
        self._done = False
        self._t = 0
        if len(self._all_p) > 1:
            self._max_steps = self._all_max_steps[self._current_type]
            self._flc.init_facility_mask(self._old_mask[self._current_type], self._new_mask[self._current_type])
        else:
            # With a single facility type there is nothing to swap.
            self._max_steps = 0
        initial_solution = solution
        initial_obj_value = self._flc.compute_obj_value_from_solution(initial_solution, stage=2)
        self._obj_value = np.zeros(self._max_steps + 1)
        self._obj_value[0] = initial_obj_value
        self._solution = np.zeros((self._max_steps + 1, n), dtype=bool)
        self._solution[0] = initial_solution
        self._best_solution = initial_solution
        self._best_obj_value = initial_obj_value
        self._last_best_t = 0
        self._tabu_stable_steps = max(1, round(self._max_steps * self._tabu_stable_steps_scale))
        return self._get_obs(self._t)
    def _get_max_steps(self) -> Tuple[list, list, list]:
        tmp_all_solutions = list(self._all_solutions)
        max_steps = []
        old_idx = []
        new_idx = []
        for t in range(self._num_types):
            # For each node, count in how many of the remaining types it is a facility.
            count_true = [sum(s) for s in zip(*(tmp_all_solutions[t:]))]
            # Facilities of the current type that are shared with a later type must be moved,
            # and nodes used by no type are the candidate destinations.
            old = [i for i in range(len(count_true)) if count_true[i] > 1 and tmp_all_solutions[t][i]]
            new = [i for i in range(len(count_true)) if count_true[i] == 0]
            if len(old):
                old_idx.append(old)
                new_idx.append(new)
                max_steps.append(len(old))
                for i in old:
                    count_true[i] = count_true[i] - 1
        return max_steps, old_idx, new_idx
    def _generate_new_instance(self) -> Tuple[np.ndarray, np.ndarray, int, int]:
        n = self._np_random.integers(self._min_n, self._max_n, endpoint=True)
        p_ratio = self._np_random.uniform(self._min_p_ratio, self._max_p_ratio)
        p = int(max(n * p_ratio, 4))
        points = self._np_random.uniform(size=(n, 2))
        while np.unique(points, axis=0).shape[0] != n:
            points = self._np_random.uniform(size=(n, 2))
        demands = self._np_random.random(size=(n,))
        return points, demands, n, p
    def _load_multi_facility_data(self, data_npy) -> Tuple[np.ndarray, np.ndarray, int, np.ndarray]:
        # Each row holds two coordinates in degrees, one demand column per facility
        # type, and one 0/1 column per type marking the existing facilities.
        data = data_npy.split('\n')
        n = len(data)
        p = int((len(data[0].split(' ')) - 2) / 2)
        positions = []
        demands = []
        actual_facilities = []
        for row in data:
            row = row.split(' ')
            row = [x for x in row if len(x)]
            positions.append([float(row[0]), float(row[1])])
            demand = []
            for i in range(2, 2 + p):
                demand.append(float(row[i]))
            demands.append(demand)
            actual_facility = []
            for i in range(2 + p, 2 + 2 * p):
                actual_facility.append(bool(int(float(row[i]))))
            actual_facilities.append(actual_facility)
        positions = np.array(positions)
        positions = np.deg2rad(positions)
        demands = np.array(demands)
        actual_facilities = np.array(actual_facilities)
        # Number of facilities to place for each type, taken from the existing layout.
        ps = actual_facilities.sum(axis=0)
        return positions, demands, n, ps
    def _load_multi_facility_solutions(self, boost) -> list:
        """Solve each facility type independently with the pretrained PPO policy."""

        def load_model(positions, demands, n, p, boost):
            eval_env = EvalPMPEnv(self.cfg, positions, demands, n, p, boost)
            eval_env = DummyVecEnv([lambda: eval_env])
            policy_kwargs = get_policy_kwargs(self.cfg)
            test_model = PPO(MaskedFacilityLocationActorCriticPolicy,
                             eval_env,
                             verbose=1,
                             policy_kwargs=policy_kwargs,
                             device='cuda:1')
            train_model = PPO.load(self.cfg.load_model_path)
            test_model.set_parameters(train_model.get_parameters())
            return test_model, eval_env

        def get_optimal_solution(model, eval_env):
            obs = eval_env.reset()
            done = False
            while not done:
                action, _ = model.predict(obs, deterministic=True)
                obs, _, done, info = eval_env.step(action)
            return eval_env.get_attr('_best_solution')[0]

        multi_solutions = []
        for i in range(len(self._all_p)):
            positions = self._all_points
            demands = self._all_demands[:, i]
            n = self._n
            p = self._all_p[i]
            model, env = load_model(positions, demands, n, p, boost)
            multi_solutions.append(get_optimal_solution(model, env))
        return multi_solutions
    def get_reward(self) -> float:
        if self._done:
            reward = np.min(self._obj_value)
        else:
            reward = 0.0
        return reward
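

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal training
# loop wiring PMPEnv into stable-baselines3, mirroring the pattern used in
# MULTIPMP._load_multi_facility_solutions above. How `cfg` (a
# facility_location.utils.config.Config) is constructed is assumed to happen
# elsewhere, and `total_timesteps` is an arbitrary placeholder.
# ---------------------------------------------------------------------------
def _example_train(cfg: Config, total_timesteps: int = 10_000) -> PPO:
    # Wrap the environment in a DummyVecEnv as stable-baselines3 expects.
    env = DummyVecEnv([lambda: PMPEnv(cfg)])
    model = PPO(MaskedFacilityLocationActorCriticPolicy,
                env,
                verbose=1,
                policy_kwargs=get_policy_kwargs(cfg))
    model.learn(total_timesteps=total_timesteps)
    return model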