Source code for deepxde.nn.tensorflow_compat_v1.deeponet

__all__ = ["DeepONet", "DeepONetCartesianProd"]

from abc import ABC, abstractmethod

import numpy as np

from .nn import NN
from .. import activations
from .. import initializers
from .. import regularizers
from ... import config
from ...backend import tf
from ...utils import timing


class DeepONetStrategy(ABC):
    """DeepONet building strategy.

    See Section 3.1.6 in
    L. Lu, X. Meng, S. Cai, Z. Mao, S. Goswami, Z. Zhang, & G. Karniadakis.
    A comprehensive and fair comparison of two neural operators
    (with practical extensions) based on FAIR data.
    Computer Methods in Applied Mechanics and Engineering, 393, 114778, 2022.
    """

    def __init__(self, net):
        self.net = net

    def _build_branch_and_trunk(self):
        # Branch net to encode the input function
        branch = self.net.build_branch_net()
        # Trunk net to encode the domain of the output function
        trunk = self.net.build_trunk_net()
        return branch, trunk

    @abstractmethod
    def build(self):
        """Return the output tensor."""


class SingleOutputStrategy(DeepONetStrategy):
    """Single output build strategy is the standard build method."""

    def build(self):
        branch, trunk = self._build_branch_and_trunk()
        if branch.shape[-1] != trunk.shape[-1]:
            raise AssertionError(
                "Output sizes of branch net and trunk net do not match."
            )
        y = self.net.merge_branch_trunk(branch, trunk)
        return y


class IndependentStrategy(DeepONetStrategy):
    """Directly use n independent DeepONets,
    and each DeepONet outputs only one function.
    """

    def build(self):
        single_output_strategy = SingleOutputStrategy(self.net)
        ys = [single_output_strategy.build() for _ in range(self.net.num_outputs)]
        return self.net.concatenate_outputs(ys)


class SplitBothStrategy(DeepONetStrategy):
    """Split the outputs of both the branch net and the trunk net into n groups,
    and then the kth group outputs the kth solution.

    For example, if n = 2 and both the branch and trunk nets have 100 output neurons,
    then the dot product between the first 50 neurons of
    the branch and trunk nets generates the first function,
    and the remaining 50 neurons generate the second function.
    """

    def build(self):
        branch, trunk = self._build_branch_and_trunk()
        if branch.shape[-1] != trunk.shape[-1]:
            raise AssertionError(
                "Output sizes of branch net and trunk net do not match."
            )
        if branch.shape[-1] % self.net.num_outputs != 0:
            raise AssertionError(
                f"Output size of the branch net is not evenly divisible by {self.net.num_outputs}."
            )
        branch_groups = tf.split(
            branch, num_or_size_splits=self.net.num_outputs, axis=1
        )
        trunk_groups = tf.split(trunk, num_or_size_splits=self.net.num_outputs, axis=1)
        ys = []
        for i in range(self.net.num_outputs):
            y = self.net.merge_branch_trunk(branch_groups[i], trunk_groups[i])
            ys.append(y)
        return self.net.concatenate_outputs(ys)
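
# A minimal NumPy sketch (not part of the library) of the grouping described in
# SplitBothStrategy: with num_outputs = 2 and width-100 branch/trunk outputs,
# each solution is the dot product of one 50-column group. Names and sizes are
# illustrative assumptions.
#   import numpy as np
#   branch = np.random.rand(8, 100)          # (batch, 100)
#   trunk = np.random.rand(8, 100)           # (batch, 100)
#   b1, b2 = np.split(branch, 2, axis=1)     # two (8, 50) groups
#   t1, t2 = np.split(trunk, 2, axis=1)
#   y1 = np.einsum("bi,bi->b", b1, t1)       # first output function
#   y2 = np.einsum("bi,bi->b", b2, t2)       # second output function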


class SplitBranchStrategy(DeepONetStrategy):
    """Split the branch net and share the trunk net."""

    def build(self):
        branch, trunk = self._build_branch_and_trunk()
        if branch.shape[-1] % self.net.num_outputs != 0:
            raise AssertionError(
                f"Output size of the branch net is not evenly divisible by {self.net.num_outputs}."
            )
        if branch.shape[-1] / self.net.num_outputs != trunk.shape[-1]:
            raise AssertionError(
                f"Output size of the trunk net does not equal to {branch.shape[-1] // self.net.num_outputs}."
            )
        branch_groups = tf.split(
            branch, num_or_size_splits=self.net.num_outputs, axis=1
        )
        ys = []
        for i in range(self.net.num_outputs):
            y = self.net.merge_branch_trunk(branch_groups[i], trunk)
            ys.append(y)
        return self.net.concatenate_outputs(ys)


class SplitTrunkStrategy(DeepONetStrategy):
    """Split the trunk net and share the branch net."""

    def build(self):
        branch, trunk = self._build_branch_and_trunk()
        if trunk.shape[-1] % self.net.num_outputs != 0:
            raise AssertionError(
                f"Output size of the trunk net is not evenly divisible by {self.net.num_outputs}."
            )
        if trunk.shape[-1] / self.net.num_outputs != branch.shape[-1]:
            raise AssertionError(
                f"Output size of the branch net does not equal to {trunk.shape[-1] // self.net.num_outputs}."
            )
        trunk_groups = tf.split(trunk, num_or_size_splits=self.net.num_outputs, axis=1)
        ys = []
        for i in range(self.net.num_outputs):
            y = self.net.merge_branch_trunk(branch, trunk_groups[i])
            ys.append(y)
        return self.net.concatenate_outputs(ys)


class DeepONet(NN):
    """Deep operator network.

    `Lu et al. Learning nonlinear operators via DeepONet based on the universal
    approximation theorem of operators. Nat Mach Intell, 2021.
    <https://doi.org/10.1038/s42256-021-00302-5>`_

    Args:
        layer_sizes_branch: A list of integers as the width of a fully connected
            network, or `(dim, f)` where `dim` is the input dimension and `f` is a
            network function. The width of the last layer in the branch and trunk net
            should be the same for all strategies except "split_branch" and
            "split_trunk".
        layer_sizes_trunk (list): A list of integers as the width of a fully connected
            network.
        activation: If `activation` is a ``string``, then the same activation is used
            in both trunk and branch nets. If `activation` is a ``dict``, then the
            trunk net uses the activation `activation["trunk"]`, and the branch net
            uses `activation["branch"]`.
        dropout_rate: If `dropout_rate` is a ``float`` between 0 and 1, then the same
            rate is used in both trunk and branch nets. If `dropout_rate` is a
            ``dict``, then the trunk net uses the rate `dropout_rate["trunk"]`, and
            the branch net uses `dropout_rate["branch"]`. Both `dropout_rate["trunk"]`
            and `dropout_rate["branch"]` should be ``float`` or lists of ``float``.
            The list length should match the length of `layer_sizes_trunk` - 1 for the
            trunk net and `layer_sizes_branch` - 2 for the branch net.
        trainable_branch: Boolean.
        trainable_trunk: Boolean or a list of booleans.
        num_outputs (integer): Number of outputs. In case of multiple outputs, i.e.,
            `num_outputs` > 1, `multi_output_strategy` below should be set.
        multi_output_strategy (str or None): ``None``, "independent", "split_both",
            "split_branch" or "split_trunk". It should be set when there are
            multiple outputs.

            - None
            Classical implementation of DeepONet with a single output.
            Cannot be used with `num_outputs` > 1.

            - independent
            Use `num_outputs` independent DeepONets, and each DeepONet outputs only
            one function.

            - split_both
            Split the outputs of both the branch net and the trunk net into
            `num_outputs` groups, and then the kth group outputs the kth solution.

            - split_branch
            Split the branch net and share the trunk net. The width of the last layer
            in the branch net should be equal to the one in the trunk net multiplied
            by the number of outputs.

            - split_trunk
            Split the trunk net and share the branch net. The width of the last layer
            in the trunk net should be equal to the one in the branch net multiplied
            by the number of outputs.
    """

    def __init__(
        self,
        layer_sizes_branch,
        layer_sizes_trunk,
        activation,
        kernel_initializer,
        regularization=None,
        dropout_rate=0,
        use_bias=True,
        stacked=False,
        trainable_branch=True,
        trainable_trunk=True,
        num_outputs=1,
        multi_output_strategy=None,
    ):
        super().__init__()
        if isinstance(trainable_trunk, (list, tuple)):
            if len(trainable_trunk) != len(layer_sizes_trunk) - 1:
                raise ValueError("trainable_trunk does not match layer_sizes_trunk.")

        self.layer_size_func = layer_sizes_branch
        self.layer_size_loc = layer_sizes_trunk
        if isinstance(activation, dict):
            self.activation_branch = activations.get(activation["branch"])
            self.activation_trunk = activations.get(activation["trunk"])
        else:
            self.activation_branch = self.activation_trunk = activations.get(activation)
        self.kernel_initializer = initializers.get(kernel_initializer)
        if stacked:
            self.kernel_initializer_stacked = initializers.get(
                "stacked " + kernel_initializer
            )
        self.regularizer = regularizers.get(regularization)
        if isinstance(dropout_rate, dict):
            self.dropout_rate_branch = dropout_rate["branch"]
            self.dropout_rate_trunk = dropout_rate["trunk"]
        else:
            self.dropout_rate_branch = self.dropout_rate_trunk = dropout_rate
        if isinstance(self.dropout_rate_branch, list):
            if not (len(layer_sizes_branch) - 2) == len(self.dropout_rate_branch):
                raise ValueError(
                    "Number of dropout rates of branch net must be "
                    f"equal to {len(layer_sizes_branch) - 2}"
                )
        else:
            self.dropout_rate_branch = [self.dropout_rate_branch] * (
                len(layer_sizes_branch) - 2
            )
        if isinstance(self.dropout_rate_trunk, list):
            if not (len(layer_sizes_trunk) - 1) == len(self.dropout_rate_trunk):
                raise ValueError(
                    "Number of dropout rates of trunk net must be "
                    f"equal to {len(layer_sizes_trunk) - 1}"
                )
        else:
            self.dropout_rate_trunk = [self.dropout_rate_trunk] * (
                len(layer_sizes_trunk) - 1
            )
        self.use_bias = use_bias
        self.stacked = stacked
        self.trainable_branch = trainable_branch
        self.trainable_trunk = trainable_trunk

        self._inputs = None
        self._X_func_default = None

        self.num_outputs = num_outputs
        if self.num_outputs == 1:
            if multi_output_strategy is not None:
                raise ValueError(
                    "num_outputs is set to 1, but multi_output_strategy is not None."
                )
        elif multi_output_strategy is None:
            multi_output_strategy = "independent"
            print(
                f"Warning: There are {num_outputs} outputs, but no "
                "multi_output_strategy selected. "
                'Use "independent" as the multi_output_strategy.'
            )
        self.multi_output_strategy = {
            None: SingleOutputStrategy,
            "independent": IndependentStrategy,
            "split_both": SplitBothStrategy,
            "split_branch": SplitBranchStrategy,
            "split_trunk": SplitTrunkStrategy,
        }[multi_output_strategy](self)

    @property
    def inputs(self):
        return self._inputs

    @inputs.setter
    def inputs(self, value):
        if value[1] is not None:
            raise ValueError("DeepONet does not support setting trunk net input.")
        self._X_func_default = value[0]
        self._inputs = self.X_loc

    @property
    def outputs(self):
        return self.y

    @property
    def targets(self):
        return self.target

    def _feed_dict_inputs(self, inputs):
        if not isinstance(inputs, (list, tuple)):
            n = len(inputs)
            inputs = [np.tile(self._X_func_default, (n, 1)), inputs]
        return dict(zip([self.X_func, self.X_loc], inputs))
    @timing
    def build(self):
        print("Building DeepONet...")
        self.X_func = tf.placeholder(config.real(tf), [None, self.layer_size_func[0]])
        self.X_loc = tf.placeholder(config.real(tf), [None, self.layer_size_loc[0]])
        self._inputs = [self.X_func, self.X_loc]

        self.y = self.multi_output_strategy.build()
        if self._output_transform is not None:
            self.y = self._output_transform(self._inputs, self.y)

        self.target = tf.placeholder(config.real(tf), [None, self.num_outputs])
        self.built = True
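
    # Feed sketch (hedged; array names are illustrative): after build(), the two
    # placeholders expect paired rows, X_func of shape (n, layer_sizes_branch[0])
    # and X_loc of shape (n, layer_sizes_trunk[0]):
    #   feed = net._feed_dict_inputs([X_func, X_loc])
    # Feeding a bare X_loc instead tiles the default branch input via
    # _feed_dict_inputs above.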
    def build_branch_net(self):
        if callable(self.layer_size_func[1]):
            # User-defined network
            return self.layer_size_func[1](self.X_func)
        if self.stacked:
            # Stacked fully connected network
            return self._build_stacked_branch_net()
        # Unstacked fully connected network
        return self._build_unstacked_branch_net()
    def _build_stacked_branch_net(self):
        y_func = self.X_func
        stack_size = self.layer_size_func[-1]
        for i in range(1, len(self.layer_size_func) - 1):
            y_func = self._stacked_dense(
                y_func,
                self.layer_size_func[i],
                stack_size=stack_size,
                activation=self.activation_branch,
                trainable=self.trainable_branch,
            )
            if self.dropout_rate_branch[i - 1] > 0:
                y_func = tf.layers.dropout(
                    y_func,
                    rate=self.dropout_rate_branch[i - 1],
                    training=self.training,
                )
        return self._stacked_dense(
            y_func,
            1,
            stack_size=stack_size,
            use_bias=self.use_bias,
            trainable=self.trainable_branch,
        )

    def _build_unstacked_branch_net(self):
        y_func = self.X_func
        for i in range(1, len(self.layer_size_func) - 1):
            y_func = self._dense(
                y_func,
                self.layer_size_func[i],
                activation=self.activation_branch,
                regularizer=self.regularizer,
                trainable=self.trainable_branch,
            )
            if self.dropout_rate_branch[i - 1] > 0:
                y_func = tf.layers.dropout(
                    y_func,
                    rate=self.dropout_rate_branch[i - 1],
                    training=self.training,
                )
        return self._dense(
            y_func,
            self.layer_size_func[-1],
            use_bias=self.use_bias,
            regularizer=self.regularizer,
            trainable=self.trainable_branch,
        )
    def build_trunk_net(self):
        y_loc = self.X_loc
        if self._input_transform is not None:
            y_loc = self._input_transform(y_loc)
        for i in range(1, len(self.layer_size_loc)):
            y_loc = self._dense(
                y_loc,
                self.layer_size_loc[i],
                activation=self.activation_trunk,
                regularizer=self.regularizer,
                trainable=self.trainable_trunk[i - 1]
                if isinstance(self.trainable_trunk, (list, tuple))
                else self.trainable_trunk,
            )
            if self.dropout_rate_trunk[i - 1] > 0:
                y_loc = tf.layers.dropout(
                    y_loc, rate=self.dropout_rate_trunk[i - 1], training=self.training
                )
        return y_loc
    def merge_branch_trunk(self, branch, trunk):
        # Dot product
        y = tf.einsum("bi,bi->b", branch, trunk)
        y = tf.expand_dims(y, axis=1)
        if self.use_bias:
            b = tf.Variable(tf.zeros(1, dtype=config.real(tf)))
            y += b
        return y
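
    # The einsum "bi,bi->b" above is a row-wise dot product. A NumPy sketch with
    # illustrative sizes (batch 8, width p = 40):
    #   import numpy as np
    #   branch = np.random.rand(8, 40)             # (batch, p)
    #   trunk = np.random.rand(8, 40)              # (batch, p)
    #   y = np.einsum("bi,bi->b", branch, trunk)   # (batch,)
    #   y = y[:, None]                             # (batch, 1), like expand_dims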
    @staticmethod
    def concatenate_outputs(ys):
        return tf.concat(ys, axis=1)
    def _dense(
        self,
        inputs,
        units,
        activation=None,
        use_bias=True,
        regularizer=None,
        trainable=True,
    ):
        dense = tf.keras.layers.Dense(
            units,
            activation=activation,
            use_bias=use_bias,
            kernel_initializer=self.kernel_initializer,
            kernel_regularizer=regularizer,
            trainable=trainable,
        )
        out = dense(inputs)
        if regularizer:
            self.regularization_loss += tf.math.add_n(dense.losses)
        return out

    def _stacked_dense(
        self, inputs, units, stack_size, activation=None, use_bias=True, trainable=True
    ):
        """Stacked densely-connected NN layer.

        Args:
            inputs: If `inputs` is the NN input, then it is a 2D tensor with shape
                `(batch_size, input_dim)`; otherwise, it is a 3D tensor with shape
                `(batch_size, stack_size, input_dim)`.

        Returns:
            tensor: outputs. If the outputs are the NN output, i.e., units = 1, then
            it is a 2D tensor with shape `(batch_size, stack_size)`; otherwise, it is
            a 3D tensor with shape `(batch_size, stack_size, units)`.
        """
        shape = inputs.shape
        input_dim = shape[-1]
        if len(shape) == 2:
            # NN input layer
            W = tf.Variable(
                self.kernel_initializer_stacked([stack_size, input_dim, units]),
                trainable=trainable,
            )
            outputs = tf.einsum("bi,nij->bnj", inputs, W)
        elif units == 1:
            # NN output layer
            W = tf.Variable(
                self.kernel_initializer_stacked([stack_size, input_dim]),
                trainable=trainable,
            )
            outputs = tf.einsum("bni,ni->bn", inputs, W)
        else:
            W = tf.Variable(
                self.kernel_initializer_stacked([stack_size, input_dim, units]),
                trainable=trainable,
            )
            outputs = tf.einsum("bni,nij->bnj", inputs, W)
        if use_bias:
            if units == 1:
                # NN output layer
                b = tf.Variable(tf.zeros(stack_size), trainable=trainable)
            else:
                b = tf.Variable(tf.zeros([stack_size, units]), trainable=trainable)
            outputs += b
        if activation is not None:
            return activation(outputs)
        return outputs
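
    # Shape walk-through of _stacked_dense (hedged sketch; sizes are assumptions):
    # with batch 8, input_dim 100, stack_size 40, and units 32, the input layer is
    #   "bi,nij->bnj": (8, 100) x (40, 100, 32) -> (8, 40, 32)
    # a hidden layer keeps the stack axis,
    #   "bni,nij->bnj": (8, 40, 32) x (40, 32, 32) -> (8, 40, 32)
    # and the output layer (units = 1) contracts each stacked net to one value,
    #   "bni,ni->bn": (8, 40, 32) x (40, 32) -> (8, 40)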
class DeepONetCartesianProd(NN):
    """Deep operator network for dataset in the format of Cartesian product.

    Args:
        layer_sizes_branch: A list of integers as the width of a fully connected
            network, or `(dim, f)` where `dim` is the input dimension and `f` is a
            network function. The width of the last layer in the branch and trunk net
            should be the same for all strategies except "split_branch" and
            "split_trunk".
        layer_sizes_trunk (list): A list of integers as the width of a fully connected
            network.
        activation: If `activation` is a ``string``, then the same activation is used
            in both trunk and branch nets. If `activation` is a ``dict``, then the
            trunk net uses the activation `activation["trunk"]`, and the branch net
            uses `activation["branch"]`.
        dropout_rate: If `dropout_rate` is a ``float`` between 0 and 1, then the same
            rate is used in both trunk and branch nets. If `dropout_rate` is a
            ``dict``, then the trunk net uses the rate `dropout_rate["trunk"]`, and
            the branch net uses `dropout_rate["branch"]`. Both `dropout_rate["trunk"]`
            and `dropout_rate["branch"]` should be ``float`` or lists of ``float``.
            The list length should match the length of `layer_sizes_trunk` - 1 for the
            trunk net and `layer_sizes_branch` - 2 for the branch net.
        num_outputs (integer): Number of outputs. In case of multiple outputs, i.e.,
            `num_outputs` > 1, `multi_output_strategy` below should be set.
        multi_output_strategy (str or None): ``None``, "independent", "split_both",
            "split_branch" or "split_trunk". It should be set when there are
            multiple outputs.

            - None
            Classical implementation of DeepONet with a single output.
            Cannot be used with `num_outputs` > 1.

            - independent
            Use `num_outputs` independent DeepONets, and each DeepONet outputs only
            one function.

            - split_both
            Split the outputs of both the branch net and the trunk net into
            `num_outputs` groups, and then the kth group outputs the kth solution.

            - split_branch
            Split the branch net and share the trunk net. The width of the last layer
            in the branch net should be equal to the one in the trunk net multiplied
            by the number of outputs.

            - split_trunk
            Split the trunk net and share the branch net. The width of the last layer
            in the trunk net should be equal to the one in the branch net multiplied
            by the number of outputs.
    """

    def __init__(
        self,
        layer_sizes_branch,
        layer_sizes_trunk,
        activation,
        kernel_initializer,
        regularization=None,
        dropout_rate=0,
        num_outputs=1,
        multi_output_strategy=None,
    ):
        super().__init__()
        self.layer_size_func = layer_sizes_branch
        self.layer_size_loc = layer_sizes_trunk
        if isinstance(activation, dict):
            self.activation_branch = activations.get(activation["branch"])
            self.activation_trunk = activations.get(activation["trunk"])
        else:
            self.activation_branch = self.activation_trunk = activations.get(activation)
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.regularizer = regularizers.get(regularization)
        if isinstance(dropout_rate, dict):
            self.dropout_rate_branch = dropout_rate["branch"]
            self.dropout_rate_trunk = dropout_rate["trunk"]
        else:
            self.dropout_rate_branch = self.dropout_rate_trunk = dropout_rate
        if isinstance(self.dropout_rate_branch, list):
            if not (len(layer_sizes_branch) - 2) == len(self.dropout_rate_branch):
                raise ValueError(
                    "Number of dropout rates of branch net must be "
                    f"equal to {len(layer_sizes_branch) - 2}"
                )
        else:
            self.dropout_rate_branch = [self.dropout_rate_branch] * (
                len(layer_sizes_branch) - 2
            )
        if isinstance(self.dropout_rate_trunk, list):
            if not (len(layer_sizes_trunk) - 1) == len(self.dropout_rate_trunk):
                raise ValueError(
                    "Number of dropout rates of trunk net must be "
                    f"equal to {len(layer_sizes_trunk) - 1}"
                )
        else:
            self.dropout_rate_trunk = [self.dropout_rate_trunk] * (
                len(layer_sizes_trunk) - 1
            )
        self._inputs = None

        self.num_outputs = num_outputs
        if self.num_outputs == 1:
            if multi_output_strategy is not None:
                raise ValueError(
                    "num_outputs is set to 1, but multi_output_strategy is not None."
                )
        elif multi_output_strategy is None:
            multi_output_strategy = "independent"
            print(
                f"Warning: There are {num_outputs} outputs, but no "
                "multi_output_strategy selected. "
                'Use "independent" as the multi_output_strategy.'
            )
        self.multi_output_strategy = {
            None: SingleOutputStrategy,
            "independent": IndependentStrategy,
            "split_both": SplitBothStrategy,
            "split_branch": SplitBranchStrategy,
            "split_trunk": SplitTrunkStrategy,
        }[multi_output_strategy](self)

    @property
    def inputs(self):
        return self._inputs

    @property
    def outputs(self):
        return self.y

    @property
    def targets(self):
        return self.target
    @timing
    def build(self):
        print("Building DeepONetCartesianProd...")
        self.X_func = tf.placeholder(config.real(tf), [None, self.layer_size_func[0]])
        self.X_loc = tf.placeholder(config.real(tf), [None, self.layer_size_loc[0]])
        self._inputs = [self.X_func, self.X_loc]

        self.y = self.multi_output_strategy.build()
        if self._output_transform is not None:
            self.y = self._output_transform(self._inputs, self.y)

        if self.num_outputs > 1:
            self.target = tf.placeholder(config.real(tf), [None, None, None])
        else:
            self.target = tf.placeholder(config.real(tf), [None, None])
        self.built = True
    def build_branch_net(self):
        y_func = self.X_func
        if callable(self.layer_size_func[1]):
            # User-defined network
            y_func = self.layer_size_func[1](y_func)
        else:
            # Fully connected network
            for i in range(1, len(self.layer_size_func) - 1):
                y_func = self._dense(
                    y_func,
                    self.layer_size_func[i],
                    activation=self.activation_branch,
                    regularizer=self.regularizer,
                )
                if self.dropout_rate_branch[i - 1] > 0:
                    y_func = tf.layers.dropout(
                        y_func,
                        rate=self.dropout_rate_branch[i - 1],
                        training=self.training,
                    )
            y_func = self._dense(
                y_func,
                self.layer_size_func[-1],
                regularizer=self.regularizer,
            )
        return y_func
    def build_trunk_net(self):
        # Trunk net to encode the domain of the output function
        y_loc = self.X_loc
        if self._input_transform is not None:
            y_loc = self._input_transform(y_loc)
        for i in range(1, len(self.layer_size_loc)):
            y_loc = self._dense(
                y_loc,
                self.layer_size_loc[i],
                activation=self.activation_trunk,
                regularizer=self.regularizer,
            )
            if self.dropout_rate_trunk[i - 1] > 0:
                y_loc = tf.layers.dropout(
                    y_loc, rate=self.dropout_rate_trunk[i - 1], training=self.training
                )
        return y_loc
    def merge_branch_trunk(self, branch, trunk):
        y = tf.einsum("bi,ni->bn", branch, trunk)
        # Add bias
        b = tf.Variable(tf.zeros(1, dtype=config.real(tf)))
        y += b
        return y
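
    # "bi,ni->bn" pairs every branch row with every trunk row, i.e., the full
    # Cartesian product. NumPy sketch with illustrative sizes:
    #   import numpy as np
    #   branch = np.random.rand(4, 40)             # N = 4 functions
    #   trunk = np.random.rand(7, 40)              # P = 7 locations
    #   y = np.einsum("bi,ni->bn", branch, trunk)  # (4, 7), equal to branch @ trunk.T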
    @staticmethod
    def concatenate_outputs(ys):
        return tf.stack(ys, axis=2)
    def _dense(
        self,
        inputs,
        units,
        activation=None,
        use_bias=True,
        regularizer=None,
        trainable=True,
    ):
        dense = tf.keras.layers.Dense(
            units,
            activation=activation,
            use_bias=use_bias,
            kernel_initializer=self.kernel_initializer,
            kernel_regularizer=regularizer,
            trainable=trainable,
        )
        out = dense(inputs)
        if regularizer:
            self.regularization_loss += tf.math.add_n(dense.losses)
        return out