cosmopower_PCAplusNN module

cosmopower_PCAplusNN (Model)

Mapping between cosmological parameters and PCA coefficients of (log)-power spectra. Requires PCA compression to have been previously computed (cosmopower_PCA).

Attributes:

| Name | Type | Description |
|------|------|-------------|
| cp_pca | cosmopower_PCA | cosmopower_PCA instance |
| n_hidden | list [int] | number of nodes for each hidden layer |
| restore | bool | whether to restore a previously trained model |
| restore_filename | str | filename tag (without suffix) for restoring a trained model from file (a pickle file with all of the model attributes and weights) |
| trainable | bool | whether the network weights are trainable |
| optimizer | tf.keras.optimizers.Optimizer | optimizer for training |
| verbose | bool | whether to print messages at intermediate steps |
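A minimal construction sketch (the `cp_pca` instance and the `my_model` filename are placeholders for illustration, not part of this page):

```python
from cosmopower import cosmopower_PCAplusNN

# build a new emulator on top of a previously computed PCA compression
cp_pca_nn = cosmopower_PCAplusNN(cp_pca=cp_pca,  # a trained cosmopower_PCA instance (assumed)
                                 n_hidden=[512, 512, 512],
                                 verbose=True)

# ... or restore a previously trained model from 'my_model.pkl'
cp_pca_nn = cosmopower_PCAplusNN(restore=True,
                                 restore_filename='my_model')
```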

Source code in cosmopower/cosmopower_PCAplusNN.py
class cosmopower_PCAplusNN(tf.keras.Model):
    r"""
    Mapping between cosmological parameters and PCA coefficients of (log)-power spectra.
    Requires PCA compression to have been previously computed (`cosmopower_PCA`).

    Attributes:
        cp_pca (cosmopower_PCA):
            `cosmopower_PCA` instance
        n_hidden (list [int]):
            list with number of nodes for each hidden layer
        restore (bool):
            whether to restore a previously trained model or not
        restore_filename (str):
            filename tag (without suffix) for restoring trained model from file 
            (this will be a pickle file with all of the model attributes and weights)
        trainable (bool):
            whether the network weights are trainable
        optimizer (tf.keras.optimizers.Optimizer):
            optimizer for training
        verbose (bool):
            whether to print messages at intermediate steps or not
    """

    def __init__(self, 
                 cp_pca=None,
                 n_hidden=[512,512,512], 
                 restore=False, 
                 restore_filename=None, 
                 trainable=True, 
                 optimizer=tf.keras.optimizers.Adam(),
                 verbose=False,
                 ):
        r"""
        Constructor.
        """
        # super
        super(cosmopower_PCAplusNN, self).__init__()

        # restore
        if restore is True:
            self.restore(restore_filename)

        # else set variables from input arguments
        else:
            # PCA compression, previously computed
            self.cp_pca = cp_pca

            # parameters
            self.parameters = self.cp_pca.parameters
            self.n_parameters = len(self.parameters)
            self.pca_transform_matrix_ = self.cp_pca.pca_transform_matrix
            self.modes = self.cp_pca.modes
            self.n_modes = self.cp_pca.n_modes
            self.n_pcas = self.pca_transform_matrix_.shape[0]
            self.n_hidden = n_hidden

            # architecture
            self.architecture = [self.n_parameters] + self.n_hidden + [self.n_pcas]
            self.n_layers = len(self.architecture) - 1

            # standardisation

            # input parameters mean and std
            self.parameters_mean_ = self.cp_pca.parameters_mean
            self.parameters_std_ = self.cp_pca.parameters_std

            # PCA mean and std
            self.pca_mean_ = self.cp_pca.pca_mean
            self.pca_std_ = self.cp_pca.pca_std

            # spectra mean and std
            self.features_mean_ = self.cp_pca.features_mean
            self.features_std_ = self.cp_pca.features_std

        # input parameters mean and std
        self.parameters_mean = tf.constant(self.parameters_mean_, dtype=dtype, name='parameters_mean')
        self.parameters_std = tf.constant(self.parameters_std_, dtype=dtype, name='parameters_std')

        # PCA mean and std
        self.pca_mean = tf.constant(self.pca_mean_, dtype=dtype, name='pca_mean')
        self.pca_std = tf.constant(self.pca_std_, dtype=dtype, name='pca_std')

        # (log)-spectra mean and std
        self.features_mean = tf.constant(self.features_mean_, dtype=dtype, name='features_mean')
        self.features_std = tf.constant(self.features_std_, dtype=dtype, name='features_std')

        # pca transform matrix
        self.pca_transform_matrix = tf.constant(self.pca_transform_matrix_, dtype=dtype, name='pca_transform_matrix')

        # weights, biases and activation function parameters for each layer of the network
        self.W = []
        self.b = []
        self.alphas = []
        self.betas = [] 
        for i in range(self.n_layers):
            self.W.append(tf.Variable(tf.random.normal([self.architecture[i], self.architecture[i+1]], 0., np.sqrt(2./self.n_parameters)), name="W_" + str(i), trainable=trainable))
            self.b.append(tf.Variable(tf.zeros([self.architecture[i+1]]), name = "b_" + str(i), trainable=trainable))
        for i in range(self.n_layers-1):
            self.alphas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "alphas_" + str(i), trainable=trainable))
            self.betas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "betas_" + str(i), trainable=trainable))

        # restore weights if restore = True
        if restore is True:
            for i in range(self.n_layers):
                self.W[i].assign(self.W_[i])
                self.b[i].assign(self.b_[i])
            for i in range(self.n_layers-1):
                self.alphas[i].assign(self.alphas_[i])
                self.betas[i].assign(self.betas_[i])

        self.optimizer = optimizer
        self.verbose = verbose

        # print initialization info, if verbose
        if self.verbose:
            multiline_str = "\nInitialized cosmopower_PCAplusNN model, \n" \
                            f"mapping {self.n_parameters} input parameters to {self.n_pcas} PCA components \n" \
                            f"and then inverting the PCA compression to obtain {self.n_modes} modes \n" \
                            f"The model uses {len(self.n_hidden)} hidden layers, \n" \
                            f"with {list(self.n_hidden)} nodes, respectively. \n"
            print(multiline_str)


# ========== TENSORFLOW implementation ===============

    # non-linear activation function
    def activation(self,
                   x,
                   alpha,
                   beta,
                   ):
        r"""
        Non-linear activation function

        Parameters:
            x (Tensor):
                linear output from previous layer
            alpha (Tensor):
                trainable parameter
            beta (Tensor):
                trainable parameter

        Returns:
            Tensor:
                the result of applying the non-linear activation function to the linear output of the layer
        """
        return tf.multiply(tf.add(beta, tf.multiply(tf.sigmoid(tf.multiply(alpha, x)), tf.subtract(1.0, beta)) ), x)


    # forward pass through the network to predict PCA coefficients
    def forward_pass_tf(self, 
                        parameters_tensor,
                        ):
        r"""
        Forward pass through the network to predict the PCA coefficients,
        fully implemented in TensorFlow

        Parameters:
            parameters_tensor (Tensor):
                tensor of shape (number_of_cosmologies, number_of_cosmological_parameters)

        Returns:
            Tensor:
                PCA predictions
        """
        outputs = []
        layers = [tf.divide(tf.subtract(parameters_tensor, self.parameters_mean), self.parameters_std)]
        for i in range(self.n_layers - 1):

            # linear network operation
            outputs.append(tf.add(tf.matmul(layers[-1], self.W[i]), self.b[i]))

            # non-linear activation function
            layers.append(self.activation(outputs[-1], self.alphas[i], self.betas[i]))

        # linear output layer
        layers.append(tf.add(tf.matmul(layers[-1], self.W[-1]), self.b[-1]))

        # rescale the output (predicted PCA coefficients) and return
        return tf.add(tf.multiply(layers[-1], self.pca_std), self.pca_mean)


    # pass inputs through the network to predict (log)-spectrum
    @tf.function
    def predictions_tf(self, 
                       parameters_tensor,
                       ):
        r"""
        Predictions given tensor of input parameters,
        fully implemented in TensorFlow. Calls ``forward_pass_tf``
        and inverts PCA

        Parameters:
            parameters_tensor (Tensor):
                input parameters

        Returns:
            Tensor:
                output predictions
        """
        # pass through network to compute PCA coefficients
        pca_coefficients = self.forward_pass_tf(parameters_tensor)

        # transform from PCA to normalized spectrum basis; shift and re-scale normalised (log)-spectrum -> (log)-spectrum
        return tf.add(tf.multiply(tf.matmul(pca_coefficients, self.pca_transform_matrix), self.features_std), self.features_mean)


    # tensor 10.**predictions
    @tf.function
    def ten_to_predictions_tf(self, 
                           parameters_dict,
                           ):
        r"""
        10^predictions given tensor of input parameters,
        fully implemented in TensorFlow. It raises 10 to the output
        of ``predictions_tf``

        Parameters:
            parameters_dict (Tensor):
                input parameters

        Returns:
            Tensor:
               10^output predictions
        """
        return tf.pow(10., self.predictions_tf(parameters_dict))


    # save network parameters to Numpy arrays
    def update_emulator_parameters(self):
        r"""
        Update emulator parameters before saving them
        """

        # put network parameters to numpy arrays
        self.W_ = [self.W[i].numpy() for i in range(self.n_layers)]
        self.b_ = [self.b[i].numpy() for i in range(self.n_layers)]
        self.alphas_ = [self.alphas[i].numpy() for i in range(self.n_layers-1)]
        self.betas_ = [self.betas[i].numpy() for i in range(self.n_layers-1)]

        # put shift and scale parameters to numpy arrays
        self.parameters_mean_ = self.parameters_mean.numpy()
        self.parameters_std_ = self.parameters_std.numpy()
        self.pca_mean_ = self.pca_mean.numpy()
        self.pca_std_ = self.pca_std.numpy()
        self.features_mean_ = self.features_mean.numpy()
        self.features_std_ = self.features_std.numpy()

        # pca transform matrix
        self.pca_transform_matrix_ = self.pca_transform_matrix.numpy()


    # save
    def save(self, 
             filename,
             ):
        r"""
        Save network parameters

        Parameters:
            filename (str):
                filename tag (without suffix) where model will be saved
        """
        # attributes
        attributes = [self.W_, 
                      self.b_, 
                      self.alphas_, 
                      self.betas_, 
                      self.parameters_mean_, 
                      self.parameters_std_,
                      self.pca_mean_,
                      self.pca_std_,
                      self.features_mean_,
                      self.features_std_,
                      self.parameters,
                      self.n_parameters,
                      self.modes,
                      self.n_modes,
                      self.n_pcas,
                      self.pca_transform_matrix_,
                      self.n_hidden,
                      self.n_layers,
                      self.architecture,
                      ]

        # save attributes to file
        f = open(filename + ".pkl", 'wb')
        pickle.dump(attributes, f)
        f.close()


    # restore attributes
    def restore(self, 
                filename,
                ):
        r"""
        Load pre-trained model

        Parameters:
            filename (str):
                filename tag (without suffix) where model was saved
        """
        # load attributes
        f = open(filename + ".pkl", 'rb')
        self.W_, self.b_, self.alphas_, self.betas_, \
        self.parameters_mean_, self.parameters_std_, \
        self.pca_mean_, self.pca_std_, \
        self.features_mean_, self.features_std_, \
        self.parameters, self.n_parameters, \
        self.modes, self.n_modes, \
        self.n_pcas, self.pca_transform_matrix_, \
        self.n_hidden, self.n_layers, self.architecture = pickle.load(f)
        f.close()


    # auxiliary function to sort input parameters
    def dict_to_ordered_arr_np(self, 
                               input_dict, 
                               ):
        r"""
        Sort input parameters

        Parameters:
            input_dict (dict [numpy.ndarray]):
                input dict of (arrays of) parameters to be sorted

        Returns:
            numpy.ndarray:
                parameters sorted according to desired order
        """
        if self.parameters is not None:
            return np.stack([input_dict[k] for k in self.parameters], axis=1)
        else:
            return np.stack([input_dict[k] for k in input_dict], axis=1)


    # forward prediction given input parameters implemented in Numpy
    def forward_pass_np(self, 
                        parameters_arr,
                        ):
        r"""
        Forward pass through the network to predict the output, 
        fully implemented in Numpy

        Parameters:
            parameters_arr (numpy.ndarray):
                array of input parameters

        Returns:
          numpy.ndarray:
            output predictions
        """
        # forward pass through the network
        act = []
        layers = [(parameters_arr - self.parameters_mean_)/self.parameters_std_]
        for i in range(self.n_layers-1):

            # linear network operation
            act.append(np.dot(layers[-1], self.W_[i]) + self.b_[i])

            # pass through activation function
            layers.append((self.betas_[i] + (1.-self.betas_[i])*1./(1.+np.exp(-self.alphas_[i]*act[-1])))*act[-1])

        # final (linear) layer -> (normalized) PCA coefficients
        layers.append(np.dot(layers[-1], self.W_[-1]) + self.b_[-1])

        # rescale PCA coefficients, multiply out PCA basis -> normalised (log)-spectrum, shift and re-scale (log)-spectrum -> output (log)-spectrum
        return np.dot(layers[-1]*self.pca_std_ + self.pca_mean_, self.pca_transform_matrix_)*self.features_std_ + self.features_mean_


    def predictions_np(self, 
                       parameters_dict,
                       ):
        r"""
        Predictions given input parameters collected in a dict.
        Fully implemented in Numpy. Calls ``forward_pass_np``
        after ordering the input parameter dict

        Parameters:
            parameters_dict (dict [numpy.ndarray]):
                dictionary of (arrays of) parameters

        Returns:
            numpy.ndarray:
                output predictions
        """
        parameters_arr = self.dict_to_ordered_arr_np(parameters_dict)
        return self.forward_pass_np(parameters_arr)


    # 10.**predictions
    def ten_to_predictions_np(self,
                              parameters_dict,
                              ):
        r"""
        10^predictions given input parameters collected in a dict.
        Fully implemented in Numpy. It raises 10 to the output
        from ``predictions_np``

        Parameters:
            parameters_dict (dict [numpy.ndarray]):
                dictionary of (arrays of) parameters

        Returns:
            numpy.ndarray:
                10^output predictions
        """
        return 10.**self.predictions_np(parameters_dict)



    ### Infrastructure for network training ###

    @tf.function
    def compute_loss(self, 
                     training_parameters,
                     training_pca, 
                     ):
        r"""
        Mean squared difference

        Parameters:
            training_parameters (Tensor):
                input parameters
            training_pca (Tensor):
                true PCA components

        Returns:
            Tensor:
                mean squared difference
        """
        return tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.forward_pass_tf(training_parameters), training_pca)))


    @tf.function
    def compute_loss_and_gradients(self, 
                                   training_parameters,
                                   training_pca, 
                                   ):
        r"""
        Compute mean squared difference and gradients

        Parameters:
            training_parameters (Tensor):
                input parameters
            training_pca (Tensor):
                true PCA components

        Returns:
            loss (Tensor):
                mean squared difference
            gradients (Tensor):
                gradients
        """
        # compute loss on the tape
        with tf.GradientTape() as tape:

          # loss
          loss = tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.forward_pass_tf(training_parameters), training_pca))) 

        # compute gradients
        gradients = tape.gradient(loss, self.trainable_variables)

        return loss, gradients


    def training_step(self, 
                      training_parameters,
                      training_pca, 
                      ):
        r"""
        Optimizes loss

        Parameters:
            training_parameters (Tensor):
                input parameters
            training_pca (Tensor):
                true PCA components

        Returns:
            loss (Tensor):
                mean squared difference
        """
        # compute loss and gradients
        loss, gradients = self.compute_loss_and_gradients(training_parameters, training_pca)

        # apply gradients
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        return loss


    def training_step_with_accumulated_gradients(self, 
                                                 training_parameters, 
                                                 training_pca, 
                                                 accumulation_steps=10,
                                                 ):
        r"""
        Optimize loss breaking calculation into accumulated gradients

        Parameters:
            training_parameters (Tensor):
                tensor of input parameters for the network
            training_pca (Tensor):
                tensor of true PCA components
            accumulation_steps (int):
                number of accumulated gradients

        Returns:
            accumulated_loss (Tensor):
                mean squared difference
        """
        # create dataset to do sub-calculations over
        dataset = tf.data.Dataset.from_tensor_slices((training_parameters, training_pca)).batch(int(training_pca.shape[0]/accumulation_steps))

        # initialize gradients and loss (to zero)
        accumulated_gradients = [tf.Variable(tf.zeros_like(variable), trainable=False) for variable in self.trainable_variables]
        accumulated_loss = tf.Variable(0., trainable=False)

        # loop over sub-batches
        for training_parameters_, training_pca_ in dataset:

            # calculate loss and gradients
            loss, gradients = self.compute_loss_and_gradients(training_parameters_, training_pca_,)

            # update the accumulated gradients and loss
            for i in range(len(accumulated_gradients)):
                accumulated_gradients[i].assign_add(gradients[i]*training_pca_.shape[0]/training_pca.shape[0])
            accumulated_loss.assign_add(loss*training_pca_.shape[0]/training_pca.shape[0])

        # apply accumulated gradients (once, after the loop over sub-batches)
        self.optimizer.apply_gradients(zip(accumulated_gradients, self.trainable_variables))

        return accumulated_loss


# ==========================================
#         main TRAINING function
# ==========================================
    def train(self,
              filename_saved_model,
              # cooling schedule
              validation_split=0.1,
              learning_rates=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6],
              batch_sizes=[1024, 1024, 1024, 1024, 1024],
              gradient_accumulation_steps = [1, 1, 1, 1, 1],
              # early stopping set up
              patience_values = [100,100,100,100,100],
              max_epochs = [1000,1000,1000,1000,1000],
             ):
        r"""
        Train the model

        Parameters:
            filename_saved_model (str):
                filename tag where model will be saved
            validation_split (float):
                fraction of the training data used for validation
            learning_rates (list [float]):
                learning rates for each step of learning schedule
            batch_sizes (list [int]):
                batch sizes for each step of learning schedule
            gradient_accumulation_steps (list [int]):
                batches for gradient accumulations for each step of learning schedule
            patience_values (list [int]):
                early stopping patience for each step of learning schedule
            max_epochs (list [int]):
                maximum number of epochs for each step of learning schedule
        """
        # check correct number of steps
        assert len(learning_rates)==len(batch_sizes)\
               ==len(gradient_accumulation_steps)==len(patience_values)==len(max_epochs), \
               'Number of learning rates, batch sizes, gradient accumulation steps, patience values and max epochs are not matching!'

        # training start info, if verbose
        if self.verbose:
            multiline_str = "Starting cosmopower_PCAplusNN training, \n" \
                            f"using {int(100*validation_split)} per cent of training samples for validation. \n" \
                            f"Performing {len(learning_rates)} learning steps, with \n" \
                            f"{list(learning_rates)} learning rates \n" \
                            f"{list(batch_sizes)} batch sizes \n" \
                            f"{list(gradient_accumulation_steps)} gradient accumulation steps \n" \
                            f"{list(patience_values)} patience values \n" \
                            f"{list(max_epochs)} max epochs \n"
            print(multiline_str)

        # casting
        training_parameters = tf.convert_to_tensor(self.cp_pca.training_parameters, dtype=dtype)
        training_pca = tf.convert_to_tensor(self.cp_pca.training_pca, dtype=dtype)

        # training/validation split
        n_validation = int(training_parameters.shape[0] * validation_split)
        n_training = training_parameters.shape[0] - n_validation

        # train using cooling/heating schedule for lr/batch-size
        for i in range(len(learning_rates)):

            print('learning rate = ' + str(learning_rates[i]) + ', batch size = ' + str(batch_sizes[i]))

            # set learning rate
            self.optimizer.lr = learning_rates[i]

            # split into validation and training sub-sets
            training_selection = tf.random.shuffle([True] * n_training + [False] * n_validation)

            # create iterable dataset (given batch size)
            training_data = tf.data.Dataset.from_tensor_slices((training_parameters[training_selection], training_pca[training_selection])).shuffle(n_training).batch(batch_sizes[i])

            # set up training loss
            training_loss = [np.inf]
            validation_loss = [np.inf]
            best_loss = np.inf
            early_stopping_counter = 0

            # loop over epochs
            with trange(max_epochs[i]) as t:
                for epoch in t:
                    # loop over batches
                    for theta, pca in training_data:

                        # training step: check whether to accumulate gradients or not (only worth doing this for very large batch sizes)
                        if gradient_accumulation_steps[i] == 1:
                            loss = self.training_step(theta, pca)
                        else:
                            loss = self.training_step_with_accumulated_gradients(theta, pca, accumulation_steps=gradient_accumulation_steps[i])

                    # compute validation loss at the end of the epoch
                    validation_loss.append(self.compute_loss(training_parameters[~training_selection], training_pca[~training_selection]).numpy())

                    # update the progressbar
                    t.set_postfix(loss=validation_loss[-1])

                    # early stopping condition
                    if validation_loss[-1] < best_loss:
                        best_loss = validation_loss[-1]
                        early_stopping_counter = 0
                    else:
                        early_stopping_counter += 1
                    if early_stopping_counter >= patience_values[i]:
                        self.update_emulator_parameters()
                        self.save(filename_saved_model)
                        print('Validation loss = ' + str(best_loss))
                        print('Model saved.')
                        break
                else:
                    self.update_emulator_parameters()
                    self.save(filename_saved_model)
                    print('Reached max number of epochs. Validation loss = ' + str(best_loss))
                    print('Model saved.')

__init__(self, cp_pca=None, n_hidden=[512, 512, 512], restore=False, restore_filename=None, trainable=True, optimizer=tf.keras.optimizers.Adam(), verbose=False) special

Constructor.

Source code in cosmopower/cosmopower_PCAplusNN.py
def __init__(self, 
             cp_pca=None,
             n_hidden=[512,512,512], 
             restore=False, 
             restore_filename=None, 
             trainable=True, 
             optimizer=tf.keras.optimizers.Adam(),
             verbose=False,
             ):
    r"""
    Constructor.
    """
    # super
    super(cosmopower_PCAplusNN, self).__init__()

    # restore
    if restore is True:
        self.restore(restore_filename)

    # else set variables from input arguments
    else:
        # PCA compression, previously computed
        self.cp_pca = cp_pca

        # parameters
        self.parameters = self.cp_pca.parameters
        self.n_parameters = len(self.parameters)
        self.pca_transform_matrix_ = self.cp_pca.pca_transform_matrix
        self.modes = self.cp_pca.modes
        self.n_modes = self.cp_pca.n_modes
        self.n_pcas = self.pca_transform_matrix_.shape[0]
        self.n_hidden = n_hidden

        # architecture
        self.architecture = [self.n_parameters] + self.n_hidden + [self.n_pcas]
        self.n_layers = len(self.architecture) - 1

        # standardisation

        # input parameters mean and std
        self.parameters_mean_ = self.cp_pca.parameters_mean
        self.parameters_std_ = self.cp_pca.parameters_std

        # PCA mean and std
        self.pca_mean_ = self.cp_pca.pca_mean
        self.pca_std_ = self.cp_pca.pca_std

        # spectra mean and std
        self.features_mean_ = self.cp_pca.features_mean
        self.features_std_ = self.cp_pca.features_std

    # input parameters mean and std
    self.parameters_mean = tf.constant(self.parameters_mean_, dtype=dtype, name='parameters_mean')
    self.parameters_std = tf.constant(self.parameters_std_, dtype=dtype, name='parameters_std')

    # PCA mean and std
    self.pca_mean = tf.constant(self.pca_mean_, dtype=dtype, name='pca_mean')
    self.pca_std = tf.constant(self.pca_std_, dtype=dtype, name='pca_std')

    # (log)-spectra mean and std
    self.features_mean = tf.constant(self.features_mean_, dtype=dtype, name='features_mean')
    self.features_std = tf.constant(self.features_std_, dtype=dtype, name='features_std')

    # pca transform matrix
    self.pca_transform_matrix = tf.constant(self.pca_transform_matrix_, dtype=dtype, name='pca_transform_matrix')

    # weights, biases and activation function parameters for each layer of the network
    self.W = []
    self.b = []
    self.alphas = []
    self.betas = [] 
    for i in range(self.n_layers):
        self.W.append(tf.Variable(tf.random.normal([self.architecture[i], self.architecture[i+1]], 0., np.sqrt(2./self.n_parameters)), name="W_" + str(i), trainable=trainable))
        self.b.append(tf.Variable(tf.zeros([self.architecture[i+1]]), name = "b_" + str(i), trainable=trainable))
    for i in range(self.n_layers-1):
        self.alphas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "alphas_" + str(i), trainable=trainable))
        self.betas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "betas_" + str(i), trainable=trainable))

    # restore weights if restore = True
    if restore is True:
        for i in range(self.n_layers):
            self.W[i].assign(self.W_[i])
            self.b[i].assign(self.b_[i])
        for i in range(self.n_layers-1):
            self.alphas[i].assign(self.alphas_[i])
            self.betas[i].assign(self.betas_[i])

    self.optimizer = optimizer
    self.verbose = verbose

    # print initialization info, if verbose
    if self.verbose:
        multiline_str = "\nInitialized cosmopower_PCAplusNN model, \n" \
                        f"mapping {self.n_parameters} input parameters to {self.n_pcas} PCA components \n" \
                        f"and then inverting the PCA compression to obtain {self.n_modes} modes \n" \
                        f"The model uses {len(self.n_hidden)} hidden layers, \n" \
                        f"with {list(self.n_hidden)} nodes, respectively. \n"
        print(multiline_str)

activation(self, x, alpha, beta)

Non-linear activation function

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| x | Tensor | linear output from previous layer | required |
| alpha | Tensor | trainable parameter | required |
| beta | Tensor | trainable parameter | required |

Returns:

| Type | Description |
|------|-------------|
| Tensor | the result of applying the non-linear activation function to the linear output of the layer |

Source code in cosmopower/cosmopower_PCAplusNN.py
def activation(self,
               x,
               alpha,
               beta,
               ):
    r"""
    Non-linear activation function

    Parameters:
        x (Tensor):
            linear output from previous layer
        alpha (Tensor):
            trainable parameter
        beta (Tensor):
            trainable parameter

    Returns:
        Tensor:
            the result of applying the non-linear activation function to the linear output of the layer
    """
    return tf.multiply(tf.add(beta, tf.multiply(tf.sigmoid(tf.multiply(alpha, x)), tf.subtract(1.0, beta)) ), x)
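Written out, the activation implemented above is

$$f(x) = \left[\beta + (1 - \beta)\,\sigma(\alpha x)\right] x, \qquad \sigma(u) = \frac{1}{1 + e^{-u}},$$

with per-neuron trainable parameters $\alpha$ and $\beta$: for $\beta \to 1$ the unit becomes linear, while for $\beta \to 0$ it reduces to a sigmoid-weighted linear unit.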

compute_loss(self, training_parameters, training_pca)

Mean squared difference

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| training_parameters | Tensor | input parameters | required |
| training_pca | Tensor | true PCA components | required |

Returns:

| Type | Description |
|------|-------------|
| Tensor | mean squared difference |

Source code in cosmopower/cosmopower_PCAplusNN.py
@tf.function
def compute_loss(self, 
                 training_parameters,
                 training_pca, 
                 ):
    r"""
    Mean squared difference

    Parameters:
        training_parameters (Tensor):
            input parameters
        training_pca (Tensor):
            true PCA components

    Returns:
        Tensor:
            mean squared difference
    """
    return tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.forward_pass_tf(training_parameters), training_pca)))
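Note that the returned scalar is the square root of the mean squared difference between predicted and true PCA coefficients, i.e. a root-mean-square error:

$$L = \sqrt{\frac{1}{N}\sum_{i=1}^{N}\left[\hat{\alpha}_i - \alpha_i\right]^2},$$

where $\hat{\alpha}$ are the PCA coefficients predicted by `forward_pass_tf`, $\alpha$ the true ones, and the mean runs over both batch entries and PCA components.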

compute_loss_and_gradients(self, training_parameters, training_pca)

Compute mean squared difference and gradients

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| training_parameters | Tensor | input parameters | required |
| training_pca | Tensor | true PCA components | required |

Returns:

| Name | Type | Description |
|------|------|-------------|
| loss | Tensor | mean squared difference |
| gradients | Tensor | gradients |

Source code in cosmopower/cosmopower_PCAplusNN.py
@tf.function
def compute_loss_and_gradients(self, 
                               training_parameters,
                               training_pca, 
                               ):
    r"""
    Compute mean squared difference and gradients

    Parameters:
        training_parameters (Tensor):
            input parameters
        training_pca (Tensor):
            true PCA components

    Returns:
        loss (Tensor):
            mean squared difference
        gradients (Tensor):
            gradients
    """
    # compute loss on the tape
    with tf.GradientTape() as tape:

      # loss
      loss = tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.forward_pass_tf(training_parameters), training_pca))) 

    # compute gradients
    gradients = tape.gradient(loss, self.trainable_variables)

    return loss, gradients

dict_to_ordered_arr_np(self, input_dict)

Sort input parameters

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_dict | dict [numpy.ndarray] | input dict of (arrays of) parameters to be sorted | required |

Returns:

| Type | Description |
|------|-------------|
| numpy.ndarray | parameters sorted according to desired order |

Source code in cosmopower/cosmopower_PCAplusNN.py
def dict_to_ordered_arr_np(self, 
                           input_dict, 
                           ):
    r"""
    Sort input parameters

    Parameters:
        input_dict (dict [numpy.ndarray]):
            input dict of (arrays of) parameters to be sorted

    Returns:
        numpy.ndarray:
            parameters sorted according to desired order
    """
    if self.parameters is not None:
        return np.stack([input_dict[k] for k in self.parameters], axis=1)
    else:
        return np.stack([input_dict[k] for k in input_dict], axis=1)
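For illustration, assuming the emulator was trained with `parameters = ['omega_b', 'omega_cdm']` (a hypothetical ordering), the dict values are stacked column-wise in that order, regardless of the dict's own key order:

```python
import numpy as np

params = {'omega_cdm': np.array([0.112, 0.120]),
          'omega_b': np.array([0.022, 0.023])}

# equivalent to dict_to_ordered_arr_np with self.parameters = ['omega_b', 'omega_cdm']
arr = np.stack([params[k] for k in ['omega_b', 'omega_cdm']], axis=1)
# arr -> [[0.022, 0.112],
#         [0.023, 0.120]]   shape: (n_cosmologies, n_parameters)
```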

forward_pass_np(self, parameters_arr)

Forward pass through the network to predict the output, fully implemented in Numpy

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| parameters_arr | numpy.ndarray | array of input parameters | required |

Returns:

| Type | Description |
|------|-------------|
| numpy.ndarray | output predictions |

Source code in cosmopower/cosmopower_PCAplusNN.py
def forward_pass_np(self, 
                    parameters_arr,
                    ):
    r"""
    Forward pass through the network to predict the output, 
    fully implemented in Numpy

    Parameters:
        parameters_arr (numpy.ndarray):
            array of input parameters

    Returns:
      numpy.ndarray:
        output predictions
    """
    # forward pass through the network
    act = []
    layers = [(parameters_arr - self.parameters_mean_)/self.parameters_std_]
    for i in range(self.n_layers-1):

        # linear network operation
        act.append(np.dot(layers[-1], self.W_[i]) + self.b_[i])

        # pass through activation function
        layers.append((self.betas_[i] + (1.-self.betas_[i])*1./(1.+np.exp(-self.alphas_[i]*act[-1])))*act[-1])

    # final (linear) layer -> (normalized) PCA coefficients
    layers.append(np.dot(layers[-1], self.W_[-1]) + self.b_[-1])

    # rescale PCA coefficients, multiply out PCA basis -> normalised (log)-spectrum, shift and re-scale (log)-spectrum -> output (log)-spectrum
    return np.dot(layers[-1]*self.pca_std_ + self.pca_mean_, self.pca_transform_matrix_)*self.features_std_ + self.features_mean_
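Note that, unlike `forward_pass_tf` (which stops at the PCA coefficients), this NumPy pass performs the full reconstruction. With the network output $\hat{\alpha}(\theta)$ (normalised PCA coefficients), PCA transform matrix $V$ and the stored shift and scale parameters, the returned (log)-spectrum is

$$\hat{f}(\theta) = \left[\left(\hat{\alpha}(\theta)\,\sigma_{\rm PCA} + \mu_{\rm PCA}\right) V\right]\sigma_{f} + \mu_{f},$$

where multiplications by the standard deviations are element-wise.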

forward_pass_tf(self, parameters_tensor)

Forward pass through the network to predict the PCA coefficients, fully implemented in TensorFlow

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| parameters_tensor | Tensor | tensor of shape (number_of_cosmologies, number_of_cosmological_parameters) | required |

Returns:

| Type | Description |
|------|-------------|
| Tensor | PCA predictions |

Source code in cosmopower/cosmopower_PCAplusNN.py
def forward_pass_tf(self, 
                    parameters_tensor,
                    ):
    r"""
    Forward pass through the network to predict the PCA coefficients,
    fully implemented in TensorFlow

    Parameters:
        parameters_tensor (Tensor):
            tensor of shape (number_of_cosmologies, number_of_cosmological_parameters)

    Returns:
        Tensor:
            PCA predictions
    """
    outputs = []
    layers = [tf.divide(tf.subtract(parameters_tensor, self.parameters_mean), self.parameters_std)]
    for i in range(self.n_layers - 1):

        # linear network operation
        outputs.append(tf.add(tf.matmul(layers[-1], self.W[i]), self.b[i]))

        # non-linear activation function
        layers.append(self.activation(outputs[-1], self.alphas[i], self.betas[i]))

    # linear output layer
    layers.append(tf.add(tf.matmul(layers[-1], self.W[-1]), self.b[-1]))

    # rescale the output (predicted PCA coefficients) and return
    return tf.add(tf.multiply(layers[-1], self.pca_std), self.pca_mean)

predictions_np(self, parameters_dict)

Predictions given input parameters collected in a dict. Fully implemented in Numpy. Calls forward_pass_np after ordering the input parameter dict

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| parameters_dict | dict [numpy.ndarray] | dictionary of (arrays of) parameters | required |

Returns:

| Type | Description |
|------|-------------|
| numpy.ndarray | output predictions |

Source code in cosmopower/cosmopower_PCAplusNN.py
def predictions_np(self, 
                   parameters_dict,
                   ):
    r"""
    Predictions given input parameters collected in a dict.
    Fully implemented in Numpy. Calls ``forward_pass_np``
    after ordering the input parameter dict

    Parameters:
        parameters_dict (dict [numpy.ndarray]):
            dictionary of (arrays of) parameters

    Returns:
        numpy.ndarray:
            output predictions
    """
    parameters_arr = self.dict_to_ordered_arr_np(parameters_dict)
    return self.forward_pass_np(parameters_arr)
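A usage sketch (parameter names and values are illustrative; they must match the parameters the emulator was trained with, and `cp_pca_nn` is the instance from the earlier sketch):

```python
import numpy as np

# batch of 10 cosmologies: one array per parameter
params = {'omega_b': np.linspace(0.021, 0.023, 10),
          'omega_cdm': np.linspace(0.11, 0.13, 10),
          'h': np.linspace(0.6, 0.8, 10)}

log_spectra = cp_pca_nn.predictions_np(params)  # shape: (10, n_modes)
```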

predictions_tf(self, parameters_tensor)

Predictions given tensor of input parameters, fully implemented in TensorFlow. Calls forward_pass_tf and inverts PCA

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| parameters_tensor | Tensor | input parameters | required |

Returns:

| Type | Description |
|------|-------------|
| Tensor | output predictions |

Source code in cosmopower/cosmopower_PCAplusNN.py
@tf.function
def predictions_tf(self, 
                   parameters_tensor,
                   ):
    r"""
    Predictions given tensor of input parameters,
    fully implemented in TensorFlow. Calls ``forward_pass_tf``
    and inverts PCA

    Parameters:
        parameters_tensor (Tensor):
            input parameters

    Returns:
        Tensor:
            output predictions
    """
    # pass through network to compute PCA coefficients
    pca_coefficients = self.forward_pass_tf(parameters_tensor)

    # transform from PCA to normalized spectrum basis; shift and re-scale normalised (log)-spectrum -> (log)-spectrum
    return tf.add(tf.multiply(tf.matmul(pca_coefficients, self.pca_transform_matrix), self.features_std), self.features_mean)

restore(self, filename)

Load pre-trained model

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| filename | str | filename tag (without suffix) where model was saved | required |

Source code in cosmopower/cosmopower_PCAplusNN.py
def restore(self, 
            filename,
            ):
    r"""
    Load pre-trained model

    Parameters:
        filename (str):
            filename tag (without suffix) where model was saved
    """
    # load attributes
    f = open(filename + ".pkl", 'rb')
    self.W_, self.b_, self.alphas_, self.betas_, \
    self.parameters_mean_, self.parameters_std_, \
    self.pca_mean_, self.pca_std_, \
    self.features_mean_, self.features_std_, \
    self.parameters, self.n_parameters, \
    self.modes, self.n_modes, \
    self.n_pcas, self.pca_transform_matrix_, \
    self.n_hidden, self.n_layers, self.architecture = pickle.load(f)
    f.close()
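`restore` is what the constructor calls when instantiated with `restore=True`. Note that it only reloads the NumPy-side (underscored) attributes; the TensorFlow constants and `tf.Variable` weights are re-assigned only inside the constructor, so to use `predictions_tf` after loading, rebuild the model (a sketch with a hypothetical filename):

```python
cp_pca_nn = cosmopower_PCAplusNN(restore=True,
                                 restore_filename='my_model')  # reads my_model.pkl
```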

save(self, filename)

Save network parameters

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| filename | str | filename tag (without suffix) where model will be saved | required |

Source code in cosmopower/cosmopower_PCAplusNN.py
def save(self, 
         filename,
         ):
    r"""
    Save network parameters

    Parameters:
        filename (str):
            filename tag (without suffix) where model will be saved
    """
    # attributes
    attributes = [self.W_, 
                  self.b_, 
                  self.alphas_, 
                  self.betas_, 
                  self.parameters_mean_, 
                  self.parameters_std_,
                  self.pca_mean_,
                  self.pca_std_,
                  self.features_mean_,
                  self.features_std_,
                  self.parameters,
                  self.n_parameters,
                  self.modes,
                  self.n_modes,
                  self.n_pcas,
                  self.pca_transform_matrix_,
                  self.n_hidden,
                  self.n_layers,
                  self.architecture,
                  ]

    # save attributes to file
    f = open(filename + ".pkl", 'wb')
    pickle.dump(attributes, f)
    f.close()
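Since `save` pickles the NumPy-side (underscored) attributes, a manual save should be preceded by `update_emulator_parameters`, as `train` does internally (hypothetical filename):

```python
cp_pca_nn.update_emulator_parameters()  # sync tf.Variables -> NumPy arrays
cp_pca_nn.save('my_model')              # writes my_model.pkl
```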

ten_to_predictions_np(self, parameters_dict)

10^predictions given input parameters collected in a dict. Fully implemented in Numpy. It raises 10 to the output from predictions_np

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| parameters_dict | dict [numpy.ndarray] | dictionary of (arrays of) parameters | required |

Returns:

| Type | Description |
|------|-------------|
| numpy.ndarray | 10^output predictions |

Source code in cosmopower/cosmopower_PCAplusNN.py
def ten_to_predictions_np(self,
                          parameters_dict,
                          ):
    r"""
    10^predictions given input parameters collected in a dict.
    Fully implemented in Numpy. It raises 10 to the output
    from ``predictions_np``

    Parameters:
        parameters_dict (dict [numpy.ndarray]):
            dictionary of (arrays of) parameters

    Returns:
        numpy.ndarray:
            10^output predictions
    """
    return 10.**self.predictions_np(parameters_dict)
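Continuing the `predictions_np` sketch above, for emulators trained on log-spectra:

```python
spectra = cp_pca_nn.ten_to_predictions_np(params)  # 10**log_spectra, shape: (10, n_modes)
```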

ten_to_predictions_tf(self, parameters_dict)

10^predictions given tensor of input parameters, fully implemented in TensorFlow. It raises 10 to the output of predictions_tf

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| parameters_dict | Tensor | input parameters | required |

Returns:

| Type | Description |
|------|-------------|
| Tensor | 10^output predictions |

Source code in cosmopower/cosmopower_PCAplusNN.py
@tf.function
def ten_to_predictions_tf(self, 
                       parameters_dict,
                       ):
    r"""
    10^predictions given tensor of input parameters,
    fully implemented in TensorFlow. It raises 10 to the output
    of ``predictions_tf``

    Parameters:
        parameters_dict (Tensor):
            input parameters

    Returns:
        Tensor:
           10^output predictions
    """
    return tf.pow(10., self.predictions_tf(parameters_dict))

train(self, filename_saved_model, validation_split=0.1, learning_rates=[0.01, 0.001, 0.0001, 1e-05, 1e-06], batch_sizes=[1024, 1024, 1024, 1024, 1024], gradient_accumulation_steps=[1, 1, 1, 1, 1], patience_values=[100, 100, 100, 100, 100], max_epochs=[1000, 1000, 1000, 1000, 1000])

Train the model

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| filename_saved_model | str | filename tag where model will be saved | required |
| validation_split | float | fraction of the training data used for validation | 0.1 |
| learning_rates | list [float] | learning rates for each step of the learning schedule | [0.01, 0.001, 0.0001, 1e-05, 1e-06] |
| batch_sizes | list [int] | batch sizes for each step of the learning schedule | [1024, 1024, 1024, 1024, 1024] |
| gradient_accumulation_steps | list [int] | number of accumulated-gradient batches for each step of the learning schedule | [1, 1, 1, 1, 1] |
| patience_values | list [int] | early-stopping patience for each step of the learning schedule | [100, 100, 100, 100, 100] |
| max_epochs | list [int] | maximum number of epochs for each step of the learning schedule | [1000, 1000, 1000, 1000, 1000] |

Source code in cosmopower/cosmopower_PCAplusNN.py
def train(self,
          filename_saved_model,
          # cooling schedule
          validation_split=0.1,
          learning_rates=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6],
          batch_sizes=[1024, 1024, 1024, 1024, 1024],
          gradient_accumulation_steps = [1, 1, 1, 1, 1],
          # early stopping set up
          patience_values = [100,100,100,100,100],
          max_epochs = [1000,1000,1000,1000,1000],
         ):
    r"""
    Train the model

    Parameters:
        filename_saved_model (str):
            filename tag where model will be saved
        validation_split (float):
            fraction of the training data used for validation
        learning_rates (list [float]):
            learning rates for each step of learning schedule
        batch_sizes (list [int]):
            batch sizes for each step of learning schedule
        gradient_accumulation_steps (list [int]):
            batches for gradient accumulations for each step of learning schedule
        patience_values (list [int]):
            early stopping patience for each step of learning schedule
        max_epochs (list [int]):
            maximum number of epochs for each step of learning schedule
    """
    # check correct number of steps
    assert len(learning_rates)==len(batch_sizes)\
           ==len(gradient_accumulation_steps)==len(patience_values)==len(max_epochs), \
           'Number of learning rates, batch sizes, gradient accumulation steps, patience values and max epochs are not matching!'

    # training start info, if verbose
    if self.verbose:
        multiline_str = "Starting cosmopower_PCAplusNN training, \n" \
                        f"using {int(100*validation_split)} per cent of training samples for validation. \n" \
                        f"Performing {len(learning_rates)} learning steps, with \n" \
                        f"{list(learning_rates)} learning rates \n" \
                        f"{list(batch_sizes)} batch sizes \n" \
                        f"{list(gradient_accumulation_steps)} gradient accumulation steps \n" \
                        f"{list(patience_values)} patience values \n" \
                        f"{list(max_epochs)} max epochs \n"
        print(multiline_str)

    # casting
    training_parameters = tf.convert_to_tensor(self.cp_pca.training_parameters, dtype=dtype)
    training_pca = tf.convert_to_tensor(self.cp_pca.training_pca, dtype=dtype)

    # training/validation split
    n_validation = int(training_parameters.shape[0] * validation_split)
    n_training = training_parameters.shape[0] - n_validation

    # train using cooling/heating schedule for lr/batch-size
    for i in range(len(learning_rates)):

        print('learning rate = ' + str(learning_rates[i]) + ', batch size = ' + str(batch_sizes[i]))

        # set learning rate
        self.optimizer.lr = learning_rates[i]

        # split into validation and training sub-sets
        training_selection = tf.random.shuffle([True] * n_training + [False] * n_validation)

        # create iterable dataset (given batch size)
        training_data = tf.data.Dataset.from_tensor_slices((training_parameters[training_selection], training_pca[training_selection])).shuffle(n_training).batch(batch_sizes[i])

        # set up training loss
        training_loss = [np.inf]
        validation_loss = [np.inf]
        best_loss = np.inf
        early_stopping_counter = 0

        # loop over epochs
        with trange(max_epochs[i]) as t:
            for epoch in t:
                # loop over batches
                for theta, pca in training_data:

                    # training step: check whether to accumulate gradients or not (only worth doing this for very large batch sizes)
                    if gradient_accumulation_steps[i] == 1:
                        loss = self.training_step(theta, pca)
                    else:
                        loss = self.training_step_with_accumulated_gradients(theta, pca, accumulation_steps=gradient_accumulation_steps[i])

                # compute validation loss at the end of the epoch
                validation_loss.append(self.compute_loss(training_parameters[~training_selection], training_pca[~training_selection]).numpy())

                # update the progressbar
                t.set_postfix(loss=validation_loss[-1])

                # early stopping condition
                if validation_loss[-1] < best_loss:
                    best_loss = validation_loss[-1]
                    early_stopping_counter = 0
                else:
                    early_stopping_counter += 1
                if early_stopping_counter >= patience_values[i]:
                    self.update_emulator_parameters()
                    self.save(filename_saved_model)
                    print('Validation loss = ' + str(best_loss))
                    print('Model saved.')
                    break
            else:
                self.update_emulator_parameters()
                self.save(filename_saved_model)
                print('Reached max number of epochs. Validation loss = ' + str(best_loss))
                print('Model saved.')
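A training sketch (the filename and schedule values are illustrative; the instance must have been built from a `cosmopower_PCA` object carrying `training_parameters` and `training_pca`):

```python
cp_pca_nn.train(filename_saved_model='my_model',
                validation_split=0.1,
                learning_rates=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6],
                batch_sizes=[1024]*5,
                gradient_accumulation_steps=[1]*5,
                patience_values=[100]*5,
                max_epochs=[1000]*5)
```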

training_step(self, training_parameters, training_pca)

Optimizes loss

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| training_parameters | Tensor | input parameters | required |
| training_pca | Tensor | true PCA components | required |

Returns:

| Name | Type | Description |
|------|------|-------------|
| loss | Tensor | mean squared difference |

Source code in cosmopower/cosmopower_PCAplusNN.py
def training_step(self, 
                  training_parameters,
                  training_pca, 
                  ):
    r"""
    Optimizes loss

    Parameters:
        training_parameters (Tensor):
            input parameters
        training_pca (Tensor):
            true PCA components

    Returns:
        loss (Tensor):
            mean squared difference
    """
    # compute loss and gradients
    loss, gradients = self.compute_loss_and_gradients(training_parameters, training_pca)

    # apply gradients
    self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

    return loss

training_step_with_accumulated_gradients(self, training_parameters, training_pca, accumulation_steps=10)

Optimize loss breaking calculation into accumulated gradients

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| training_parameters | Tensor | tensor of input parameters for the network | required |
| training_pca | Tensor | tensor of true PCA components | required |
| accumulation_steps | int | number of accumulated gradients | 10 |

Returns:

| Name | Type | Description |
|------|------|-------------|
| accumulated_loss | Tensor | mean squared difference |

Source code in cosmopower/cosmopower_PCAplusNN.py
def training_step_with_accumulated_gradients(self, 
                                             training_parameters, 
                                             training_pca, 
                                             accumulation_steps=10,
                                             ):
    r"""
    Optimize loss breaking calculation into accumulated gradients

    Parameters:
        training_parameters (Tensor):
            tensor of input parameters for the network
        training_pca (Tensor):
            tensor of true PCA components
        accumulation_steps (int):
            number of accumulated gradients

    Returns:
        accumulated_loss (Tensor):
            mean squared difference
    """
    # create dataset to do sub-calculations over
    dataset = tf.data.Dataset.from_tensor_slices((training_parameters, training_pca)).batch(int(training_pca.shape[0]/accumulation_steps))

    # initialize gradients and loss (to zero)
    accumulated_gradients = [tf.Variable(tf.zeros_like(variable), trainable=False) for variable in self.trainable_variables]
    accumulated_loss = tf.Variable(0., trainable=False)

    # loop over sub-batches
    for training_parameters_, training_pca_ in dataset:

        # calculate loss and gradients
        loss, gradients = self.compute_loss_and_gradients(training_parameters_, training_pca_,)

        # update the accumulated gradients and loss
        for i in range(len(accumulated_gradients)):
            accumulated_gradients[i].assign_add(gradients[i]*training_pca_.shape[0]/training_pca.shape[0])
        accumulated_loss.assign_add(loss*training_pca_.shape[0]/training_pca.shape[0])

    # apply accumulated gradients (once, after the loop over sub-batches)
    self.optimizer.apply_gradients(zip(accumulated_gradients, self.trainable_variables))

    return accumulated_loss

update_emulator_parameters(self)

Update emulator parameters before saving them

Source code in cosmopower/cosmopower_PCAplusNN.py
def update_emulator_parameters(self):
    r"""
    Update emulator parameters before saving them
    """

    # put network parameters to numpy arrays
    self.W_ = [self.W[i].numpy() for i in range(self.n_layers)]
    self.b_ = [self.b[i].numpy() for i in range(self.n_layers)]
    self.alphas_ = [self.alphas[i].numpy() for i in range(self.n_layers-1)]
    self.betas_ = [self.betas[i].numpy() for i in range(self.n_layers-1)]

    # put shift and scale parameters to numpy arrays
    self.parameters_mean_ = self.parameters_mean.numpy()
    self.parameters_std_ = self.parameters_std.numpy()
    self.pca_mean_ = self.pca_mean.numpy()
    self.pca_std_ = self.pca_std.numpy()
    self.features_mean_ = self.features_mean.numpy()
    self.features_std_ = self.features_std.numpy()

    # pca transform matrix
    self.pca_transform_matrix_ = self.pca_transform_matrix.numpy()