Source code for deepxde.data.fpde

__all__ = ["Scheme", "FPDE", "TimeFPDE"]

import math

import numpy as np

from .pde import PDE
from .. import backend as bkd
from .. import config
from ..backend import is_tensor, backend_name
from ..utils import array_ops_compat, run_if_all_none


[docs] class Scheme: """Fractional Laplacian discretization. Discretize fractional Laplacian uisng quadrature rule for the integral with respect to the directions and Grunwald-Letnikov (GL) formula for the Riemann-Liouville directional fractional derivative. Args: meshtype (string): "static" or "dynamic". resolution: A list of integer. The first number is the number of quadrature points in the first direction, ..., and the last number is the GL parameter. References: `G. Pang, L. Lu, & G. E. Karniadakis. fPINNs: Fractional physics-informed neural networks. SIAM Journal on Scientific Computing, 41(4), A2603--A2626, 2019 <https://doi.org/10.1137/18M1229845>`_. """ def __init__(self, meshtype, resolution): self.meshtype = meshtype self.resolution = resolution self.dim = len(resolution) self._check() def _check(self): if self.meshtype not in ["static", "dynamic"]: raise ValueError("Wrong meshtype %s" % self.meshtype) if self.dim >= 2 and self.meshtype == "static": raise ValueError( "Do not support meshtype static for dimension %d" % self.dim )
[docs] class FPDE(PDE): r"""Fractional PDE solver. D-dimensional fractional Laplacian of order alpha/2 (1 < alpha < 2) is defined as: (-Delta)^(alpha/2) u(x) = C(alpha, D) \int_{||theta||=1} D_theta^alpha u(x) d theta, where C(alpha, D) = gamma((1-alpha)/2) * gamma((D+alpha)/2) / (2 pi^((D+1)/2)), D_theta^alpha is the Riemann-Liouville directional fractional derivative, and theta is the differentiation direction vector. The solution u(x) is assumed to be identically zero in the boundary and exterior of the domain. When D = 1, C(alpha, D) = 1 / (2 cos(alpha * pi / 2)). This solver does not consider C(alpha, D) in the fractional Laplacian, and only discretizes \int_{||theta||=1} D_theta^alpha u(x) d theta. D_theta^alpha is approximated by Grunwald-Letnikov formula. References: `G. Pang, L. Lu, & G. E. Karniadakis. fPINNs: Fractional physics-informed neural networks. SIAM Journal on Scientific Computing, 41(4), A2603--A2626, 2019 <https://doi.org/10.1137/18M1229845>`_. """ def __init__( self, geometry, fpde, alpha, bcs, resolution, meshtype="dynamic", num_domain=0, num_boundary=0, train_distribution="Hammersley", anchors=None, solution=None, num_test=None, ): self.alpha = alpha self.disc = Scheme(meshtype, resolution) self.frac_train, self.frac_test = None, None self.int_mat_train = None super().__init__( geometry, fpde, bcs, num_domain=num_domain, num_boundary=num_boundary, train_distribution=train_distribution, anchors=anchors, solution=solution, num_test=num_test, )
[docs] def losses_train(self, targets, outputs, loss_fn, inputs, model, aux=None): bcs_start = np.cumsum([0] + self.num_bcs) # do not cache int_mat when alpha is a learnable parameter if is_tensor(self.alpha): int_mat = self.get_int_matrix(True) else: if self.int_mat_train is not None: # use cached int_mat int_mat = self.int_mat_train else: # initialize self.int_mat_train with int_mat int_mat = self.get_int_matrix(True) self.int_mat_train = int_mat f = self.pde(inputs, outputs, int_mat) if not isinstance(f, (list, tuple)): f = [f] f = [fi[bcs_start[-1] :] for fi in f] losses = [ loss_fn(bkd.zeros_like(fi), fi) for fi in f ] for i, bc in enumerate(self.bcs): beg, end = bcs_start[i], bcs_start[i + 1] error = bc.error(self.train_x, inputs, outputs, beg, end) losses.append( loss_fn(bkd.zeros_like(error), error) ) return losses
[docs] def losses_test(self, targets, outputs, loss_fn, inputs, model, aux=None): int_mat = self.get_int_matrix(False) f = self.pde(inputs, outputs, int_mat) if not isinstance(f, (list, tuple)): f = [f] return [ loss_fn(bkd.zeros_like(fi), fi) for fi in f ] + [bkd.as_tensor(0, dtype=config.real(bkd.lib)) for _ in self.bcs]
[docs] def train_next_batch(self, batch_size=None): # do not cache train data when alpha is a learnable parameter if not is_tensor(self.alpha) or backend_name == "tensorflow.compat.v1": if self.train_x is not None: return self.train_x, self.train_y if self.disc.meshtype == "static": if self.geom.idstr != "Interval": raise ValueError("Only Interval supports static mesh.") self.frac_train = Fractional(self.alpha, self.geom, self.disc, None) X = self.frac_train.get_x() # FPDE is only applied to the domain points. # Boundary points are auxiliary points, and appended in the end. X = np.roll(X, -1) self.train_x_all = X if self.anchors is not None: self.train_x_all = np.vstack((self.anchors, self.train_x_all)) x_bc = self.bc_points() elif self.disc.meshtype == "dynamic": self.train_x_all = self.train_points() x_bc = self.bc_points() # FPDE is only applied to the domain points. x_f = self.train_x_all[~self.geom.on_boundary(self.train_x_all)] self.frac_train = Fractional(self.alpha, self.geom, self.disc, x_f) X = self.frac_train.get_x() self.train_x = np.vstack((x_bc, X)) self.train_y = self.soln(self.train_x) if self.soln else None return self.train_x, self.train_y
[docs] def test(self): # do not cache test data when alpha is a learnable parameter if not is_tensor(self.alpha) or backend_name == "tensorflow.compat.v1": if self.test_x is not None: return self.test_x, self.test_y if self.disc.meshtype == "static" and self.num_test is not None: raise ValueError("Cannot use test points in static mesh.") if self.num_test is None: self.test_x = self.train_x[sum(self.num_bcs) :] self.frac_test = self.frac_train else: self.test_x = self.test_points() x_f = self.test_x[~self.geom.on_boundary(self.test_x)] self.frac_test = Fractional(self.alpha, self.geom, self.disc, x_f) self.test_x = self.frac_test.get_x() self.test_y = self.soln(self.test_x) if self.soln else None return self.test_x, self.test_y
[docs] def test_points(self): return self.geom.uniform_points(self.num_test, True)
[docs] def get_int_matrix(self, training): if training: int_mat = self.frac_train.get_matrix(sparse=True) num_bc = sum(self.num_bcs) else: int_mat = self.frac_test.get_matrix(sparse=True) num_bc = 0 if self.disc.meshtype == "static": int_mat = array_ops_compat.roll(int_mat, -1, 1) int_mat = int_mat[1:-1] int_mat = array_ops_compat.zero_padding(int_mat, ((num_bc, 0), (num_bc, 0))) return int_mat
[docs] class TimeFPDE(FPDE): r"""Time-dependent fractional PDE solver. D-dimensional fractional Laplacian of order alpha/2 (1 < alpha < 2) is defined as: (-Delta)^(alpha/2) u(x) = C(alpha, D) \int_{||theta||=1} D_theta^alpha u(x) d theta, where C(alpha, D) = gamma((1-alpha)/2) * gamma((D+alpha)/2) / (2 pi^((D+1)/2)), D_theta^alpha is the Riemann-Liouville directional fractional derivative, and theta is the differentiation direction vector. The solution u(x) is assumed to be identically zero in the boundary and exterior of the domain. When D = 1, C(alpha, D) = 1 / (2 cos(alpha * pi / 2)). This solver does not consider C(alpha, D) in the fractional Laplacian, and only discretizes \int_{||theta||=1} D_theta^alpha u(x) d theta. D_theta^alpha is approximated by Grunwald-Letnikov formula. References: `G. Pang, L. Lu, & G. E. Karniadakis. fPINNs: Fractional physics-informed neural networks. SIAM Journal on Scientific Computing, 41(4), A2603--A2626, 2019 <https://doi.org/10.1137/18M1229845>`_. """ def __init__( self, geometryxtime, fpde, alpha, ic_bcs, resolution, meshtype="dynamic", num_domain=0, num_boundary=0, num_initial=0, train_distribution="Hammersley", anchors=None, solution=None, num_test=None, ): self.num_initial = num_initial super().__init__( geometryxtime, fpde, alpha, ic_bcs, resolution, meshtype=meshtype, num_domain=num_domain, num_boundary=num_boundary, train_distribution=train_distribution, anchors=anchors, solution=solution, num_test=num_test, )
[docs] @run_if_all_none("train_x", "train_y") def train_next_batch(self, batch_size=None): if self.disc.meshtype == "static": if self.geom.geometry.idstr != "Interval": raise ValueError("Only Interval supports static mesh.") nt = int(round(self.num_domain / (self.disc.resolution[0] - 2))) + 1 self.frac_train = FractionalTime( self.alpha, self.geom.geometry, self.geom.timedomain.t0, self.geom.timedomain.t1, self.disc, nt, None, ) X = self.frac_train.get_x() self.train_x_all = X if self.anchors is not None: self.train_x_all = np.vstack((self.anchors, self.train_x_all)) x_bc = self.bc_points() # Remove the initial and boundary points at the beginning of X, # which are not considered in the integral matrix. X = X[self.disc.resolution[0] + 2 * nt - 2 :, :] elif self.disc.meshtype == "dynamic": self.train_x_all = self.train_points() x_bc = self.bc_points() # FPDE is only applied to the non-boundary points. x_f = self.train_x_all[~self.geom.on_boundary(self.train_x_all)] self.frac_train = FractionalTime( self.alpha, self.geom.geometry, self.geom.timedomain.t0, self.geom.timedomain.t1, self.disc, None, x_f, ) X = self.frac_train.get_x() self.train_x = np.vstack((x_bc, X)) self.train_y = self.soln(self.train_x) if self.soln else None return self.train_x, self.train_y
[docs] @run_if_all_none("test_x", "test_y") def test(self): if self.disc.meshtype == "static" and self.num_test is not None: raise ValueError("Cannot use test points in static mesh.") if self.num_test is None: self.test_x = self.train_x[sum(self.num_bcs) :] self.frac_test = self.frac_train else: self.test_x = self.test_points() x_f = self.test_x[~self.geom.on_boundary(self.test_x)] self.frac_test = FractionalTime( self.alpha, self.geom.geometry, self.geom.timedomain.t0, self.geom.timedomain.t1, self.disc, None, x_f, ) self.test_x = self.frac_test.get_x() self.test_y = self.soln(self.test_x) if self.soln else None return self.test_x, self.test_y
[docs] def train_points(self): X = super().train_points() if self.num_initial > 0: if self.train_distribution == "uniform": tmp = self.geom.uniform_initial_points(self.num_initial) else: tmp = self.geom.random_initial_points( self.num_initial, random=self.train_distribution ) X = np.vstack((tmp, X)) return X
[docs] def get_int_matrix(self, training): if training: int_mat = self.frac_train.get_matrix(sparse=True) num_bc = sum(self.num_bcs) else: int_mat = self.frac_test.get_matrix(sparse=True) num_bc = 0 int_mat = array_ops_compat.zero_padding(int_mat, ((num_bc, 0), (num_bc, 0))) return int_mat
class Fractional: """Fractional derivative. Args: x0: If ``disc.meshtype = static``, then x0 should be None; if ``disc.meshtype = 'dynamic'``, then x0 are non-boundary points. """ def __init__(self, alpha, geom, disc, x0): if (disc.meshtype == "static" and x0 is not None) or ( disc.meshtype == "dynamic" and x0 is None ): raise ValueError("disc.meshtype and x0 do not match.") self.alpha, self.geom = alpha, geom self.disc, self.x0 = disc, x0 if disc.meshtype == "dynamic": self._check_dynamic_stepsize() self.x, self.xindex_start, self.w = None, None, None self._w_init = self._init_weights() def _check_dynamic_stepsize(self): h = 1 / self.disc.resolution[-1] min_h = self.geom.mindist2boundary(self.x0) if min_h < h: print( "Warning: mesh step size %f is larger than the boundary distance %f." % (h, min_h) ) def _init_weights(self): """If ``disc.meshtype = 'static'``, then n is number of points; if ``disc.meshtype = 'dynamic'``, then n is resolution lambda. """ n = ( self.disc.resolution[0] if self.disc.meshtype == "static" else self.dynamic_dist2npts(self.geom.diam) + 1 ) w = [bkd.as_tensor(1.0, dtype=config.real(bkd.lib)) if bkd.is_tensor(self.alpha) else 1.0] for j in range(1, n): w.append(w[-1] * (j - 1 - self.alpha) / j) return array_ops_compat.convert_to_array(w) def get_x(self): self.x = ( self.get_x_static() if self.disc.meshtype == "static" else self.get_x_dynamic() ) return self.x def get_matrix(self, sparse=False): return ( self.get_matrix_static() if self.disc.meshtype == "static" else self.get_matrix_dynamic(sparse) ) def get_x_static(self): return self.geom.uniform_points(self.disc.resolution[0], True) def dynamic_dist2npts(self, dx): return int(math.ceil(self.disc.resolution[-1] * dx)) def get_x_dynamic(self): if np.any(self.geom.on_boundary(self.x0)): raise ValueError("x0 contains boundary points.") if self.geom.dim == 1: dirns, dirn_w = [-1, 1], [1, 1] elif self.geom.dim == 2: gauss_x, gauss_w = np.polynomial.legendre.leggauss(self.disc.resolution[0]) gauss_x, gauss_w = gauss_x.astype(config.real(np)), gauss_w.astype(config.real(np)) thetas = np.pi * gauss_x + np.pi dirns = np.vstack((np.cos(thetas), np.sin(thetas))).T dirn_w = np.pi * gauss_w elif self.geom.dim == 3: gauss_x, gauss_w = np.polynomial.legendre.leggauss( max(self.disc.resolution[:2]) ) gauss_x, gauss_w = gauss_x.astype(config.real(np)), gauss_w.astype(config.real(np)) thetas = (np.pi * gauss_x[: self.disc.resolution[0]] + np.pi) / 2 phis = np.pi * gauss_x[: self.disc.resolution[1]] + np.pi dirns, dirn_w = [], [] for i in range(self.disc.resolution[0]): for j in range(self.disc.resolution[1]): dirns.append( [ np.sin(thetas[i]) * np.cos(phis[j]), np.sin(thetas[i]) * np.sin(phis[j]), np.cos(thetas[i]), ] ) dirn_w.append(gauss_w[i] * gauss_w[j] * np.sin(thetas[i])) dirn_w = np.pi ** 2 / 2 * np.array(dirn_w) x, self.w = [], [] for x0i in self.x0: xi = list( map( lambda dirn: self.geom.background_points( x0i, dirn, self.dynamic_dist2npts, 0 ), dirns, ) ) wi = list( map( lambda i: dirn_w[i] * np.linalg.norm(xi[i][1] - xi[i][0]) ** (-self.alpha) * self.get_weight(len(xi[i]) - 1), range(len(dirns)), ) ) # first order xi, wi = zip(*map(self.modify_first_order, xi, wi)) # second order # xi, wi = zip(*map(self.modify_second_order, xi, wi)) # third order # xi, wi = zip(*map(self.modify_third_order, xi, wi)) x.append(np.vstack(xi)) self.w.append(array_ops_compat.hstack(wi)) self.xindex_start = np.hstack(([0], np.cumsum(list(map(len, x))))) + len( self.x0 ) return np.vstack([self.x0] + x) def modify_first_order(self, x, w): x = np.vstack(([2 * x[0] - x[1]], x[:-1])) if not self.geom.inside(x[0:1])[0]: return x[1:], w[1:] return x, w def modify_second_order(self, x=None, w=None): w0 = np.hstack(([config.real(np)(0)], w)) w1 = np.hstack((w, [config.real(np)(0)])) beta = 1 - self.alpha / 2 w = beta * w0 + (1 - beta) * w1 if x is None: return w x = np.vstack(([2 * x[0] - x[1]], x)) if not self.geom.inside(x[0:1])[0]: return x[1:], w[1:] return x, w def modify_third_order(self, x=None, w=None): w0 = np.hstack(([config.real(np)(0)], w)) w1 = np.hstack((w, [config.real(np)(0)])) w2 = np.hstack(([config.real(np)(0)] * 2, w[:-1])) beta = 1 - self.alpha / 2 w = ( (-6 * beta ** 2 + 11 * beta + 1) / 6 * w0 + (11 - 6 * beta) * (1 - beta) / 12 * w1 + (6 * beta + 1) * (beta - 1) / 12 * w2 ) if x is None: return w x = np.vstack(([2 * x[0] - x[1]], x)) if not self.geom.inside(x[0:1])[0]: return x[1:], w[1:] return x, w def get_weight(self, n): return self._w_init[: n + 1] def get_matrix_static(self): if not bkd.is_tensor(self.alpha): int_mat = np.zeros( (self.disc.resolution[0], self.disc.resolution[0]), dtype=config.real(np), ) h = self.geom.diam / (self.disc.resolution[0] - 1) for i in range(1, self.disc.resolution[0] - 1): # first order int_mat[i, 1 : i + 2] = np.flipud(self.get_weight(i)) int_mat[i, i - 1 : -1] += self.get_weight( self.disc.resolution[0] - 1 - i ) # second order # int_mat[i, 0:i+2] = np.flipud(self.modify_second_order(w=self.get_weight(i))) # int_mat[i, i-1:] += self.modify_second_order(w=self.get_weight(self.disc.resolution[0]-1-i)) # third order # int_mat[i, 0:i+2] = np.flipud(self.modify_third_order(w=self.get_weight(i))) # int_mat[i, i-1:] += self.modify_third_order(w=self.get_weight(self.disc.resolution[0]-1-i)) return h ** (-self.alpha) * int_mat int_mat = bkd.zeros((1, self.disc.resolution[0]), dtype=config.real(bkd.lib)) for i in range(1, self.disc.resolution[0] - 1): if True: # shifted row = bkd.concat( [ bkd.zeros(1, dtype=config.real(bkd.lib)), bkd.reverse(self.get_weight(i), [0]), bkd.zeros( self.disc.resolution[0] - i - 2, dtype=config.real(bkd.lib) ), ], 0, ) row += bkd.concat( [ bkd.zeros(i - 1, dtype=config.real(bkd.lib)), self.get_weight(self.disc.resolution[0] - 1 - i), bkd.zeros(1, dtype=config.real(bkd.lib)), ], 0, ) else: # not shifted row = bkd.concat( [ bkd.reverse(self.get_weight(i), [0]), bkd.zeros(self.disc.resolution[0] - i - 1), ], 0, ) row += bkd.concat( [bkd.zeros(i), self.get_weight(self.disc.resolution[0] - 1 - i)], 0 ) row = bkd.expand_dims(row, 0) int_mat = bkd.concat([int_mat, row], 0) int_mat = bkd.concat( [int_mat, bkd.zeros([1, self.disc.resolution[0]], dtype=config.real(bkd.lib))], 0 ) h = self.geom.diam / (self.disc.resolution[0] - 1) return h ** (-self.alpha) * int_mat def get_matrix_dynamic(self, sparse): if self.x is None: raise AssertionError("No dynamic points") if sparse: print("Generating sparse fractional matrix...") dense_shape = (self.x0.shape[0], self.x.shape[0]) indices, values = [], [] beg = self.x0.shape[0] for i in range(self.x0.shape[0]): for _ in range(self.w[i].shape[0]): indices.append([i, beg]) beg += 1 values = array_ops_compat.hstack((values, self.w[i])) return indices, values, dense_shape print("Generating dense fractional matrix...") int_mat = np.zeros((self.x0.shape[0], self.x.shape[0]), dtype=config.real(np)) beg = self.x0.shape[0] for i in range(self.x0.shape[0]): int_mat[i, beg : beg + self.w[i].size] = self.w[i] beg += self.w[i].size return int_mat class FractionalTime: """Fractional derivative with time. Args: nt: If ``disc.meshtype = static``, then nt is the number of t points; if ``disc.meshtype = 'dynamic'``, then nt is None. x0: If ``disc.meshtype = static``, then x0 should be None; if ``disc.meshtype = 'dynamic'``, then x0 are non-boundary points. Attributes: nx: If ``disc.meshtype = static``, then nx is the number of x points; if ``disc.meshtype = dynamic``, then nx is the resolution lambda. """ def __init__(self, alpha, geom, tmin, tmax, disc, nt, x0): self.alpha = alpha self.geom, self.tmin, self.tmax = geom, tmin, tmax self.disc, self.nt, self.x0 = disc, nt, x0 self.x, self.fracx = None, None def get_x(self): self.x = ( self.get_x_static() if self.disc.meshtype == "static" else self.get_x_dynamic() ) return self.x def get_matrix(self, sparse=False): return ( self.get_matrix_static() if self.disc.meshtype == "static" else self.get_matrix_dynamic(sparse) ) def get_x_static(self): # Points are ordered as initial --> boundary --> inside x = self.geom.uniform_points(self.disc.resolution[0], True) x = np.roll(x, 1)[:, 0] dt = (self.tmax - self.tmin) / (self.nt - 1) d = np.empty((self.disc.resolution[0] * self.nt, self.geom.dim + 1), dtype=x.dtype) d[0 : self.disc.resolution[0], 0] = x d[0 : self.disc.resolution[0], 1] = self.tmin beg = self.disc.resolution[0] for i in range(1, self.nt): d[beg : beg + 2, 0] = x[:2] d[beg : beg + 2, 1] = self.tmin + i * dt beg += 2 for i in range(1, self.nt): d[beg : beg + self.disc.resolution[0] - 2, 0] = x[2:] d[beg : beg + self.disc.resolution[0] - 2, 1] = self.tmin + i * dt beg += self.disc.resolution[0] - 2 return d def get_x_dynamic(self): self.fracx = Fractional(self.alpha, self.geom, self.disc, self.x0[:, :-1]) xx = self.fracx.get_x() x = np.empty((len(xx), self.geom.dim + 1), dtype=xx.dtype) x[: len(self.x0)] = self.x0 beg = len(self.x0) for i in range(len(self.x0)): tmp = xx[self.fracx.xindex_start[i] : self.fracx.xindex_start[i + 1]] x[beg : beg + len(tmp), :1] = tmp x[beg : beg + len(tmp), -1] = self.x0[i, -1] beg += len(tmp) return x def get_matrix_static(self): # Only consider the inside points print("Warning: assume zero boundary condition.") n = (self.disc.resolution[0] - 2) * (self.nt - 1) int_mat = np.zeros((n, n), dtype=config.real(np)) self.fracx = Fractional(self.alpha, self.geom, self.disc, None) int_mat_one = self.fracx.get_matrix() beg = 0 for _ in range(self.nt - 1): int_mat[ beg : beg + self.disc.resolution[0] - 2, beg : beg + self.disc.resolution[0] - 2, ] = int_mat_one[1:-1, 1:-1] beg += self.disc.resolution[0] - 2 return int_mat def get_matrix_dynamic(self, sparse): return self.fracx.get_matrix(sparse)