""" Author: Luigi Piccinelli Licensed under the CC-BY NC 4.0 license (http://creativecommons.org/licenses/by-nc/4.0/) """ import os import matplotlib.pyplot as plt import numpy as np import torch import wandb from PIL import Image from unidepth.utils.misc import ssi_helper def colorize( value: np.ndarray, vmin: float = None, vmax: float = None, cmap: str = "magma_r" ): # if already RGB, do nothing if value.ndim > 2: if value.shape[-1] > 1: return value value = value[..., 0] invalid_mask = value < 0.0001 # normalize vmin = value.min() if vmin is None else vmin vmax = value.max() if vmax is None else vmax value = (value - vmin) / (vmax - vmin) # vmin..vmax # set color cmapper = plt.get_cmap(cmap) value = cmapper(value, bytes=True) # (nxmx4) value[invalid_mask] = 0 img = value[..., :3] return img def image_grid(imgs: list[np.ndarray], rows: int, cols: int) -> np.ndarray: if not len(imgs): return None assert len(imgs) == rows * cols h, w = imgs[0].shape[:2] grid = Image.new("RGB", size=(cols * w, rows * h)) for i, img in enumerate(imgs): grid.paste( Image.fromarray(img.astype(np.uint8)).resize( (w, h), resample=Image.BILINEAR ), box=(i % cols * w, i // cols * h), ) return np.array(grid) def get_pointcloud_from_rgbd( image: np.array, depth: np.array, mask: np.ndarray, intrinsic_matrix: np.array, extrinsic_matrix: np.array = None, ): depth = np.array(depth).squeeze() mask = np.array(mask).squeeze() # Mask the depth array masked_depth = np.ma.masked_where(mask == False, depth) # masked_depth = np.ma.masked_greater(masked_depth, 8000) # Create idx array idxs = np.indices(masked_depth.shape) u_idxs = idxs[1] v_idxs = idxs[0] # Get only non-masked depth and idxs z = masked_depth[~masked_depth.mask] compressed_u_idxs = u_idxs[~masked_depth.mask] compressed_v_idxs = v_idxs[~masked_depth.mask] image = np.stack( [image[..., i][~masked_depth.mask] for i in range(image.shape[-1])], axis=-1 ) # Calculate local position of each point # Apply vectorized math to depth using compressed arrays cx = intrinsic_matrix[0, 2] fx = intrinsic_matrix[0, 0] x = (compressed_u_idxs - cx) * z / fx cy = intrinsic_matrix[1, 2] fy = intrinsic_matrix[1, 1] # Flip y as we want +y pointing up not down y = -((compressed_v_idxs - cy) * z / fy) # # Apply camera_matrix to pointcloud as to get the pointcloud in world coords # if extrinsic_matrix is not None: # # Calculate camera pose from extrinsic matrix # camera_matrix = np.linalg.inv(extrinsic_matrix) # # Create homogenous array of vectors by adding 4th entry of 1 # # At the same time flip z as for eye space the camera is looking down the -z axis # w = np.ones(z.shape) # x_y_z_eye_hom = np.vstack((x, y, -z, w)) # # Transform the points from eye space to world space # x_y_z_world = np.dot(camera_matrix, x_y_z_eye_hom)[:3] # return x_y_z_world.T # else: x_y_z_local = np.stack((x, y, z), axis=-1) return np.concatenate([x_y_z_local, image], axis=-1) def save_file_ply(xyz, rgb, pc_file): if rgb.max() < 1.001: rgb = rgb * 255.0 rgb = rgb.astype(np.uint8) # print(rgb) with open(pc_file, "w") as f: # headers f.writelines( [ "ply\n" "format ascii 1.0\n", "element vertex {}\n".format(xyz.shape[0]), "property float x\n", "property float y\n", "property float z\n", "property uchar red\n", "property uchar green\n", "property uchar blue\n", "end_header\n", ] ) for i in range(xyz.shape[0]): str_v = "{:10.6f} {:10.6f} {:10.6f} {:d} {:d} {:d}\n".format( xyz[i, 0], xyz[i, 1], xyz[i, 2], rgb[i, 0], rgb[i, 1], rgb[i, 2] ) f.write(str_v) # really awful fct... FIXME def log_train_artifacts(rgbs, gts, preds, ds_name, step, infos={}): rgbs = [ (127.5 * (rgb + 1)) .clip(0, 255) .to(torch.uint8) .cpu() .detach() .permute(1, 2, 0) .numpy() for rgb in rgbs ] new_gts, new_preds = [], [] if len(gts) > 0: for i, gt in enumerate(gts): scale, shift = ssi_helper( gts[i][gts[i] > 0].cpu().detach(), preds[i][gts[i] > 0].cpu().detach() ) gt = gts[i].cpu().detach().squeeze().numpy() pred = (preds[i].cpu().detach() * scale + shift).squeeze().numpy() vmin = gt[gt > 0].min() if (gt > 0).any() else 0.0 vmax = gt.max() if (gt > 0).any() else 0.1 new_gts.append(colorize(gt, vmin=vmin, vmax=vmax)) new_preds.append(colorize(pred, vmin=vmin, vmax=vmax)) gts, preds = new_gts, new_preds else: preds = [ colorize(pred.cpu().detach().squeeze().numpy(), 0.0, 80.0) for i, pred in enumerate(preds) ] num_additional, additionals = 0, [] for name, info in infos.items(): num_additional += 1 if info.shape[1] == 3: additionals.extend( [ (127.5 * (x + 1)) .clip(0, 255) .to(torch.uint8) .cpu() .detach() .permute(1, 2, 0) .numpy() for x in info[:4] ] ) else: additionals.extend( [ colorize(x.cpu().detach().squeeze().numpy()) for i, x in enumerate(info[:4]) ] ) num_rows = 2 + int(len(gts) > 0) + num_additional artifacts_grid = image_grid( [*rgbs, *gts, *preds, *additionals], num_rows, len(rgbs) ) try: wandb.log({f"{ds_name}_training": [wandb.Image(artifacts_grid)]}, step=step) except: Image.fromarray(artifacts_grid).save( os.path.join(os.environ["HOME"], "Workspace", f"art_grid{step}.png") ) print("Logging training images failed")