Source code for deepxde.data.function_spaces

__all__ = [
    "FunctionSpace",
    "PowerSeries",
    "Chebyshev",
    "GRF",
    "GRF_KL",
    "GRF2D",
    "wasserstein2",
]

import abc

import numpy as np
from scipy import linalg, interpolate
from sklearn import gaussian_process as gp

from .. import config
from ..utils import isclose


class FunctionSpace(abc.ABC):
    """Function space base class.

    Example:

        .. code-block:: python

            space = dde.data.GRF()
            feats = space.random(10)
            xs = np.linspace(0, 1, num=100)[:, None]
            y = space.eval_batch(feats, xs)
    """

    @abc.abstractmethod
    def random(self, size):
        """Generate feature vectors of random functions.

        Args:
            size (int): The number of random functions to generate.

        Returns:
            A NumPy array of shape (`size`, n_features).
        """

    @abc.abstractmethod
    def eval_one(self, feature, x):
        """Evaluate the function at one point.

        Args:
            feature: The feature vector of the function to be evaluated.
            x: The point to be evaluated.

        Returns:
            float: The function value at `x`.
        """

    @abc.abstractmethod
    def eval_batch(self, features, xs):
        """Evaluate a list of functions at a list of points.

        Args:
            features: A NumPy array of shape (n_functions, n_features). A list of the
                feature vectors of the functions to be evaluated.
            xs: A NumPy array of shape (n_points, dim). A list of points to be
                evaluated.

        Returns:
            A NumPy array of shape (n_functions, n_points). The values of different
            functions at different points.
        """
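
# Illustrative sketch, not part of the original module: a minimal concrete subclass
# showing the contract a FunctionSpace implementation must satisfy. The class name and
# the space of constant functions are hypothetical choices made for this example.
class _ConstantFunctions(FunctionSpace):
    """Constant functions f(x) = c, with c sampled uniformly from [-1, 1]."""

    def random(self, size):
        # One feature per function: the constant value c.
        return 2 * np.random.rand(size, 1).astype(config.real(np)) - 1

    def eval_one(self, feature, x):
        return feature[0]

    def eval_batch(self, features, xs):
        # Broadcast each constant over all evaluation points -> (n_functions, n_points).
        return np.tile(features, (1, len(xs)))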

class PowerSeries(FunctionSpace):
    r"""Power series.

    p(x) = \sum_{i=0}^{N-1} a_i x^i

    Args:
        N (int)
        M (float): `M` > 0. The coefficients a_i are randomly sampled from [-`M`, `M`].
    """

    def __init__(self, N=100, M=1):
        self.N = N
        self.M = M

    def random(self, size):
        return (
            2 * self.M * np.random.rand(size, self.N).astype(config.real(np)) - self.M
        )

    def eval_one(self, feature, x):
        return np.dot(feature, x ** np.arange(self.N))

    def eval_batch(self, features, xs):
        mat = np.ones((self.N, len(xs)), dtype=config.real(np))
        for i in range(1, self.N):
            mat[i] = np.ravel(xs**i)
        return np.dot(features, mat)
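
# Usage sketch, not part of the original module: sample a few random power series and
# evaluate them on a grid. The helper name below is hypothetical.
def _demo_power_series():
    space = PowerSeries(N=5, M=1)
    feats = space.random(3)  # coefficients a_0, ..., a_4 for 3 functions, shape (3, 5)
    xs = np.linspace(0, 1, num=11)[:, None]  # 11 evaluation points in [0, 1]
    ys = space.eval_batch(feats, xs)  # function values, shape (3, 11)
    return ys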

class Chebyshev(FunctionSpace):
    r"""Chebyshev polynomial.

    p(x) = \sum_{i=0}^{N-1} a_i T_i(x),
    where T_i is the Chebyshev polynomial of the first kind.

    Note: The domain of x is scaled from [-1, 1] to [0, 1].

    Args:
        N (int)
        M (float): `M` > 0. The coefficients a_i are randomly sampled from [-`M`, `M`].
    """

    def __init__(self, N=100, M=1):
        self.N = N
        self.M = M

    def random(self, size):
        return 2 * self.M * np.random.rand(size, self.N) - self.M

    def eval_one(self, feature, x):
        return np.polynomial.chebyshev.chebval(2 * x - 1, feature)

    def eval_batch(self, features, xs):
        return np.polynomial.chebyshev.chebval(2 * np.ravel(xs) - 1, features.T).astype(
            config.real(np)
        )
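
# Usage sketch, not part of the original module: Chebyshev expansions follow the same
# random()/eval_batch() pattern, evaluated on the rescaled domain [0, 1]. The helper
# name below is hypothetical.
def _demo_chebyshev():
    space = Chebyshev(N=5, M=1)
    feats = space.random(3)  # Chebyshev coefficients, shape (3, 5)
    xs = np.linspace(0, 1, num=11)[:, None]
    ys = space.eval_batch(feats, xs)  # shape (3, 11)
    return ys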

class GRF(FunctionSpace):
    """Gaussian random field (Gaussian process) in 1D.

    The random sampling algorithm is based on Cholesky decomposition of the covariance
    matrix.

    Args:
        T (float): `T` > 0. The domain is [0, `T`].
        kernel (str): Name of the kernel function. "RBF" (radial-basis function kernel,
            squared-exponential kernel, Gaussian kernel), "AE" (absolute exponential
            kernel), or "ExpSineSquared" (Exp-Sine-Squared kernel, periodic kernel).
        length_scale (float): The length scale of the kernel.
        N (int): The size of the covariance matrix.
        interp (str): The interpolation method used to interpolate the random function.
            "linear", "quadratic", or "cubic".
    """

    def __init__(self, T=1, kernel="RBF", length_scale=1, N=1000, interp="cubic"):
        self.N = N
        self.interp = interp
        self.x = np.linspace(0, T, num=N)[:, None]
        if kernel == "RBF":
            K = gp.kernels.RBF(length_scale=length_scale)
        elif kernel == "AE":
            K = gp.kernels.Matern(length_scale=length_scale, nu=0.5)
        elif kernel == "ExpSineSquared":
            K = gp.kernels.ExpSineSquared(length_scale=length_scale, periodicity=T)
        self.K = K(self.x)
        self.L = np.linalg.cholesky(self.K + 1e-13 * np.eye(self.N))

    def random(self, size):
        u = np.random.randn(self.N, size)
        return np.dot(self.L, u).T

    def eval_one(self, feature, x):
        if self.interp == "linear":
            return np.interp(x, np.ravel(self.x), feature)
        f = interpolate.interp1d(
            np.ravel(self.x), feature, kind=self.interp, copy=False, assume_sorted=True
        )
        return f(x)

    def eval_batch(self, features, xs):
        if self.interp == "linear":
            return np.vstack([np.interp(xs, np.ravel(self.x), y).T for y in features])
        res = map(
            lambda y: interpolate.interp1d(
                np.ravel(self.x), y, kind=self.interp, copy=False, assume_sorted=True
            )(xs).T,
            features,
        )
        return np.vstack(list(res)).astype(config.real(np))
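
# Usage sketch, not part of the original module: sample GRF paths on a short length
# scale and evaluate them off the internal grid via cubic interpolation. The helper
# name below is hypothetical.
def _demo_grf():
    space = GRF(T=1, kernel="RBF", length_scale=0.2, N=1000, interp="cubic")
    feats = space.random(4)  # 4 sampled paths on the internal grid, shape (4, 1000)
    xs = np.linspace(0, 1, num=101)[:, None]
    ys = space.eval_batch(feats, xs)  # interpolated values, shape (4, 101)
    return ys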

class GRF_KL(FunctionSpace):
    """Gaussian random field (Gaussian process) in 1D.

    The random sampling algorithm is based on truncated Karhunen-Loeve (KL) expansion.

    Args:
        T (float): `T` > 0. The domain is [0, `T`].
        kernel (str): The kernel function. "RBF" (radial-basis function) or "AE"
            (absolute exponential).
        length_scale (float): The length scale of the kernel.
        num_eig (int): The number of eigenfunctions in KL expansion to be kept.
        N (int): Each eigenfunction is discretized at `N` points in [0, `T`].
        interp (str): The interpolation method used to interpolate the random function.
            "linear", "quadratic", or "cubic".
    """

    def __init__(
        self, T=1, kernel="RBF", length_scale=1, num_eig=10, N=100, interp="cubic"
    ):
        if not isclose(T, 1):
            raise ValueError("GRF_KL only supports T = 1.")
        self.num_eig = num_eig
        if kernel == "RBF":
            kernel = gp.kernels.RBF(length_scale=length_scale)
        elif kernel == "AE":
            kernel = gp.kernels.Matern(length_scale=length_scale, nu=0.5)
        eigval, eigvec = eig(kernel, num_eig, N, eigenfunction=True)
        eigvec *= eigval**0.5
        x = np.linspace(0, T, num=N)
        self.eigfun = [
            interpolate.interp1d(x, y, kind=interp, copy=False, assume_sorted=True)
            for y in eigvec.T
        ]

    def bases(self, sensors):
        """Evaluate the eigenfunctions at a list of points `sensors`."""
        return np.array([np.ravel(f(sensors)) for f in self.eigfun])

    def random(self, size):
        return np.random.randn(size, self.num_eig).astype(config.real(np))

    def eval_one(self, feature, x):
        eigfun = [f(x) for f in self.eigfun]
        return np.sum(eigfun * feature)

    def eval_batch(self, features, xs):
        eigfun = np.array(
            [np.ravel(f(xs)) for f in self.eigfun], dtype=config.real(np)
        )
        return np.dot(features, eigfun)
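
# Usage sketch, not part of the original module: a GRF_KL sample is a linear
# combination of the precomputed KL eigenfunctions, so random() returns KL
# coefficients rather than values on a grid. The helper name below is hypothetical.
def _demo_grf_kl():
    space = GRF_KL(T=1, kernel="RBF", length_scale=0.2, num_eig=10, N=100)
    feats = space.random(4)  # KL coefficients, shape (4, 10)
    xs = np.linspace(0, 1, num=101)[:, None]
    ys = space.eval_batch(feats, xs)  # sampled functions at xs, shape (4, 101)
    basis = space.bases(xs)  # eigenfunctions at xs, shape (10, 101)
    return ys, basis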

class GRF2D(FunctionSpace):
    """Gaussian random field in [0, 1]x[0, 1].

    The random sampling algorithm is based on Cholesky decomposition of the covariance
    matrix.

    Args:
        kernel (str): The kernel function. "RBF" (radial-basis function) or "AE"
            (absolute exponential).
        length_scale (float): The length scale of the kernel.
        N (int): The size of the covariance matrix.
        interp (str): The interpolation method used to interpolate the random function.
            "linear" or "splinef2d".

    Example:

        .. code-block:: python

            space = dde.data.GRF2D(length_scale=0.1)
            features = space.random(3)
            x = np.linspace(0, 1, num=500)
            y = np.linspace(0, 1, num=500)
            xv, yv = np.meshgrid(x, y)
            sensors = np.vstack((np.ravel(xv), np.ravel(yv))).T
            u = space.eval_batch(features, sensors)
            for ui in u:
                plt.figure()
                plt.imshow(np.reshape(ui, (len(y), len(x))))
                plt.colorbar()
                plt.show()
    """

    def __init__(self, kernel="RBF", length_scale=1, N=100, interp="splinef2d"):
        self.N = N
        self.interp = interp
        self.x = np.linspace(0, 1, num=N)
        self.y = np.linspace(0, 1, num=N)
        xv, yv = np.meshgrid(self.x, self.y)
        self.X = np.vstack((np.ravel(xv), np.ravel(yv))).T
        if kernel == "RBF":
            K = gp.kernels.RBF(length_scale=length_scale)
        elif kernel == "AE":
            K = gp.kernels.Matern(length_scale=length_scale, nu=0.5)
        self.K = K(self.X)
        self.L = np.linalg.cholesky(self.K + 1e-12 * np.eye(self.N**2))

    def random(self, size):
        u = np.random.randn(self.N**2, size)
        return np.dot(self.L, u).T

    def eval_one(self, feature, x):
        y = np.reshape(feature, (self.N, self.N))
        return interpolate.interpn((self.x, self.y), y, x, method=self.interp)[0]

    def eval_batch(self, features, xs):
        points = (self.x, self.y)
        ys = np.reshape(features, (-1, self.N, self.N))
        res = map(lambda y: interpolate.interpn(points, y, xs, method=self.interp), ys)
        return np.vstack(list(res)).astype(config.real(np))

def wasserstein2(space1, space2):
    """Compute the 2-Wasserstein (W2) metric to measure the distance between two
    ``GRF`` spaces."""
    return (
        np.trace(space1.K + space2.K - 2 * linalg.sqrtm(space1.K @ space2.K)) ** 0.5
        / space1.N**0.5
    )
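
# Usage sketch, not part of the original module: the W2 distance between two GRFs
# grows as their covariance structures diverge. Both spaces must use the same N so
# that the covariance matrices have matching shapes. The helper name is hypothetical.
def _demo_wasserstein2():
    space1 = GRF(length_scale=0.2, N=200)
    space2 = GRF(length_scale=0.5, N=200)
    return wasserstein2(space1, space2)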

def eig(kernel, num, Nx, eigenfunction=True):
    """Compute the eigenvalues and eigenfunctions of a kernel function in [0, 1]."""
    # Discretize the kernel integral operator on a uniform grid with the trapezoidal
    # rule: A[i, j] = k(x_i, x_j) * w_j, where w_j = h except h/2 at the endpoints.
    # The kernel is stationary, so only the first row of the kernel matrix is needed.
    h = 1 / (Nx - 1)
    c = kernel(np.linspace(0, 1, num=Nx)[:, None])[0] * h
    A = np.empty((Nx, Nx))
    for i in range(Nx):
        A[i, i:] = c[: Nx - i]
        A[i, i::-1] = c[: i + 1]
    A[:, 0] *= 0.5
    A[:, -1] *= 0.5
    if not eigenfunction:
        return np.flipud(np.sort(np.real(np.linalg.eigvals(A))))[:num]
    eigval, eigvec = np.linalg.eig(A)
    eigval, eigvec = np.real(eigval), np.real(eigvec)
    # Keep the `num` largest eigenvalues and normalize each eigenfunction to unit
    # L2 norm on [0, 1].
    idx = np.flipud(np.argsort(eigval))[:num]
    eigval, eigvec = eigval[idx], eigvec[:, idx]
    for i in range(num):
        eigvec[:, i] /= np.trapz(eigvec[:, i] ** 2, dx=h) ** 0.5
    return eigval, eigvec
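
# Usage sketch, not part of the original module: the spectrum of a smooth kernel such
# as the RBF decays rapidly, which is why GRF_KL needs only a few KL modes. The helper
# name below is hypothetical.
def _demo_eig():
    kernel = gp.kernels.RBF(length_scale=0.2)
    eigval = eig(kernel, num=10, Nx=100, eigenfunction=False)  # 10 largest eigenvalues
    return eigval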