# https://github.com/XPixelGroup/BasicSR/blob/master/basicsr/data/degradations.py # Copyright (c) OpenMMLab. All rights reserved. # https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py import math import random import re from abc import ABCMeta, abstractmethod from pathlib import Path from typing import List, Dict from typing import Mapping, Any from typing import Optional, Union import cv2 import numpy as np import torch from PIL import Image from scipy import special from scipy.stats import multivariate_normal from torch import Tensor # from torchvision.transforms.functional_tensor import rgb_to_grayscale from torchvision.transforms._functional_tensor import rgb_to_grayscale # -------------------------------------------------------------------- # # --------------------------- blur kernels --------------------------- # # -------------------------------------------------------------------- # # --------------------------- util functions --------------------------- # def sigma_matrix2(sig_x, sig_y, theta): """Calculate the rotated sigma matrix (two dimensional matrix). Args: sig_x (float): sig_y (float): theta (float): Radian measurement. Returns: ndarray: Rotated sigma matrix. """ d_matrix = np.array([[sig_x ** 2, 0], [0, sig_y ** 2]]) u_matrix = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]]) return np.dot(u_matrix, np.dot(d_matrix, u_matrix.T)) def mesh_grid(kernel_size): """Generate the mesh grid, centering at zero. Args: kernel_size (int): Returns: xy (ndarray): with the shape (kernel_size, kernel_size, 2) xx (ndarray): with the shape (kernel_size, kernel_size) yy (ndarray): with the shape (kernel_size, kernel_size) """ ax = np.arange(-kernel_size // 2 + 1., kernel_size // 2 + 1.) xx, yy = np.meshgrid(ax, ax) xy = np.hstack((xx.reshape((kernel_size * kernel_size, 1)), yy.reshape(kernel_size * kernel_size, 1))).reshape(kernel_size, kernel_size, 2) return xy, xx, yy def pdf2(sigma_matrix, grid): """Calculate PDF of the bivariate Gaussian distribution. Args: sigma_matrix (ndarray): with the shape (2, 2) grid (ndarray): generated by :func:`mesh_grid`, with the shape (K, K, 2), K is the kernel size. Returns: kernel (ndarrray): un-normalized kernel. """ inverse_sigma = np.linalg.inv(sigma_matrix) kernel = np.exp(-0.5 * np.sum(np.dot(grid, inverse_sigma) * grid, 2)) return kernel def cdf2(d_matrix, grid): """Calculate the CDF of the standard bivariate Gaussian distribution. Used in skewed Gaussian distribution. Args: d_matrix (ndarrasy): skew matrix. grid (ndarray): generated by :func:`mesh_grid`, with the shape (K, K, 2), K is the kernel size. Returns: cdf (ndarray): skewed cdf. """ rv = multivariate_normal([0, 0], [[1, 0], [0, 1]]) grid = np.dot(grid, d_matrix) cdf = rv.cdf(grid) return cdf def bivariate_Gaussian(kernel_size, sig_x, sig_y, theta, grid=None, isotropic=True): """Generate a bivariate isotropic or anisotropic Gaussian kernel. In the isotropic mode, only `sig_x` is used. `sig_y` and `theta` is ignored. Args: kernel_size (int): sig_x (float): sig_y (float): theta (float): Radian measurement. grid (ndarray, optional): generated by :func:`mesh_grid`, with the shape (K, K, 2), K is the kernel size. Default: None isotropic (bool): Returns: kernel (ndarray): normalized kernel. """ if grid is None: grid, _, _ = mesh_grid(kernel_size) if isotropic: sigma_matrix = np.array([[sig_x ** 2, 0], [0, sig_x ** 2]]) else: sigma_matrix = sigma_matrix2(sig_x, sig_y, theta) kernel = pdf2(sigma_matrix, grid) kernel = kernel / np.sum(kernel) return kernel def bivariate_generalized_Gaussian(kernel_size, sig_x, sig_y, theta, beta, grid=None, isotropic=True): """Generate a bivariate generalized Gaussian kernel. ``Paper: Parameter Estimation For Multivariate Generalized Gaussian Distributions`` In the isotropic mode, only `sig_x` is used. `sig_y` and `theta` is ignored. Args: kernel_size (int): sig_x (float): sig_y (float): theta (float): Radian measurement. beta (float): shape parameter, beta = 1 is the normal distribution. grid (ndarray, optional): generated by :func:`mesh_grid`, with the shape (K, K, 2), K is the kernel size. Default: None Returns: kernel (ndarray): normalized kernel. """ if grid is None: grid, _, _ = mesh_grid(kernel_size) if isotropic: sigma_matrix = np.array([[sig_x ** 2, 0], [0, sig_x ** 2]]) else: sigma_matrix = sigma_matrix2(sig_x, sig_y, theta) inverse_sigma = np.linalg.inv(sigma_matrix) kernel = np.exp(-0.5 * np.power(np.sum(np.dot(grid, inverse_sigma) * grid, 2), beta)) kernel = kernel / np.sum(kernel) return kernel def bivariate_plateau(kernel_size, sig_x, sig_y, theta, beta, grid=None, isotropic=True): """Generate a plateau-like anisotropic kernel. 1 / (1+x^(beta)) Reference: https://stats.stackexchange.com/questions/203629/is-there-a-plateau-shaped-distribution In the isotropic mode, only `sig_x` is used. `sig_y` and `theta` is ignored. Args: kernel_size (int): sig_x (float): sig_y (float): theta (float): Radian measurement. beta (float): shape parameter, beta = 1 is the normal distribution. grid (ndarray, optional): generated by :func:`mesh_grid`, with the shape (K, K, 2), K is the kernel size. Default: None Returns: kernel (ndarray): normalized kernel. """ if grid is None: grid, _, _ = mesh_grid(kernel_size) if isotropic: sigma_matrix = np.array([[sig_x ** 2, 0], [0, sig_x ** 2]]) else: sigma_matrix = sigma_matrix2(sig_x, sig_y, theta) inverse_sigma = np.linalg.inv(sigma_matrix) kernel = np.reciprocal(np.power(np.sum(np.dot(grid, inverse_sigma) * grid, 2), beta) + 1) kernel = kernel / np.sum(kernel) return kernel def random_bivariate_Gaussian(kernel_size, sigma_x_range, sigma_y_range, rotation_range, noise_range=None, isotropic=True): """Randomly generate bivariate isotropic or anisotropic Gaussian kernels. In the isotropic mode, only `sigma_x_range` is used. `sigma_y_range` and `rotation_range` is ignored. Args: kernel_size (int): sigma_x_range (tuple): [0.6, 5] sigma_y_range (tuple): [0.6, 5] rotation range (tuple): [-math.pi, math.pi] noise_range(tuple, optional): multiplicative kernel noise, [0.75, 1.25]. Default: None Returns: kernel (ndarray): """ assert kernel_size % 2 == 1, 'Kernel size must be an odd number.' assert sigma_x_range[0] < sigma_x_range[1], 'Wrong sigma_x_range.' sigma_x = np.random.uniform(sigma_x_range[0], sigma_x_range[1]) if isotropic is False: assert sigma_y_range[0] < sigma_y_range[1], 'Wrong sigma_y_range.' assert rotation_range[0] < rotation_range[1], 'Wrong rotation_range.' sigma_y = np.random.uniform(sigma_y_range[0], sigma_y_range[1]) rotation = np.random.uniform(rotation_range[0], rotation_range[1]) else: sigma_y = sigma_x rotation = 0 kernel = bivariate_Gaussian(kernel_size, sigma_x, sigma_y, rotation, isotropic=isotropic) # add multiplicative noise if noise_range is not None: assert noise_range[0] < noise_range[1], 'Wrong noise range.' noise = np.random.uniform(noise_range[0], noise_range[1], size=kernel.shape) kernel = kernel * noise kernel = kernel / np.sum(kernel) return kernel def random_bivariate_generalized_Gaussian(kernel_size, sigma_x_range, sigma_y_range, rotation_range, beta_range, noise_range=None, isotropic=True): """Randomly generate bivariate generalized Gaussian kernels. In the isotropic mode, only `sigma_x_range` is used. `sigma_y_range` and `rotation_range` is ignored. Args: kernel_size (int): sigma_x_range (tuple): [0.6, 5] sigma_y_range (tuple): [0.6, 5] rotation range (tuple): [-math.pi, math.pi] beta_range (tuple): [0.5, 8] noise_range(tuple, optional): multiplicative kernel noise, [0.75, 1.25]. Default: None Returns: kernel (ndarray): """ assert kernel_size % 2 == 1, 'Kernel size must be an odd number.' assert sigma_x_range[0] < sigma_x_range[1], 'Wrong sigma_x_range.' sigma_x = np.random.uniform(sigma_x_range[0], sigma_x_range[1]) if isotropic is False: assert sigma_y_range[0] < sigma_y_range[1], 'Wrong sigma_y_range.' assert rotation_range[0] < rotation_range[1], 'Wrong rotation_range.' sigma_y = np.random.uniform(sigma_y_range[0], sigma_y_range[1]) rotation = np.random.uniform(rotation_range[0], rotation_range[1]) else: sigma_y = sigma_x rotation = 0 # assume beta_range[0] < 1 < beta_range[1] if np.random.uniform() < 0.5: beta = np.random.uniform(beta_range[0], 1) else: beta = np.random.uniform(1, beta_range[1]) kernel = bivariate_generalized_Gaussian(kernel_size, sigma_x, sigma_y, rotation, beta, isotropic=isotropic) # add multiplicative noise if noise_range is not None: assert noise_range[0] < noise_range[1], 'Wrong noise range.' noise = np.random.uniform(noise_range[0], noise_range[1], size=kernel.shape) kernel = kernel * noise kernel = kernel / np.sum(kernel) return kernel def random_bivariate_plateau(kernel_size, sigma_x_range, sigma_y_range, rotation_range, beta_range, noise_range=None, isotropic=True): """Randomly generate bivariate plateau kernels. In the isotropic mode, only `sigma_x_range` is used. `sigma_y_range` and `rotation_range` is ignored. Args: kernel_size (int): sigma_x_range (tuple): [0.6, 5] sigma_y_range (tuple): [0.6, 5] rotation range (tuple): [-math.pi/2, math.pi/2] beta_range (tuple): [1, 4] noise_range(tuple, optional): multiplicative kernel noise, [0.75, 1.25]. Default: None Returns: kernel (ndarray): """ assert kernel_size % 2 == 1, 'Kernel size must be an odd number.' assert sigma_x_range[0] < sigma_x_range[1], 'Wrong sigma_x_range.' sigma_x = np.random.uniform(sigma_x_range[0], sigma_x_range[1]) if isotropic is False: assert sigma_y_range[0] < sigma_y_range[1], 'Wrong sigma_y_range.' assert rotation_range[0] < rotation_range[1], 'Wrong rotation_range.' sigma_y = np.random.uniform(sigma_y_range[0], sigma_y_range[1]) rotation = np.random.uniform(rotation_range[0], rotation_range[1]) else: sigma_y = sigma_x rotation = 0 # TODO: this may be not proper if np.random.uniform() < 0.5: beta = np.random.uniform(beta_range[0], 1) else: beta = np.random.uniform(1, beta_range[1]) kernel = bivariate_plateau(kernel_size, sigma_x, sigma_y, rotation, beta, isotropic=isotropic) # add multiplicative noise if noise_range is not None: assert noise_range[0] < noise_range[1], 'Wrong noise range.' noise = np.random.uniform(noise_range[0], noise_range[1], size=kernel.shape) kernel = kernel * noise kernel = kernel / np.sum(kernel) return kernel def random_mixed_kernels(kernel_list, kernel_prob, kernel_size=21, sigma_x_range=(0.6, 5), sigma_y_range=(0.6, 5), rotation_range=(-math.pi, math.pi), betag_range=(0.5, 8), betap_range=(0.5, 8), noise_range=None): """Randomly generate mixed kernels. Args: kernel_list (tuple): a list name of kernel types, support ['iso', 'aniso', 'skew', 'generalized', 'plateau_iso', 'plateau_aniso'] kernel_prob (tuple): corresponding kernel probability for each kernel type kernel_size (int): sigma_x_range (tuple): [0.6, 5] sigma_y_range (tuple): [0.6, 5] rotation range (tuple): [-math.pi, math.pi] beta_range (tuple): [0.5, 8] noise_range(tuple, optional): multiplicative kernel noise, [0.75, 1.25]. Default: None Returns: kernel (ndarray): """ kernel_type = random.choices(kernel_list, kernel_prob)[0] if kernel_type == 'iso': kernel = random_bivariate_Gaussian( kernel_size, sigma_x_range, sigma_y_range, rotation_range, noise_range=noise_range, isotropic=True) elif kernel_type == 'aniso': kernel = random_bivariate_Gaussian( kernel_size, sigma_x_range, sigma_y_range, rotation_range, noise_range=noise_range, isotropic=False) elif kernel_type == 'generalized_iso': kernel = random_bivariate_generalized_Gaussian( kernel_size, sigma_x_range, sigma_y_range, rotation_range, betag_range, noise_range=noise_range, isotropic=True) elif kernel_type == 'generalized_aniso': kernel = random_bivariate_generalized_Gaussian( kernel_size, sigma_x_range, sigma_y_range, rotation_range, betag_range, noise_range=noise_range, isotropic=False) elif kernel_type == 'plateau_iso': kernel = random_bivariate_plateau( kernel_size, sigma_x_range, sigma_y_range, rotation_range, betap_range, noise_range=None, isotropic=True) elif kernel_type == 'plateau_aniso': kernel = random_bivariate_plateau( kernel_size, sigma_x_range, sigma_y_range, rotation_range, betap_range, noise_range=None, isotropic=False) return kernel np.seterr(divide='ignore', invalid='ignore') def circular_lowpass_kernel(cutoff, kernel_size, pad_to=0): """2D sinc filter Reference: https://dsp.stackexchange.com/questions/58301/2-d-circularly-symmetric-low-pass-filter Args: cutoff (float): cutoff frequency in radians (pi is max) kernel_size (int): horizontal and vertical size, must be odd. pad_to (int): pad kernel size to desired size, must be odd or zero. """ assert kernel_size % 2 == 1, 'Kernel size must be an odd number.' kernel = np.fromfunction( lambda x, y: cutoff * special.j1(cutoff * np.sqrt( (x - (kernel_size - 1) / 2) ** 2 + (y - (kernel_size - 1) / 2) ** 2)) / (2 * np.pi * np.sqrt( (x - (kernel_size - 1) / 2) ** 2 + (y - (kernel_size - 1) / 2) ** 2)), [kernel_size, kernel_size]) kernel[(kernel_size - 1) // 2, (kernel_size - 1) // 2] = cutoff ** 2 / (4 * np.pi) kernel = kernel / np.sum(kernel) if pad_to > kernel_size: pad_size = (pad_to - kernel_size) // 2 kernel = np.pad(kernel, ((pad_size, pad_size), (pad_size, pad_size))) return kernel # ------------------------------------------------------------- # # --------------------------- noise --------------------------- # # ------------------------------------------------------------- # # ----------------------- Gaussian Noise ----------------------- # def instantiate_from_config(config: Mapping[str, Any]) -> Any: if not "target" in config: raise KeyError("Expected key `target` to instantiate.") return get_obj_from_str(config["target"])(**config.get("params", dict())) class BaseStorageBackend(metaclass=ABCMeta): """Abstract class of storage backends. All backends need to implement two apis: ``get()`` and ``get_text()``. ``get()`` reads the file as a byte stream and ``get_text()`` reads the file as texts. """ @property def name(self) -> str: return self.__class__.__name__ @abstractmethod def get(self, filepath: str) -> bytes: pass class PetrelBackend(BaseStorageBackend): """Petrel storage backend (for internal use). PetrelBackend supports reading and writing data to multiple clusters. If the file path contains the cluster name, PetrelBackend will read data from specified cluster or write data to it. Otherwise, PetrelBackend will access the default cluster. Args: path_mapping (dict, optional): Path mapping dict from local path to Petrel path. When ``path_mapping={'src': 'dst'}``, ``src`` in ``filepath`` will be replaced by ``dst``. Default: None. enable_mc (bool, optional): Whether to enable memcached support. Default: True. conf_path (str, optional): Config path of Petrel client. Default: None. `New in version 1.7.1`. Examples: >>> filepath1 = 's3://path/of/file' >>> filepath2 = 'cluster-name:s3://path/of/file' >>> client = PetrelBackend() >>> client.get(filepath1) # get data from default cluster >>> client.get(filepath2) # get data from 'cluster-name' cluster """ def __init__(self, path_mapping: Optional[dict] = None, enable_mc: bool = False, conf_path: str = None): try: from petrel_client import client except ImportError: raise ImportError('Please install petrel_client to enable ' 'PetrelBackend.') self._client = client.Client(conf_path=conf_path, enable_mc=enable_mc) assert isinstance(path_mapping, dict) or path_mapping is None self.path_mapping = path_mapping def _map_path(self, filepath: Union[str, Path]) -> str: """Map ``filepath`` to a string path whose prefix will be replaced by :attr:`self.path_mapping`. Args: filepath (str): Path to be mapped. """ filepath = str(filepath) if self.path_mapping is not None: for k, v in self.path_mapping.items(): filepath = filepath.replace(k, v, 1) return filepath def _format_path(self, filepath: str) -> str: """Convert a ``filepath`` to standard format of petrel oss. If the ``filepath`` is concatenated by ``os.path.join``, in a Windows environment, the ``filepath`` will be the format of 's3://bucket_name\\image.jpg'. By invoking :meth:`_format_path`, the above ``filepath`` will be converted to 's3://bucket_name/image.jpg'. Args: filepath (str): Path to be formatted. """ return re.sub(r'\\+', '/', filepath) def get(self, filepath: Union[str, Path]) -> bytes: """Read data from a given ``filepath`` with 'rb' mode. Args: filepath (str or Path): Path to read data. Returns: bytes: The loaded bytes. """ filepath = self._map_path(filepath) filepath = self._format_path(filepath) value = self._client.Get(filepath) return value class HardDiskBackend(BaseStorageBackend): """Raw hard disks storage backend.""" def get(self, filepath: Union[str, Path]) -> bytes: """Read data from a given ``filepath`` with 'rb' mode. Args: filepath (str or Path): Path to read data. Returns: bytes: Expected bytes object. """ with open(filepath, 'rb') as f: value_buf = f.read() return value_buf def generate_gaussian_noise(img, sigma=10, gray_noise=False): """Generate Gaussian noise. Args: img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32. sigma (float): Noise scale (measured in range 255). Default: 10. Returns: (Numpy array): Returned noisy image, shape (h, w, c), range[0, 1], float32. """ if gray_noise: noise = np.float32(np.random.randn(*(img.shape[0:2]))) * sigma / 255. noise = np.expand_dims(noise, axis=2).repeat(3, axis=2) else: noise = np.float32(np.random.randn(*(img.shape))) * sigma / 255. return noise def add_gaussian_noise(img, sigma=10, clip=True, rounds=False, gray_noise=False): """Add Gaussian noise. Args: img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32. sigma (float): Noise scale (measured in range 255). Default: 10. Returns: (Numpy array): Returned noisy image, shape (h, w, c), range[0, 1], float32. """ noise = generate_gaussian_noise(img, sigma, gray_noise) out = img + noise if clip and rounds: out = np.clip((out * 255.0).round(), 0, 255) / 255. elif clip: out = np.clip(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out def generate_gaussian_noise_pt(img, sigma=10, gray_noise=0): """Add Gaussian noise (PyTorch version). Args: img (Tensor): Shape (b, c, h, w), range[0, 1], float32. scale (float | Tensor): Noise scale. Default: 1.0. Returns: (Tensor): Returned noisy image, shape (b, c, h, w), range[0, 1], float32. """ b, _, h, w = img.size() if not isinstance(sigma, (float, int)): sigma = sigma.view(img.size(0), 1, 1, 1) if isinstance(gray_noise, (float, int)): cal_gray_noise = gray_noise > 0 else: gray_noise = gray_noise.view(b, 1, 1, 1) cal_gray_noise = torch.sum(gray_noise) > 0 if cal_gray_noise: noise_gray = torch.randn(*img.size()[2:4], dtype=img.dtype, device=img.device) * sigma / 255. noise_gray = noise_gray.view(b, 1, h, w) # always calculate color noise noise = torch.randn(*img.size(), dtype=img.dtype, device=img.device) * sigma / 255. if cal_gray_noise: noise = noise * (1 - gray_noise) + noise_gray * gray_noise return noise def add_gaussian_noise_pt(img, sigma=10, gray_noise=0, clip=True, rounds=False): """Add Gaussian noise (PyTorch version). Args: img (Tensor): Shape (b, c, h, w), range[0, 1], float32. scale (float | Tensor): Noise scale. Default: 1.0. Returns: (Tensor): Returned noisy image, shape (b, c, h, w), range[0, 1], float32. """ noise = generate_gaussian_noise_pt(img, sigma, gray_noise) out = img + noise if clip and rounds: out = torch.clamp((out * 255.0).round(), 0, 255) / 255. elif clip: out = torch.clamp(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out # ----------------------- Random Gaussian Noise ----------------------- # def random_generate_gaussian_noise(img, sigma_range=(0, 10), gray_prob=0): sigma = np.random.uniform(sigma_range[0], sigma_range[1]) if np.random.uniform() < gray_prob: gray_noise = True else: gray_noise = False return generate_gaussian_noise(img, sigma, gray_noise) def random_add_gaussian_noise(img, sigma_range=(0, 1.0), gray_prob=0, clip=True, rounds=False): noise = random_generate_gaussian_noise(img, sigma_range, gray_prob) out = img + noise if clip and rounds: out = np.clip((out * 255.0).round(), 0, 255) / 255. elif clip: out = np.clip(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out def random_generate_gaussian_noise_pt(img, sigma_range=(0, 10), gray_prob=0): sigma = torch.rand( img.size(0), dtype=img.dtype, device=img.device) * (sigma_range[1] - sigma_range[0]) + sigma_range[0] gray_noise = torch.rand(img.size(0), dtype=img.dtype, device=img.device) gray_noise = (gray_noise < gray_prob).float() return generate_gaussian_noise_pt(img, sigma, gray_noise) def random_add_gaussian_noise_pt(img, sigma_range=(0, 1.0), gray_prob=0, clip=True, rounds=False): noise = random_generate_gaussian_noise_pt(img, sigma_range, gray_prob) out = img + noise if clip and rounds: out = torch.clamp((out * 255.0).round(), 0, 255) / 255. elif clip: out = torch.clamp(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out # ----------------------- Poisson (Shot) Noise ----------------------- # def generate_poisson_noise(img, scale=1.0, gray_noise=False): """Generate poisson noise. Reference: https://github.com/scikit-image/scikit-image/blob/main/skimage/util/noise.py#L37-L219 Args: img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32. scale (float): Noise scale. Default: 1.0. gray_noise (bool): Whether generate gray noise. Default: False. Returns: (Numpy array): Returned noisy image, shape (h, w, c), range[0, 1], float32. """ if gray_noise: img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # round and clip image for counting vals correctly img = np.clip((img * 255.0).round(), 0, 255) / 255. vals = len(np.unique(img)) vals = 2 ** np.ceil(np.log2(vals)) out = np.float32(np.random.poisson(img * vals) / float(vals)) noise = out - img if gray_noise: noise = np.repeat(noise[:, :, np.newaxis], 3, axis=2) return noise * scale def add_poisson_noise(img, scale=1.0, clip=True, rounds=False, gray_noise=False): """Add poisson noise. Args: img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32. scale (float): Noise scale. Default: 1.0. gray_noise (bool): Whether generate gray noise. Default: False. Returns: (Numpy array): Returned noisy image, shape (h, w, c), range[0, 1], float32. """ noise = generate_poisson_noise(img, scale, gray_noise) out = img + noise if clip and rounds: out = np.clip((out * 255.0).round(), 0, 255) / 255. elif clip: out = np.clip(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out def generate_poisson_noise_pt(img, scale=1.0, gray_noise=0): """Generate a batch of poisson noise (PyTorch version) Args: img (Tensor): Input image, shape (b, c, h, w), range [0, 1], float32. scale (float | Tensor): Noise scale. Number or Tensor with shape (b). Default: 1.0. gray_noise (float | Tensor): 0-1 number or Tensor with shape (b). 0 for False, 1 for True. Default: 0. Returns: (Tensor): Returned noisy image, shape (b, c, h, w), range[0, 1], float32. """ b, _, h, w = img.size() if isinstance(gray_noise, (float, int)): cal_gray_noise = gray_noise > 0 else: gray_noise = gray_noise.view(b, 1, 1, 1) cal_gray_noise = torch.sum(gray_noise) > 0 if cal_gray_noise: img_gray = rgb_to_grayscale(img, num_output_channels=1) # round and clip image for counting vals correctly img_gray = torch.clamp((img_gray * 255.0).round(), 0, 255) / 255. # use for-loop to get the unique values for each sample vals_list = [len(torch.unique(img_gray[i, :, :, :])) for i in range(b)] vals_list = [2 ** np.ceil(np.log2(vals)) for vals in vals_list] vals = img_gray.new_tensor(vals_list).view(b, 1, 1, 1) out = torch.poisson(img_gray * vals) / vals noise_gray = out - img_gray noise_gray = noise_gray.expand(b, 3, h, w) # always calculate color noise # round and clip image for counting vals correctly img = torch.clamp((img * 255.0).round(), 0, 255) / 255. # use for-loop to get the unique values for each sample vals_list = [len(torch.unique(img[i, :, :, :])) for i in range(b)] vals_list = [2 ** np.ceil(np.log2(vals)) for vals in vals_list] vals = img.new_tensor(vals_list).view(b, 1, 1, 1) out = torch.poisson(img * vals) / vals noise = out - img if cal_gray_noise: noise = noise * (1 - gray_noise) + noise_gray * gray_noise if not isinstance(scale, (float, int)): scale = scale.view(b, 1, 1, 1) return noise * scale def add_poisson_noise_pt(img, scale=1.0, clip=True, rounds=False, gray_noise=0): """Add poisson noise to a batch of images (PyTorch version). Args: img (Tensor): Input image, shape (b, c, h, w), range [0, 1], float32. scale (float | Tensor): Noise scale. Number or Tensor with shape (b). Default: 1.0. gray_noise (float | Tensor): 0-1 number or Tensor with shape (b). 0 for False, 1 for True. Default: 0. Returns: (Tensor): Returned noisy image, shape (b, c, h, w), range[0, 1], float32. """ noise = generate_poisson_noise_pt(img, scale, gray_noise) out = img + noise if clip and rounds: out = torch.clamp((out * 255.0).round(), 0, 255) / 255. elif clip: out = torch.clamp(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out # ----------------------- Random Poisson (Shot) Noise ----------------------- # def random_generate_poisson_noise(img, scale_range=(0, 1.0), gray_prob=0): scale = np.random.uniform(scale_range[0], scale_range[1]) if np.random.uniform() < gray_prob: gray_noise = True else: gray_noise = False return generate_poisson_noise(img, scale, gray_noise) def random_add_poisson_noise(img, scale_range=(0, 1.0), gray_prob=0, clip=True, rounds=False): noise = random_generate_poisson_noise(img, scale_range, gray_prob) out = img + noise if clip and rounds: out = np.clip((out * 255.0).round(), 0, 255) / 255. elif clip: out = np.clip(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out def random_generate_poisson_noise_pt(img, scale_range=(0, 1.0), gray_prob=0): scale = torch.rand( img.size(0), dtype=img.dtype, device=img.device) * (scale_range[1] - scale_range[0]) + scale_range[0] gray_noise = torch.rand(img.size(0), dtype=img.dtype, device=img.device) gray_noise = (gray_noise < gray_prob).float() return generate_poisson_noise_pt(img, scale, gray_noise) def random_add_poisson_noise_pt(img, scale_range=(0, 1.0), gray_prob=0, clip=True, rounds=False): noise = random_generate_poisson_noise_pt(img, scale_range, gray_prob) out = img + noise if clip and rounds: out = torch.clamp((out * 255.0).round(), 0, 255) / 255. elif clip: out = torch.clamp(out, 0, 1) elif rounds: out = (out * 255.0).round() / 255. return out # ------------------------------------------------------------------------ # # --------------------------- JPEG compression --------------------------- # # ------------------------------------------------------------------------ # def add_jpg_compression(img, quality=90): """Add JPG compression artifacts. Args: img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32. quality (float): JPG compression quality. 0 for lowest quality, 100 for best quality. Default: 90. Returns: (Numpy array): Returned image after JPG, shape (h, w, c), range[0, 1], float32. """ img = np.clip(img, 0, 1) encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] _, encimg = cv2.imencode('.jpg', img * 255., encode_param) img = np.float32(cv2.imdecode(encimg, 1)) / 255. return img def random_add_jpg_compression(img, quality_range=(90, 100)): """Randomly add JPG compression artifacts. Args: img (Numpy array): Input image, shape (h, w, c), range [0, 1], float32. quality_range (tuple[float] | list[float]): JPG compression quality range. 0 for lowest quality, 100 for best quality. Default: (90, 100). Returns: (Numpy array): Returned image after JPG, shape (h, w, c), range[0, 1], float32. """ quality = np.random.uniform(quality_range[0], quality_range[1]) return add_jpg_compression(img, int(quality)) def load_file_list(file_list_path: str) -> List[Dict[str, str]]: files = [] with open(file_list_path, "r") as fin: for line in fin: path = line.strip() if path: files.append({"image_path": path, "prompt": ""}) return files # https://github.com/openai/guided-diffusion/blob/main/guided_diffusion/image_datasets.py def center_crop_arr(pil_image, image_size): # We are not on a new enough PIL to support the `reducing_gap` # argument, which uses BOX downsampling at powers of two first. # Thus, we do it by hand to improve downsample quality. while min(*pil_image.size) >= 2 * image_size: pil_image = pil_image.resize( tuple(x // 2 for x in pil_image.size), resample=Image.BOX ) scale = image_size / min(*pil_image.size) pil_image = pil_image.resize( tuple(round(x * scale) for x in pil_image.size), resample=Image.BICUBIC ) arr = np.array(pil_image) crop_y = (arr.shape[0] - image_size) // 2 crop_x = (arr.shape[1] - image_size) // 2 return arr[crop_y: crop_y + image_size, crop_x: crop_x + image_size] # https://github.com/openai/guided-diffusion/blob/main/guided_diffusion/image_datasets.py def random_crop_arr(pil_image, image_size, min_crop_frac=0.8, max_crop_frac=1.0): min_smaller_dim_size = math.ceil(image_size / max_crop_frac) max_smaller_dim_size = math.ceil(image_size / min_crop_frac) smaller_dim_size = random.randrange(min_smaller_dim_size, max_smaller_dim_size + 1) # We are not on a new enough PIL to support the `reducing_gap` # argument, which uses BOX downsampling at powers of two first. # Thus, we do it by hand to improve downsample quality. while min(*pil_image.size) >= 2 * smaller_dim_size: pil_image = pil_image.resize( tuple(x // 2 for x in pil_image.size), resample=Image.BOX ) scale = smaller_dim_size / min(*pil_image.size) pil_image = pil_image.resize( tuple(round(x * scale) for x in pil_image.size), resample=Image.BICUBIC ) arr = np.array(pil_image) crop_y = random.randrange(arr.shape[0] - image_size + 1) crop_x = random.randrange(arr.shape[1] - image_size + 1) return arr[crop_y: crop_y + image_size, crop_x: crop_x + image_size]