Source code for deepxde.data.pde

import numpy as np

from .data import Data
from .. import backend as bkd
from .. import config
from ..backend import backend_name
from ..utils import get_num_args, run_if_all_none, mpi_scatter_from_rank0


[docs] class PDE(Data): """ODE or time-independent PDE solver. Args: geometry: Instance of ``Geometry``. pde: A global PDE or a list of PDEs. ``None`` if no global PDE. bcs: A boundary condition or a list of boundary conditions. Use ``[]`` if no boundary condition. num_domain (int): The number of training points sampled inside the domain. num_boundary (int): The number of training points sampled on the boundary. train_distribution (string): The distribution to sample training points. One of the following: "uniform" (equispaced grid), "pseudo" (pseudorandom), "LHS" (Latin hypercube sampling), "Halton" (Halton sequence), "Hammersley" (Hammersley sequence), or "Sobol" (Sobol sequence). anchors: A Numpy array of training points, in addition to the `num_domain` and `num_boundary` sampled points. exclusions: A Numpy array of points to be excluded for training. solution: The reference solution. num_test: The number of points sampled inside the domain for testing PDE loss. The testing points for BCs/ICs are the same set of points used for training. If ``None``, then the training points will be used for testing. auxiliary_var_function: A function that inputs `train_x` or `test_x` and outputs auxiliary variables. Warning: The testing points include points inside the domain and points on the boundary, and they may not have the same density, and thus the entire testing points may not be uniformly distributed. As a result, if you have a reference solution (`solution`) and would like to compute a metric such as .. code-block:: python Model.compile(metrics=["l2 relative error"]) then the metric may not be very accurate. To better compute a metric, you can sample the points manually, and then use ``Model.predict()`` to predict the solution on thess points and compute the metric: .. code-block:: python x = geom.uniform_points(num, boundary=True) y_true = ... y_pred = model.predict(x) error= dde.metrics.l2_relative_error(y_true, y_pred) Attributes: train_x_all: A Numpy array of points for PDE training. `train_x_all` is unordered, and does not have duplication. If there is PDE, then `train_x_all` is used as the training points of PDE. train_x_bc: A Numpy array of the training points for BCs. `train_x_bc` is constructed from `train_x_all` at the first step of training, by default it won't be updated when `train_x_all` changes. To update `train_x_bc`, set it to `None` and call `bc_points`, and then update the loss function by ``model.compile()``. num_bcs (list): `num_bcs[i]` is the number of points for `bcs[i]`. train_x: A Numpy array of the points fed into the network for training. `train_x` is ordered from BC points (`train_x_bc`) to PDE points (`train_x_all`), and may have duplicate points. train_aux_vars: Auxiliary variables that associate with `train_x`. test_x: A Numpy array of the points fed into the network for testing, ordered from BCs to PDE. The BC points are exactly the same points in `train_x_bc`. test_aux_vars: Auxiliary variables that associate with `test_x`. """ def __init__( self, geometry, pde, bcs, num_domain=0, num_boundary=0, train_distribution="Hammersley", anchors=None, exclusions=None, solution=None, num_test=None, auxiliary_var_function=None, ): self.geom = geometry self.pde = pde self.bcs = bcs if isinstance(bcs, (list, tuple)) else [bcs] self.num_domain = num_domain self.num_boundary = num_boundary self.train_distribution = train_distribution if config.hvd is not None: if self.train_distribution != "pseudo": raise ValueError( "Parallel training via Horovod only supports pseudo train distribution." ) if config.parallel_scaling == "weak": print( "For weak scaling, num_domain and num_boundary are the numbers of points over each rank, not the total number of points." ) elif config.parallel_scaling == "strong": print( "For strong scaling, num_domain and num_boundary are the total number of points." ) self.anchors = None if anchors is None else anchors.astype(config.real(np)) self.exclusions = exclusions self.soln = solution self.num_test = num_test self.auxiliary_var_fn = auxiliary_var_function # TODO: train_x_all is used for PDE losses. It is better to add train_x_pde # explicitly. self.train_x_all = None self.train_x_bc = None self.num_bcs = None # these include both BC and PDE points self.train_x, self.train_y = None, None self.test_x, self.test_y = None, None self.train_aux_vars, self.test_aux_vars = None, None self.train_next_batch() self.test()
[docs] def losses(self, targets, outputs, loss_fn, inputs, model, aux=None): if backend_name in ["tensorflow.compat.v1", "paddle"]: outputs_pde = outputs elif backend_name in ["tensorflow", "pytorch"]: if config.autodiff == "reverse": outputs_pde = outputs elif config.autodiff == "forward": # forward-mode AD requires functions outputs_pde = (outputs, aux[0]) elif backend_name == "jax": # JAX requires pure functions outputs_pde = (outputs, aux[0]) f = [] if self.pde is not None: if get_num_args(self.pde) == 2: f = self.pde(inputs, outputs_pde) elif get_num_args(self.pde) == 3: if self.auxiliary_var_fn is None: if aux is None or len(aux) == 1: raise ValueError("Auxiliary variable function not defined.") f = self.pde(inputs, outputs_pde, unknowns=aux[1]) else: f = self.pde(inputs, outputs_pde, model.net.auxiliary_vars) if not isinstance(f, (list, tuple)): f = [f] if not isinstance(loss_fn, (list, tuple)): loss_fn = [loss_fn] * (len(f) + len(self.bcs)) elif len(loss_fn) != len(f) + len(self.bcs): raise ValueError( "There are {} errors, but only {} losses.".format( len(f) + len(self.bcs), len(loss_fn) ) ) bcs_start = np.cumsum([0] + self.num_bcs) bcs_start = list(map(int, bcs_start)) error_f = [fi[bcs_start[-1] :] for fi in f] losses = [ loss_fn[i](bkd.zeros_like(error), error) for i, error in enumerate(error_f) ] for i, bc in enumerate(self.bcs): beg, end = bcs_start[i], bcs_start[i + 1] # The same BC points are used for training and testing. error = bc.error(self.train_x, inputs, outputs, beg, end) losses.append(loss_fn[len(error_f) + i](bkd.zeros_like(error), error)) return losses
[docs] @run_if_all_none("train_x", "train_y", "train_aux_vars") def train_next_batch(self, batch_size=None): self.train_x_all = self.train_points() self.bc_points() # Generate self.num_bcs and self.train_x_bc if self.bcs and config.hvd is not None: num_bcs = np.array(self.num_bcs) config.comm.Bcast(num_bcs, root=0) self.num_bcs = list(num_bcs) x_bc_shape = np.array(self.train_x_bc.shape) config.comm.Bcast(x_bc_shape, root=0) if len(self.train_x_bc) != x_bc_shape[0]: self.train_x_bc = np.empty(x_bc_shape, dtype=self.train_x_bc.dtype) config.comm.Bcast(self.train_x_bc, root=0) self.train_x = self.train_x_bc if config.parallel_scaling == "strong": self.train_x_all = mpi_scatter_from_rank0(self.train_x_all) if self.pde is not None: self.train_x = np.vstack((self.train_x, self.train_x_all)) self.train_y = self.soln(self.train_x) if self.soln else None if self.auxiliary_var_fn is not None: self.train_aux_vars = self.auxiliary_var_fn(self.train_x).astype( config.real(np) ) return self.train_x, self.train_y, self.train_aux_vars
[docs] @run_if_all_none("test_x", "test_y", "test_aux_vars") def test(self): if self.num_test is None: self.test_x = self.train_x else: self.test_x = self.test_points() self.test_y = self.soln(self.test_x) if self.soln else None if self.auxiliary_var_fn is not None: self.test_aux_vars = self.auxiliary_var_fn(self.test_x).astype( config.real(np) ) return self.test_x, self.test_y, self.test_aux_vars
[docs] def resample_train_points(self, pde_points=True, bc_points=True): """Resample the training points for PDE and/or BC.""" if pde_points: self.train_x_all = None if bc_points: self.train_x_bc = None self.train_x, self.train_y, self.train_aux_vars = None, None, None self.train_next_batch()
[docs] def add_anchors(self, anchors): """Add new points for training PDE losses. The BC points will not be updated.""" anchors = anchors.astype(config.real(np)) if self.anchors is None: self.anchors = anchors else: self.anchors = np.vstack((anchors, self.anchors)) self.train_x_all = np.vstack((anchors, self.train_x_all)) self.train_x = self.bc_points() if self.pde is not None: self.train_x = np.vstack((self.train_x, self.train_x_all)) self.train_y = self.soln(self.train_x) if self.soln else None if self.auxiliary_var_fn is not None: self.train_aux_vars = self.auxiliary_var_fn(self.train_x).astype( config.real(np) )
[docs] def replace_with_anchors(self, anchors): """Replace the current PDE training points with anchors. The BC points will not be changed.""" self.anchors = anchors.astype(config.real(np)) self.train_x_all = self.anchors self.train_x = self.bc_points() if self.pde is not None: self.train_x = np.vstack((self.train_x, self.train_x_all)) self.train_y = self.soln(self.train_x) if self.soln else None if self.auxiliary_var_fn is not None: self.train_aux_vars = self.auxiliary_var_fn(self.train_x).astype( config.real(np) )
[docs] @run_if_all_none("train_x_all") def train_points(self): X = np.empty((0, self.geom.dim), dtype=config.real(np)) if self.num_domain > 0: if self.train_distribution == "uniform": X = self.geom.uniform_points(self.num_domain, boundary=False) else: X = self.geom.random_points( self.num_domain, random=self.train_distribution ) if self.num_boundary > 0: if self.train_distribution == "uniform": tmp = self.geom.uniform_boundary_points(self.num_boundary) else: tmp = self.geom.random_boundary_points( self.num_boundary, random=self.train_distribution ) X = np.vstack((tmp, X)) if self.anchors is not None: X = np.vstack((self.anchors, X)) if self.exclusions is not None: def is_not_excluded(x): return not np.any([np.allclose(x, y) for y in self.exclusions]) X = np.array(list(filter(is_not_excluded, X))) self.train_x_all = X return X
[docs] @run_if_all_none("train_x_bc") def bc_points(self): x_bcs = [bc.collocation_points(self.train_x_all) for bc in self.bcs] self.num_bcs = list(map(len, x_bcs)) self.train_x_bc = ( np.vstack(x_bcs) if x_bcs else np.empty([0, self.train_x_all.shape[-1]], dtype=config.real(np)) ) return self.train_x_bc
[docs] def test_points(self): # TODO: Use different BC points from self.train_x_bc x = self.geom.uniform_points(self.num_test, boundary=False) x = np.vstack((self.train_x_bc, x)) return x
[docs] class TimePDE(PDE): """Time-dependent PDE solver. Args: num_initial (int): The number of training points sampled on the initial location. """ def __init__( self, geometryxtime, pde, ic_bcs, num_domain=0, num_boundary=0, num_initial=0, train_distribution="Hammersley", anchors=None, exclusions=None, solution=None, num_test=None, auxiliary_var_function=None, ): self.num_initial = num_initial super().__init__( geometryxtime, pde, ic_bcs, num_domain, num_boundary, train_distribution=train_distribution, anchors=anchors, exclusions=exclusions, solution=solution, num_test=num_test, auxiliary_var_function=auxiliary_var_function, )
[docs] @run_if_all_none("train_x_all") def train_points(self): X = super().train_points() if self.num_initial > 0: if self.train_distribution == "uniform": tmp = self.geom.uniform_initial_points(self.num_initial) else: tmp = self.geom.random_initial_points( self.num_initial, random=self.train_distribution ) if self.exclusions is not None: def is_not_excluded(x): return not np.any([np.allclose(x, y) for y in self.exclusions]) tmp = np.array(list(filter(is_not_excluded, tmp))) X = np.vstack((tmp, X)) self.train_x_all = X return X