import math
import os
import json
import re
import cv2
from dataclasses import dataclass, field
import random
import imageio
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from craftsman.utils.typing import *
def fit_bounding_box(img, mask, marign_pix_dis, background_color):
# alpha_channel = img[:, :, 3]
alpha_channel = mask.numpy().squeeze()
height = np.any(alpha_channel, axis=1)
width = np.any(alpha_channel, axis=0)
h_min, h_max = np.where(height)[0][[0, -1]]
w_min, w_max = np.where(width)[0][[0, -1]]
box_height = h_max - h_min
box_width = w_max - w_min
cropped_image = img[h_min:h_max, w_min:w_max]
if box_height > box_width:
new_hight = 512 - 2 * marign_pix_dis
new_width = int((512 - 2 * marign_pix_dis) / (box_height) * box_width) + 1
new_hight = int((512 - 2 * marign_pix_dis) / (box_width) * box_height) + 1
new_width = 512 - 2 * marign_pix_dis
new_h_min_pos = int((512 - new_hight) / 2 + 1)
new_h_max_pos = new_hight + new_h_min_pos
new_w_min_pos = int((512 - new_width) / 2 + 1)
new_w_max_pos = new_width + new_w_min_pos
# extend of the bbox
new_image = np.full((512, 512, 3), background_color)
new_image[new_h_min_pos:new_h_max_pos, new_w_min_pos:new_w_max_pos, :] = cv2.resize(cropped_image.numpy(), (new_width, new_hight))
return torch.from_numpy(new_image)
class BaseDataModuleConfig:
local_dir: str = None
################################# Geometry part #################################
load_geometry: bool = True # whether to load geometry data
geo_data_type: str = "occupancy" # occupancy, sdf
geo_data_path: str = "" # path to the geometry data
# for occupancy and sdf data
n_samples: int = 4096 # number of points in input point cloud
upsample_ratio: int = 1 # upsample ratio for input point cloud
sampling_strategy: Optional[str] = None # sampling strategy for input point cloud
scale: float = 1.0 # scale of the input point cloud and target supervision
load_supervision: bool = True # whether to load supervision
supervision_type: str = "occupancy" # occupancy, sdf, tsdf
tsdf_threshold: float = 0.05 # threshold for truncating sdf values, used when input is sdf
n_supervision: int = 10000 # number of points in supervision
################################# Image part #################################
load_image: bool = False # whether to load images
image_data_path: str = "" # path to the image data
image_type: str = "rgb" # rgb, normal
background_color: Tuple[float, float, float] = field(
default_factory=lambda: (0.5, 0.5, 0.5)
idx: Optional[List[int]] = None # index of the image to load
n_views: int = 1 # number of views
marign_pix_dis: int = 30 # margin of the bounding box
batch_size: int = 32
num_workers: int = 8
class BaseDataset(Dataset):
def __init__(self, cfg: Any, split: str) -> None:
self.cfg: BaseDataModuleConfig = cfg
self.split = split
self.uids = json.load(open(f'{cfg.local_dir}/{split}.json'))
print(f"Loaded {len(self.uids)} {split} uids")
def __len__(self):
return len(self.uids)
def _load_shape_from_occupancy_or_sdf(self, index: int) -> Dict[str, Any]:
if self.cfg.geo_data_type == "occupancy":
# for input point cloud, using Objaverse-MIX data
pointcloud = np.load(f'{self.cfg.geo_data_path}/{self.uids[index]}/pointcloud.npz')
surface = np.asarray(pointcloud['points']) * 2 # range from -1 to 1
normal = np.asarray(pointcloud['normals'])
surface = np.concatenate([surface, normal], axis=1)
elif self.cfg.geo_data_type == "sdf":
# for sdf data with our own format
data = np.load(f'{self.cfg.geo_data_path}/{self.uids[index]}.npz')
# for input point cloud
surface = data["surface"]
raise NotImplementedError(f"Data type {self.cfg.geo_data_type} not implemented")
# random sampling
if self.cfg.sampling_strategy == "random":
rng = np.random.default_rng()
ind = rng.choice(surface.shape[0], self.cfg.upsample_ratio * self.cfg.n_samples, replace=False)
surface = surface[ind]
elif self.cfg.sampling_strategy == "fps":
import fpsample
kdline_fps_samples_idx = fpsample.bucket_fps_kdline_sampling(surface[:, :3], self.cfg.n_samples, h=5)
surface = surface[kdline_fps_samples_idx]
elif self.cfg.sampling_strategy is None:
raise NotImplementedError(f"sampling strategy {self.cfg.sampling_strategy} not implemented")
# rescale data
surface[:, :3] = surface[:, :3] * self.cfg.scale # target scale
ret = {
"uid": self.uids[index].split('/')[-1],
"surface": surface.astype(np.float32),
return ret
def _load_shape_supervision_occupancy_or_sdf(self, index: int) -> Dict[str, Any]:
# for supervision
ret = {}
if self.cfg.data_type == "occupancy":
points = np.load(f'{self.cfg.geo_data_path}/{self.uids[index]}/points.npz')
rand_points = np.asarray(points['points']) * 2 # range from -1.1 to 1.1
occupancies = np.asarray(points['occupancies'])
occupancies = np.unpackbits(occupancies)
elif self.cfg.data_type == "sdf":
data = np.load(f'{self.cfg.geo_data_path}/{self.uids[index]}.npz')
rand_points = data['rand_points']
sdfs = data['sdfs']
raise NotImplementedError(f"Data type {self.cfg.data_type} not implemented")
# random sampling
rng = np.random.default_rng()
ind = rng.choice(rand_points.shape[0], self.cfg.n_supervision, replace=False)
rand_points = rand_points[ind]
rand_points = rand_points * self.cfg.scale
ret["rand_points"] = rand_points.astype(np.float32)
if self.cfg.data_type == "occupancy":
assert self.cfg.supervision_type == "occupancy", "Only occupancy supervision is supported for occupancy data"
occupancies = occupancies[ind]
ret["occupancies"] = occupancies.astype(np.float32)
elif self.cfg.data_type == "sdf":
if self.cfg.supervision_type == "sdf":
ret["sdf"] = sdfs[ind].flatten().astype(np.float32)
elif self.cfg.supervision_type == "occupancy":
ret["occupancies"] = np.where(sdfs[ind].flatten() < 1e-3, 0, 1).astype(np.float32)
elif self.cfg.supervision_type == "tsdf":
ret["sdf"] = sdfs[ind].flatten().astype(np.float32).clip(-self.cfg.tsdf_threshold, self.cfg.tsdf_threshold) / self.cfg.tsdf_threshold
raise NotImplementedError(f"Supervision type {self.cfg.supervision_type} not implemented")
return ret
def _load_image(self, index: int) -> Dict[str, Any]:
def _load_single_image(img_path, background_color, marign_pix_dis=None):
img = torch.from_numpy(
/ 255.0
mask: Float[Tensor, "H W 1"] = img[:, :, -1:]
image: Float[Tensor, "H W 3"] = img[:, :, :3] * mask + background_color[
None, None, :
] * (1 - mask)
if marign_pix_dis is not None:
image = fit_bounding_box(image, mask, marign_pix_dis, background_color)
return image, mask
if self.cfg.background_color == [-1, -1, -1]:
background_color = torch.randint(0, 256, (3,))
background_color = torch.as_tensor(self.cfg.background_color)
ret = {}
if self.cfg.image_type == "rgb" or self.cfg.image_type == "normal":
assert self.cfg.n_views == 1, "Only single view is supported for single image"
sel_idx = random.choice(self.cfg.idx)
ret["sel_image_idx"] = sel_idx
if self.cfg.image_type == "rgb":
img_path = f'{self.cfg.image_data_path}/' + "/".join(self.uids[index].split('/')[-2:]) + f"/{'{:04d}'.format(sel_idx)}_rgb.jpeg"
elif self.cfg.image_type == "normal":
img_path = f'{self.cfg.image_data_path}/' + "/".join(self.uids[index].split('/')[-2:]) + f"/{'{:04d}'.format(sel_idx)}_normal.jpeg"
ret["image"], ret["mask"] = _load_single_image(img_path, background_color, self.cfg.marign_pix_dis)
raise NotImplementedError(f"Image type {self.cfg.image_type} not implemented")
return ret
def _get_data(self, index):
ret = {"uid": self.uids[index]}
# load geometry
if self.cfg.load_geometry:
if self.cfg.geo_data_type == "occupancy" or self.cfg.geo_data_type == "sdf":
# load shape
ret = self._load_shape_from_occupancy_or_sdf(index)
# load supervision for shape
if self.cfg.load_supervision:
raise NotImplementedError(f"Geo data type {self.cfg.geo_data_type} not implemented")
# load image
if self.cfg.load_image:
return ret
def __getitem__(self, index):
return self._get_data(index)
except Exception as e:
print(f"Error in {self.uids[index]}: {e}")
return self.__getitem__(np.random.randint(len(self)))
def collate(self, batch):
from torch.utils.data._utils.collate import default_collate_fn_map
return torch.utils.data.default_collate(batch)