import warnings from typing import Tuple, Dict import networkx as nx import numpy as np from geopandas import GeoDataFrame from shapely.geometry import MultiPoint from libpysal.weights.contiguity import Voronoi as Voronoi_weights from sklearn.neighbors import kneighbors_graph from sklearn.metrics import pairwise_distances from facility_location.utils.config import Config import time class FacilityLocationClient: def __init__(self, cfg: Config, rng: np.random.Generator): self.cfg = cfg self.rng = rng self._cfg_tabu_time = cfg.env_specs['tabu_time'] self._t = 0 def set_instance(self, points: np.ndarray, demands: np.ndarray, n: int, p: int, real: bool) -> None: self._points = points self._demands = demands points_geom = MultiPoint(points) self._gdf = GeoDataFrame({ 'geometry': points_geom.geoms, 'demand': demands, }) self._n = n self._p = p self._old_facility_mask = np.zeros(self._n, dtype=bool) self._new_facility_mask = np.zeros(self._n, dtype=bool) self._construct_static_graph() if real: self._distance_matrix = pairwise_distances(points, metric='haversine') else: self._distance_matrix = pairwise_distances(points, metric='euclidean') self._cost_matrix = self._distance_matrix * self._demands[:, None] self._gain = np.zeros(self._n) self._loss = np.zeros(self._n) self._add_time = np.full(self._n, -np.inf) self._drop_time = np.full(self._n, -np.inf) self.reset_tabu_time() def get_instance(self) -> Tuple[np.ndarray, np.ndarray, int, int]: return self._points, self._demands, self._n, self._p def get_distance_and_cost_matrix(self) -> Tuple[np.ndarray, np.ndarray]: return self._distance_matrix, self._cost_matrix def get_avg_distance_and_cost(self) -> Tuple[np.ndarray, np.ndarray]: avg_distance = self._distance_matrix.sum(axis=-1)/(self._n - 1) avg_cost = self._cost_matrix.sum(axis=-1)/(self._n - 1) return avg_distance, avg_cost def _construct_static_graph(self) -> None: self._connection_matrix = kneighbors_graph(self._points, n_neighbors=3, mode="connectivity").toarray() self._static_graph = nx.from_numpy_matrix(self._connection_matrix) self._static_edges = np.array(self._static_graph.edges(), dtype=np.int64) def _construct_dynamic_graph(self,stage=1) -> None: t1 = time.time() try: solution_distace_min = np.partition(self._distance_matrix[:, self._solution][self._solution, :], 3, axis=-1)[:,2] except: raise ValueError('stop') solution_distance_matrix = np.zeros((self._n, self._n)) solution_distance_matrix[:, self._solution] = solution_distace_min solution_knearest_matrix = np.logical_and(self._distance_matrix < solution_distance_matrix, self._distance_matrix > 0) if stage == 2: old_facility_mask, new_facility_mask = self.get_facility_mask() solution_matrix = np.logical_and(np.logical_and(self._solution, old_facility_mask)[:, None], (np.logical_and(~self._solution, new_facility_mask)[None, :])) # print('solution:',self._solution) # print('old_facility_mask:',old_facility_mask) # print('new_facility_mask:',new_facility_mask) else: old_tabu_mask, new_tabu_mask = self.get_tabu_mask(self._t) solution_matrix = np.logical_and(np.logical_and(self._solution, old_tabu_mask)[:, None], (np.logical_and(~self._solution, new_tabu_mask)[None, :])) # print('solution:',self._solution) # print('old_tabu_mask:',old_tabu_mask) # print('new_tabu_mask:',new_tabu_mask) solution_matrix = np.logical_or(solution_matrix, solution_matrix.T) gainloss_matrix = np.logical_and((self._gain[:, None] > self._loss[None, :]), self._loss[None, :] > 0) graph_matrix = np.logical_and(solution_matrix, np.logical_or(gainloss_matrix, solution_knearest_matrix)) if not np.any(graph_matrix): if np.any(solution_matrix): graph_matrix = solution_matrix if not np.any(graph_matrix): raise ValueError('Invalid graph_matrix') else: # if stage==2: # print('[!] No solution_matrix') # print('solution:',self._solution) # print('old_facility_mask:',old_facility_mask) # print('new_facility_mask:',new_facility_mask) # else: # print('[!] No solution_matrix') # print('solution:',self._solution) # print('old_tabu_mask:',old_tabu_mask) # print('new_tabu_mask:',new_tabu_mask) graph_matrix = self._solution[:, None] ^ self._solution[None, :] self._dynamic_graph = nx.from_numpy_matrix(graph_matrix) self._dynamic_edges = np.array(self._dynamic_graph.edges(), dtype=np.int64) t2 = time.time() # print('dynamic graph time:',t2-t1) def get_static_adjacency_list(self) -> np.ndarray: return self._static_edges def get_dynamic_adjacency_list(self) -> np.ndarray: return self._dynamic_edges def compute_initial_solution(self) -> Tuple[float, np.ndarray]: self._solution = np.zeros(self._n, dtype=bool) p_0 = self._demands.argmax() self._solution[p_0] = True for _ in range(self._p - 1): p_max_cost = self._cost_matrix[:, self._solution].min(axis=-1).argmax() self._solution[p_max_cost] = True self._init_gain_and_loss() self._construct_dynamic_graph() self._old_facility_mask = self._solution self._new_facility_mask = ~self._solution return self.compute_obj_value(), self._solution def compute_obj_value(self) -> float: obj_value = self._cost_matrix[:, self._solution].min(axis=-1).sum() return obj_value def compute_obj_value_from_solution(self, solution, stage=1) -> float: self._solution = solution self._init_gain_and_loss() self._construct_dynamic_graph(stage) obj_value = self.compute_obj_value() return obj_value # def swap(self, old_facility: int, new_facility: int, t: int) -> Tuple[float, np.ndarray, Dict]: # if old_facility >= self._n or not self._solution[old_facility]: # warn_msg = f'Old facility {old_facility} is not a facility of the current solution {self._solution}.' # warnings.warn(warn_msg) # old_facility = self.rng.choice(np.arange(self._n)[self._solution]) # if new_facility >= self._n or self._solution[new_facility]: # warn_msg = f'New facility {new_facility} is already a facility of the current solution {self._solution}.' # warnings.warn(warn_msg) # new_facility = self.rng.choice(np.arange(self._n)[~self._solution]) # self._solution[old_facility] = False # self._solution[new_facility] = True # self._drop_time[old_facility] = t # self._add_time[new_facility] = t # self._t = t # return self.compute_obj_value(), self._solution, {} def swap(self, facility_pair_index: int, t: int, stage=1) -> Tuple[float, np.ndarray, Dict]: facility_pair = self._dynamic_edges[facility_pair_index] # print(facility_pair) facility1 = facility_pair[0] facility2 = facility_pair[1] if (not self._solution[facility1]) and (self._solution[facility2]): new_facility = facility1 old_facility = facility2 elif (not self._solution[facility2]) and (self._solution[facility1]): new_facility = facility2 old_facility = facility1 else: raise ValueError('stop') self._solution[old_facility] = False self._solution[new_facility] = True if stage == 1: self._old_facility_mask[new_facility] = False self._new_facility_mask[old_facility] = True else: self._old_facility_mask[new_facility] = False self._new_facility_mask[old_facility] = False self._drop_time[old_facility] = t self._add_time[new_facility] = t self._t = t self._solution[old_facility] = False self._solution[new_facility] = True # print(self._solution,old_facility,new_facility) self._update_env(new_facility, old_facility, stage) # print('st:',self._t) return self.compute_obj_value(), self._solution, {} def get_tabu_mask(self, t: int) -> Tuple[np.ndarray, np.ndarray]: old_tabu_mask = self._add_time < t - self._drop_tabu_time new_tabu_mask = self._drop_time < t - self._add_tabu_time return old_tabu_mask, new_tabu_mask def reset_tabu_time(self) -> None: self._t = 0 if self._cfg_tabu_time <= 0: self._add_tabu_time = 0 self._drop_tabu_time = 0 else: self._add_tabu_time = self.rng.integers(0.1 * self._p, 0.5 * self._p) self._drop_tabu_time = self.rng.integers(0.1 * self._p, 0.5 * self._p) def get_current_solution(self) -> np.ndarray: return self._solution def set_solution(self, solution: np.ndarray) -> None: self._solution = solution def get_current_distance(self) -> np.ndarray: dis2poi = self._distance_matrix[:, self._solution] if self._p > 2: dis = np.partition(dis2poi, 2, axis=-1)[:,:2] else: dis = dis2poi.min(axis=-1) dis = np.stack([dis, dis], axis=-1) return dis def get_current_cost(self) -> np.ndarray: cost2poi = self._cost_matrix[:, self._solution] if self._p > 2: cost = np.partition(cost2poi, 2, axis=-1)[:,:2] else: cost = cost2poi.min(axis=-1) cost = np.stack([cost, cost], axis=-1) return cost def get_gain_and_loss(self) -> Tuple[np.ndarray, np.ndarray]: return self._gain, self._loss def get_gdf_facilities(self) -> Tuple[GeoDataFrame, np.ndarray]: solution = self._solution facilities = np.arange(self._n)[solution] gdf = self._gdf.copy() gdf['facility'] = False gdf.loc[facilities, 'facility'] = True node2facility = np.arange(self._n)[solution][self._cost_matrix[:, solution].argmin(axis=-1)] gdf['assignment'] = node2facility return gdf, facilities def _init_env(self): self._init_gain_and_loss() self._construct_dynamic_graph() def _update_env(self, insert_facility, remove_facility, stage): self._update_gain_and_loss(insert_facility, remove_facility) self._construct_dynamic_graph(stage) def _init_gain_and_loss(self): t1 = time.time() for i in range(self._n): _fake_solution = list(self._solution) if self._solution[i]: _fake_solution[i] = False self._loss[i] = self._cost_matrix[:, _fake_solution].min(axis=-1).sum() - self._cost_matrix[:, self._solution].min(axis=-1).sum() self._gain[i] = 0 else: _fake_solution[i] = True self._gain[i] = self._cost_matrix[:, self._solution].min(axis=-1).sum() - self._cost_matrix[:, _fake_solution].min(axis=-1).sum() self._loss[i] = 0 self.argpartition = np.argpartition(self._distance_matrix[:, self._solution], 2, axis=-1)[:,:2] t2 = time.time() # print('init gainloss time:',t2-t1) def _update_gain_and_loss(self, insert_facility, remove_facility): # self._init_gain_and_loss() # return t1 = time.time() _pre_solution = list(self._solution) _pre_solution[insert_facility] = False _pre_solution[remove_facility] = True pre_closest_demands2solution = self._cost_matrix[:, _pre_solution][np.arange(self._n)[:, None], self.argpartition] argpartition = np.argpartition(self._distance_matrix[:, self._solution], 2, axis=-1)[:,:2] closest_demands2solution = self._cost_matrix[:, self._solution][np.arange(self._n)[:, None], argpartition] pre_solution_idx = np.where(_pre_solution)[0] solution_idx = np.where(self._solution)[0] for i in range(self._n): if remove_facility in self.argpartition[i] or insert_facility in argpartition[i]: self._loss[solution_idx[argpartition[i][0]]] += closest_demands2solution[i][1] - closest_demands2solution[i][0] self._loss[pre_solution_idx[self.argpartition[i][0]]] -= pre_closest_demands2solution[i][1] - pre_closest_demands2solution[i][0] # if self.argpartition[i][0] != argpartition[i][0]: # for j in range(self._n): # if self._distance_matrix[i, j] < self._distance_matrix[i, self._solution][argpartition[i][0]]: # self._gain[j] += max(0, closest_demands2solution[i][0] - self._cost_matrix[i, j]) # if self._distance_matrix[i, j] < self._distance_matrix[i, self._solution][self.argpartition[i][0]]: # self._gain[j] -= max(0, pre_closest_demands2solution[i][0] - self._cost_matrix[i, j]) self._loss[remove_facility] = 0 self._gain[insert_facility] = 0 self.argpartition = list(argpartition) # print(self._gain, self._loss) t2 = time.time() # print('update gainloss time:',t2-t1) def init_facility_mask(self, old_facility, new_facility): self._old_facility_mask[old_facility] = True self._new_facility_mask[new_facility] = True def get_facility_mask(self): return self._old_facility_mask, self._new_facility_mask