Spaces:
Runtime error
Runtime error
""" | |
https://github.com/Trinkle23897/Fast-Poisson-Image-Editing | |
MIT License | |
Copyright (c) 2022 Jiayi Weng | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
import os | |
from abc import ABC, abstractmethod | |
from typing import Any, Optional, Tuple | |
import numpy as np | |
from fpie import np_solver | |
import scipy | |
import scipy.signal | |
CPU_COUNT = os.cpu_count() or 1 | |
DEFAULT_BACKEND = "numpy" | |
ALL_BACKEND = ["numpy"] | |
try: | |
from fpie import numba_solver | |
ALL_BACKEND += ["numba"] | |
DEFAULT_BACKEND = "numba" | |
except ImportError: | |
numba_solver = None # type: ignore | |
try: | |
from fpie import taichi_solver | |
ALL_BACKEND += ["taichi-cpu", "taichi-gpu"] | |
DEFAULT_BACKEND = "taichi-cpu" | |
except ImportError: | |
taichi_solver = None # type: ignore | |
# try: | |
# from fpie import core_gcc # type: ignore | |
# DEFAULT_BACKEND = "gcc" | |
# ALL_BACKEND.append("gcc") | |
# except ImportError: | |
# core_gcc = None | |
# try: | |
# from fpie import core_openmp # type: ignore | |
# DEFAULT_BACKEND = "openmp" | |
# ALL_BACKEND.append("openmp") | |
# except ImportError: | |
# core_openmp = None | |
# try: | |
# from mpi4py import MPI | |
# from fpie import core_mpi # type: ignore | |
# ALL_BACKEND.append("mpi") | |
# except ImportError: | |
# MPI = None # type: ignore | |
# core_mpi = None | |
try: | |
from fpie import core_cuda # type: ignore | |
DEFAULT_BACKEND = "cuda" | |
ALL_BACKEND.append("cuda") | |
except ImportError: | |
core_cuda = None | |
class BaseProcessor(ABC): | |
"""API definition for processor class.""" | |
def __init__( | |
self, gradient: str, rank: int, backend: str, core: Optional[Any] | |
): | |
if core is None: | |
error_msg = { | |
"numpy": | |
"Please run `pip install numpy`.", | |
"numba": | |
"Please run `pip install numba`.", | |
"gcc": | |
"Please install cmake and gcc in your operating system.", | |
"openmp": | |
"Please make sure your gcc is compatible with `-fopenmp` option.", | |
"mpi": | |
"Please install MPI and run `pip install mpi4py`.", | |
"cuda": | |
"Please make sure nvcc and cuda-related libraries are available.", | |
"taichi": | |
"Please run `pip install taichi`.", | |
} | |
print(error_msg[backend.split("-")[0]]) | |
raise AssertionError(f"Invalid backend {backend}.") | |
self.gradient = gradient | |
self.rank = rank | |
self.backend = backend | |
self.core = core | |
self.root = rank == 0 | |
def mixgrad(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: | |
if self.gradient == "src": | |
return a | |
if self.gradient == "avg": | |
return (a + b) / 2 | |
# mix gradient, see Equ. 12 in PIE paper | |
mask = np.abs(a) < np.abs(b) | |
a[mask] = b[mask] | |
return a | |
def reset( | |
self, | |
src: np.ndarray, | |
mask: np.ndarray, | |
tgt: np.ndarray, | |
mask_on_src: Tuple[int, int], | |
mask_on_tgt: Tuple[int, int], | |
) -> int: | |
pass | |
def sync(self) -> None: | |
self.core.sync() | |
def step(self, iteration: int) -> Optional[Tuple[np.ndarray, np.ndarray]]: | |
pass | |
class EquProcessor(BaseProcessor): | |
"""PIE Jacobi equation processor.""" | |
def __init__( | |
self, | |
gradient: str = "max", | |
backend: str = DEFAULT_BACKEND, | |
n_cpu: int = CPU_COUNT, | |
min_interval: int = 100, | |
block_size: int = 1024, | |
): | |
core: Optional[Any] = None | |
rank = 0 | |
if backend == "numpy": | |
core = np_solver.EquSolver() | |
elif backend == "numba" and numba_solver is not None: | |
core = numba_solver.EquSolver() | |
elif backend == "gcc": | |
core = core_gcc.EquSolver() | |
elif backend == "openmp" and core_openmp is not None: | |
core = core_openmp.EquSolver(n_cpu) | |
elif backend == "mpi" and core_mpi is not None: | |
core = core_mpi.EquSolver(min_interval) | |
rank = MPI.COMM_WORLD.Get_rank() | |
elif backend == "cuda" and core_cuda is not None: | |
core = core_cuda.EquSolver(block_size) | |
elif backend.startswith("taichi") and taichi_solver is not None: | |
core = taichi_solver.EquSolver(backend, n_cpu, block_size) | |
super().__init__(gradient, rank, backend, core) | |
def mask2index( | |
self, mask: np.ndarray | |
) -> Tuple[np.ndarray, int, np.ndarray, np.ndarray]: | |
x, y = np.nonzero(mask) | |
max_id = x.shape[0] + 1 | |
index = np.zeros((max_id, 3)) | |
ids = self.core.partition(mask) | |
ids[mask == 0] = 0 # reserve id=0 for constant | |
index = ids[x, y].argsort() | |
return ids, max_id, x[index], y[index] | |
def reset( | |
self, | |
src: np.ndarray, | |
mask: np.ndarray, | |
tgt: np.ndarray, | |
mask_on_src: Tuple[int, int], | |
mask_on_tgt: Tuple[int, int], | |
) -> int: | |
assert self.root | |
# check validity | |
# assert 0 <= mask_on_src[0] and 0 <= mask_on_src[1] | |
# assert mask_on_src[0] + mask.shape[0] <= src.shape[0] | |
# assert mask_on_src[1] + mask.shape[1] <= src.shape[1] | |
# assert mask_on_tgt[0] + mask.shape[0] <= tgt.shape[0] | |
# assert mask_on_tgt[1] + mask.shape[1] <= tgt.shape[1] | |
if len(mask.shape) == 3: | |
mask = mask.mean(-1) | |
mask = (mask >= 128).astype(np.int32) | |
# zero-out edge | |
mask[0] = 0 | |
mask[-1] = 0 | |
mask[:, 0] = 0 | |
mask[:, -1] = 0 | |
x, y = np.nonzero(mask) | |
x0, x1 = x.min() - 1, x.max() + 2 | |
y0, y1 = y.min() - 1, y.max() + 2 | |
mask_on_src = (x0 + mask_on_src[0], y0 + mask_on_src[1]) | |
mask_on_tgt = (x0 + mask_on_tgt[0], y0 + mask_on_tgt[1]) | |
mask = mask[x0:x1, y0:y1] | |
ids, max_id, index_x, index_y = self.mask2index(mask) | |
src_x, src_y = index_x + mask_on_src[0], index_y + mask_on_src[1] | |
tgt_x, tgt_y = index_x + mask_on_tgt[0], index_y + mask_on_tgt[1] | |
src_C = src[src_x, src_y].astype(np.float32) | |
src_U = src[src_x - 1, src_y].astype(np.float32) | |
src_D = src[src_x + 1, src_y].astype(np.float32) | |
src_L = src[src_x, src_y - 1].astype(np.float32) | |
src_R = src[src_x, src_y + 1].astype(np.float32) | |
tgt_C = tgt[tgt_x, tgt_y].astype(np.float32) | |
tgt_U = tgt[tgt_x - 1, tgt_y].astype(np.float32) | |
tgt_D = tgt[tgt_x + 1, tgt_y].astype(np.float32) | |
tgt_L = tgt[tgt_x, tgt_y - 1].astype(np.float32) | |
tgt_R = tgt[tgt_x, tgt_y + 1].astype(np.float32) | |
grad = self.mixgrad(src_C - src_L, tgt_C - tgt_L) \ | |
+ self.mixgrad(src_C - src_R, tgt_C - tgt_R) \ | |
+ self.mixgrad(src_C - src_U, tgt_C - tgt_U) \ | |
+ self.mixgrad(src_C - src_D, tgt_C - tgt_D) | |
A = np.zeros((max_id, 4), np.int32) | |
X = np.zeros((max_id, 3), np.float32) | |
B = np.zeros((max_id, 3), np.float32) | |
X[1:] = tgt[index_x + mask_on_tgt[0], index_y + mask_on_tgt[1]] | |
# four-way | |
A[1:, 0] = ids[index_x - 1, index_y] | |
A[1:, 1] = ids[index_x + 1, index_y] | |
A[1:, 2] = ids[index_x, index_y - 1] | |
A[1:, 3] = ids[index_x, index_y + 1] | |
B[1:] = grad | |
m = (mask[index_x - 1, index_y] == 0).astype(float).reshape(-1, 1) | |
B[1:] += m * tgt[index_x + mask_on_tgt[0] - 1, index_y + mask_on_tgt[1]] | |
m = (mask[index_x, index_y - 1] == 0).astype(float).reshape(-1, 1) | |
B[1:] += m * tgt[index_x + mask_on_tgt[0], index_y + mask_on_tgt[1] - 1] | |
m = (mask[index_x, index_y + 1] == 0).astype(float).reshape(-1, 1) | |
B[1:] += m * tgt[index_x + mask_on_tgt[0], index_y + mask_on_tgt[1] + 1] | |
m = (mask[index_x + 1, index_y] == 0).astype(float).reshape(-1, 1) | |
B[1:] += m * tgt[index_x + mask_on_tgt[0] + 1, index_y + mask_on_tgt[1]] | |
self.tgt = tgt.copy() | |
self.tgt_index = (index_x + mask_on_tgt[0], index_y + mask_on_tgt[1]) | |
self.core.reset(max_id, A, X, B) | |
return max_id | |
def step(self, iteration: int) -> Optional[Tuple[np.ndarray, np.ndarray]]: | |
result = self.core.step(iteration) | |
if self.root: | |
x, err = result | |
self.tgt[self.tgt_index] = x[1:] | |
return self.tgt, err | |
return None | |
class GridProcessor(BaseProcessor): | |
"""PIE grid processor.""" | |
def __init__( | |
self, | |
gradient: str = "max", | |
backend: str = DEFAULT_BACKEND, | |
n_cpu: int = CPU_COUNT, | |
min_interval: int = 100, | |
block_size: int = 1024, | |
grid_x: int = 8, | |
grid_y: int = 8, | |
): | |
core: Optional[Any] = None | |
rank = 0 | |
if backend == "numpy": | |
core = np_solver.GridSolver() | |
elif backend == "numba" and numba_solver is not None: | |
core = numba_solver.GridSolver() | |
elif backend == "gcc": | |
core = core_gcc.GridSolver(grid_x, grid_y) | |
elif backend == "openmp" and core_openmp is not None: | |
core = core_openmp.GridSolver(grid_x, grid_y, n_cpu) | |
elif backend == "mpi" and core_mpi is not None: | |
core = core_mpi.GridSolver(min_interval) | |
rank = MPI.COMM_WORLD.Get_rank() | |
elif backend == "cuda" and core_cuda is not None: | |
core = core_cuda.GridSolver(grid_x, grid_y) | |
elif backend.startswith("taichi") and taichi_solver is not None: | |
core = taichi_solver.GridSolver( | |
grid_x, grid_y, backend, n_cpu, block_size | |
) | |
super().__init__(gradient, rank, backend, core) | |
def reset( | |
self, | |
src: np.ndarray, | |
mask: np.ndarray, | |
tgt: np.ndarray, | |
mask_on_src: Tuple[int, int], | |
mask_on_tgt: Tuple[int, int], | |
) -> int: | |
assert self.root | |
# check validity | |
# assert 0 <= mask_on_src[0] and 0 <= mask_on_src[1] | |
# assert mask_on_src[0] + mask.shape[0] <= src.shape[0] | |
# assert mask_on_src[1] + mask.shape[1] <= src.shape[1] | |
# assert mask_on_tgt[0] + mask.shape[0] <= tgt.shape[0] | |
# assert mask_on_tgt[1] + mask.shape[1] <= tgt.shape[1] | |
if len(mask.shape) == 3: | |
mask = mask.mean(-1) | |
mask = (mask >= 128).astype(np.int32) | |
# zero-out edge | |
mask[0] = 0 | |
mask[-1] = 0 | |
mask[:, 0] = 0 | |
mask[:, -1] = 0 | |
x, y = np.nonzero(mask) | |
x0, x1 = x.min() - 1, x.max() + 2 | |
y0, y1 = y.min() - 1, y.max() + 2 | |
mask = mask[x0:x1, y0:y1] | |
max_id = np.prod(mask.shape) | |
src_crop = src[mask_on_src[0] + x0:mask_on_src[0] + x1, | |
mask_on_src[1] + y0:mask_on_src[1] + y1].astype(np.float32) | |
tgt_crop = tgt[mask_on_tgt[0] + x0:mask_on_tgt[0] + x1, | |
mask_on_tgt[1] + y0:mask_on_tgt[1] + y1].astype(np.float32) | |
grad = np.zeros([*mask.shape, 3], np.float32) | |
grad[1:] += self.mixgrad( | |
src_crop[1:] - src_crop[:-1], tgt_crop[1:] - tgt_crop[:-1] | |
) | |
grad[:-1] += self.mixgrad( | |
src_crop[:-1] - src_crop[1:], tgt_crop[:-1] - tgt_crop[1:] | |
) | |
grad[:, 1:] += self.mixgrad( | |
src_crop[:, 1:] - src_crop[:, :-1], tgt_crop[:, 1:] - tgt_crop[:, :-1] | |
) | |
grad[:, :-1] += self.mixgrad( | |
src_crop[:, :-1] - src_crop[:, 1:], tgt_crop[:, :-1] - tgt_crop[:, 1:] | |
) | |
grad[mask == 0] = 0 | |
if True: | |
kernel = [[1] * 3 for _ in range(3)] | |
nmask = mask.copy() | |
nmask[nmask > 0] = 1 | |
res = scipy.signal.convolve2d( | |
nmask, kernel, mode="same", boundary="fill", fillvalue=1 | |
) | |
res[nmask < 1] = 0 | |
res[res == 9] = 0 | |
res[res > 0] = 1 | |
grad[res>0]=0 | |
# ylst, xlst = res.nonzero() | |
# for y, x in zip(ylst, xlst): | |
# grad[y,x]=0 | |
# for yi in range(-1,2): | |
# for xi in range(-1,2): | |
# grad[y+yi,x+xi]=0 | |
self.x0 = mask_on_tgt[0] + x0 | |
self.x1 = mask_on_tgt[0] + x1 | |
self.y0 = mask_on_tgt[1] + y0 | |
self.y1 = mask_on_tgt[1] + y1 | |
self.tgt = tgt.copy() | |
self.core.reset(max_id, mask, tgt_crop, grad) | |
return max_id | |
def step(self, iteration: int) -> Optional[Tuple[np.ndarray, np.ndarray]]: | |
result = self.core.step(iteration) | |
if self.root: | |
tgt, err = result | |
self.tgt[self.x0:self.x1, self.y0:self.y1] = tgt | |
return self.tgt, err | |
return None | |