cosmopower_NN module

cosmopower_NN (Model)

Mapping between cosmological parameters and (log)-power spectra

Attributes:

Name | Type | Description |
---|---|---|
parameters | list [str] | model parameters, sorted in the desired order |
modes | numpy.ndarray | multipoles or k-values in the (log)-spectra |
parameters_mean | numpy.ndarray | mean of input parameters |
parameters_std | numpy.ndarray | std of input parameters |
features_mean | numpy.ndarray | mean of output features |
features_std | numpy.ndarray | std of output features |
n_hidden | list [int] | number of nodes for each hidden layer |
restore | bool | whether to restore a previously trained model |
restore_filename | str | filename tag (without suffix) for restoring a trained model from file (a pickle file containing all of the model attributes and weights) |
trainable | bool | whether the network layers are trainable |
optimizer | tf.keras.optimizers | optimizer for training |
verbose | bool | whether to print messages at intermediate steps |
Source code in cosmopower/cosmopower_NN.py
class cosmopower_NN(tf.keras.Model):
r"""
Mapping between cosmological parameters and (log)-power spectra
Attributes:
parameters (list [str]):
model parameters, sorted in the desired order
modes (numpy.ndarray):
multipoles or k-values in the (log)-spectra
parameters_mean (numpy.ndarray):
mean of input parameters
parameters_std (numpy.ndarray):
std of input parameters
features_mean (numpy.ndarray):
mean of output features
features_std (numpy.ndarray):
std of output features
n_hidden (list [int]):
number of nodes for each hidden layer
restore (bool):
whether to restore a previously trained model or not
restore_filename (str):
filename tag (without suffix) for restoring trained model from file
(this will be a pickle file with all of the model attributes and weights)
trainable (bool):
whether the network layers are trainable
optimizer (tf.keras.optimizers):
optimizer for training
verbose (bool):
whether to print messages at intermediate steps or not
"""
def __init__(self,
parameters=None,
modes=None,
parameters_mean=None,
parameters_std=None,
features_mean=None,
features_std=None,
n_hidden=[512,512,512],
restore=False,
restore_filename=None,
trainable=True,
optimizer=tf.keras.optimizers.Adam(),
verbose=False,
):
"""
Constructor
"""
# super
super(cosmopower_NN, self).__init__()
# restore
if restore is True:
self.restore(restore_filename)
# else set variables from input arguments
else:
# attributes
self.parameters = parameters
self.n_parameters = len(self.parameters)
self.modes = modes
self.n_modes = len(self.modes)
self.n_hidden = n_hidden
# architecture
self.architecture = [self.n_parameters] + self.n_hidden + [self.n_modes]
self.n_layers = len(self.architecture) - 1
# input parameters mean and std
self.parameters_mean_ = parameters_mean if parameters_mean is not None else np.zeros(self.n_parameters)
self.parameters_std_ = parameters_std if parameters_std is not None else np.ones(self.n_parameters)
# (log)-spectra mean and std
self.features_mean_ = features_mean if features_mean is not None else np.zeros(self.n_modes)
self.features_std_ = features_std if features_std is not None else np.ones(self.n_modes)
# input parameters mean and std
self.parameters_mean = tf.constant(self.parameters_mean_, dtype=dtype, name='parameters_mean')
self.parameters_std = tf.constant(self.parameters_std_, dtype=dtype, name='parameters_std')
# (log)-spectra mean and std
self.features_mean = tf.constant(self.features_mean_, dtype=dtype, name='features_mean')
self.features_std = tf.constant(self.features_std_, dtype=dtype, name='features_std')
# weights, biases and activation function parameters for each layer of the network
self.W = []
self.b = []
self.alphas = []
self.betas = []
for i in range(self.n_layers):
self.W.append(tf.Variable(tf.random.normal([self.architecture[i], self.architecture[i+1]], 0., 1e-3), name="W_" + str(i), trainable=trainable))
self.b.append(tf.Variable(tf.zeros([self.architecture[i+1]]), name = "b_" + str(i), trainable=trainable))
for i in range(self.n_layers-1):
self.alphas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "alphas_" + str(i), trainable=trainable))
self.betas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "betas_" + str(i), trainable=trainable))
# restore weights if restore = True
if restore is True:
for i in range(self.n_layers):
self.W[i].assign(self.W_[i])
self.b[i].assign(self.b_[i])
for i in range(self.n_layers-1):
self.alphas[i].assign(self.alphas_[i])
self.betas[i].assign(self.betas_[i])
# optimizer
self.optimizer = optimizer
self.verbose= verbose
# print initialization info, if verbose
if self.verbose:
multiline_str = "\nInitialized cosmopower_NN model, \n" \
f"mapping {self.n_parameters} input parameters to {self.n_modes} output modes, \n" \
f"using {len(self.n_hidden)} hidden layers, \n" \
f"with {list(self.n_hidden)} nodes, respectively. \n"
print(multiline_str)
# ========== TENSORFLOW implementation ===============
# non-linear activation function
def activation(self,
x,
alpha,
beta
):
r"""
Non-linear activation function
Parameters:
x (Tensor):
linear output from previous layer
alpha (Tensor):
trainable parameter
beta (Tensor):
trainable parameter
Returns:
Tensor:
the result of applying the non-linear activation function to the linear output of the layer
"""
return tf.multiply(tf.add(beta, tf.multiply(tf.sigmoid(tf.multiply(alpha, x)), tf.subtract(1.0, beta)) ), x)
# tensor predictions
@tf.function
def predictions_tf(self,
parameters_tensor
):
r"""
Prediction given tensor of input parameters,
fully implemented in TensorFlow
Parameters:
parameters_tensor (Tensor):
input parameters
Returns:
Tensor:
output predictions
"""
outputs = []
layers = [tf.divide(tf.subtract(parameters_tensor, self.parameters_mean), self.parameters_std)]
for i in range(self.n_layers - 1):
# linear network operation
outputs.append(tf.add(tf.matmul(layers[-1], self.W[i]), self.b[i]))
# non-linear activation function
layers.append(self.activation(outputs[-1], self.alphas[i], self.betas[i]))
# linear output layer
layers.append(tf.add(tf.matmul(layers[-1], self.W[-1]), self.b[-1]))
# rescale -> output predictions
return tf.add(tf.multiply(layers[-1], self.features_std), self.features_mean)
# tensor 10.**predictions
@tf.function
def ten_to_predictions_tf(self,
parameters_tensor
):
r"""
10^predictions given tensor of input parameters,
fully implemented in TensorFlow. It raises 10 to the output
of ``predictions_tf``
Parameters:
parameters_tensor (Tensor):
input parameters
Returns:
Tensor:
10^output predictions
"""
return tf.pow(10., self.predictions_tf(parameters_tensor))
# ============= SAVE/LOAD model =============
# save network parameters to Numpy arrays
def update_emulator_parameters(self):
r"""
Update emulator parameters before saving them
"""
# put network parameters to numpy arrays
self.W_ = [self.W[i].numpy() for i in range(self.n_layers)]
self.b_ = [self.b[i].numpy() for i in range(self.n_layers)]
self.alphas_ = [self.alphas[i].numpy() for i in range(self.n_layers-1)]
self.betas_ = [self.betas[i].numpy() for i in range(self.n_layers-1)]
# put mean and std parameters to numpy arrays
self.parameters_mean_ = self.parameters_mean.numpy()
self.parameters_std_ = self.parameters_std.numpy()
self.features_mean_ = self.features_mean.numpy()
self.features_std_ = self.features_std.numpy()
# save
def save(self,
filename
):
r"""
Save network parameters
Parameters:
filename (str):
filename tag (without suffix) where model will be saved
"""
# attributes
attributes = [self.W_,
self.b_,
self.alphas_,
self.betas_,
self.parameters_mean_,
self.parameters_std_,
self.features_mean_,
self.features_std_,
self.n_parameters,
self.parameters,
self.n_modes,
self.modes,
self.n_hidden,
self.n_layers,
self.architecture]
# save attributes to file
with open(filename + ".pkl", 'wb') as f:
pickle.dump(attributes, f)
# restore attributes
def restore(self,
filename
):
r"""
Load pre-trained model
Parameters:
filename (str):
filename tag (without suffix) where model was saved
"""
# load attributes
with open(filename + ".pkl", 'rb') as f:
self.W_, self.b_, self.alphas_, self.betas_, \
self.parameters_mean_, self.parameters_std_, \
self.features_mean_, self.features_std_, \
self.n_parameters, self.parameters, \
self.n_modes, self.modes, \
self.n_hidden, self.n_layers, self.architecture = pickle.load(f)
# ========== NUMPY implementation ===============
# auxiliary function to sort input parameters
def dict_to_ordered_arr_np(self,
input_dict,
):
r"""
Sort input parameters
Parameters:
input_dict (dict [numpy.ndarray]):
input dict of (arrays of) parameters to be sorted
Returns:
numpy.ndarray:
parameters sorted according to desired order
"""
if self.parameters is not None:
return np.stack([input_dict[k] for k in self.parameters], axis=1)
else:
return np.stack([input_dict[k] for k in input_dict], axis=1)
# forward prediction given input parameters implemented in Numpy
def forward_pass_np(self,
parameters_arr
):
r"""
Forward pass through the network to predict the output,
fully implemented in Numpy
Parameters:
parameters_arr (numpy.ndarray):
array of input parameters
Returns:
numpy.ndarray:
output predictions
"""
# forward pass through the network
act = []
layers = [(parameters_arr - self.parameters_mean_)/self.parameters_std_]
for i in range(self.n_layers-1):
# linear network operation
act.append(np.dot(layers[-1], self.W_[i]) + self.b_[i])
# pass through activation function
layers.append((self.betas_[i] + (1.-self.betas_[i])*1./(1.+np.exp(-self.alphas_[i]*act[-1])))*act[-1])
# final (linear) layer -> (standardised) predictions
layers.append(np.dot(layers[-1], self.W_[-1]) + self.b_[-1])
# rescale and output
return layers[-1]*self.features_std_ + self.features_mean_
# Numpy array predictions
def predictions_np(self,
parameters_dict
):
r"""
Predictions given input parameters collected in a dict.
Fully implemented in Numpy. Calls ``forward_pass_np``
after ordering the input parameter dict
Parameters:
parameters_dict (dict [numpy.ndarray]):
dictionary of (arrays of) parameters
Returns:
numpy.ndarray:
output predictions
"""
parameters_arr = self.dict_to_ordered_arr_np(parameters_dict)
return self.forward_pass_np(parameters_arr)
# Numpy array 10.**predictions
def ten_to_predictions_np(self,
parameters_dict
):
r"""
10^predictions given input parameters collected in a dict.
Fully implemented in Numpy. It raises 10 to the output
from ``forward_pass_np``
Parameters:
parameters_dict (dict [numpy.ndarray]):
dictionary of (arrays of) parameters
Returns:
numpy.ndarray:
10^output predictions
"""
return 10.**self.predictions_np(parameters_dict)
### Infrastructure for network training ###
@tf.function
def compute_loss(self,
training_parameters,
training_features
):
r"""
Root of the mean squared difference
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
Returns:
Tensor:
root of the mean squared difference
"""
return tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.predictions_tf(training_parameters), training_features)))
@tf.function
def compute_loss_and_gradients(self,
training_parameters,
training_features
):
r"""
Computes the root of the mean squared difference and its gradients
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
Returns:
loss (Tensor):
root of the mean squared difference
gradients (Tensor):
gradients
"""
# compute loss on the tape
with tf.GradientTape() as tape:
# loss
loss = tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.predictions_tf(training_parameters), training_features)))
# compute gradients
gradients = tape.gradient(loss, self.trainable_variables)
return loss, gradients
def training_step(self,
training_parameters,
training_features
):
r"""
Minimize loss
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
Returns:
loss (Tensor):
root of the mean squared difference
"""
# compute loss and gradients
loss, gradients = self.compute_loss_and_gradients(training_parameters, training_features)
# apply gradients
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
return loss
def training_step_with_accumulated_gradients(self,
training_parameters,
training_features,
accumulation_steps=10):
r"""
Minimize loss, breaking calculation into accumulated gradients
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
accumulation_steps (int):
number of accumulated gradients
Returns:
accumulated_loss (Tensor):
root of the mean squared difference, weighted-averaged over sub-batches
"""
# create dataset to do sub-calculations over
dataset = tf.data.Dataset.from_tensor_slices((training_parameters, training_features)).batch(int(training_features.shape[0]/accumulation_steps))
# initialize gradients and loss (to zero)
accumulated_gradients = [tf.Variable(tf.zeros_like(variable), trainable=False) for variable in self.trainable_variables]
accumulated_loss = tf.Variable(0., trainable=False)
# loop over sub-batches
for training_parameters_, training_features_, in dataset:
# calculate loss and gradients
loss, gradients = self.compute_loss_and_gradients(training_parameters_, training_features_)
# update the accumulated gradients and loss
for i in range(len(accumulated_gradients)):
accumulated_gradients[i].assign_add(gradients[i]*training_features_.shape[0]/training_features.shape[0])
accumulated_loss.assign_add(loss*training_features_.shape[0]/training_features.shape[0])
# apply accumulated gradients
self.optimizer.apply_gradients(zip(accumulated_gradients, self.trainable_variables))
return accumulated_loss
# ==========================================
# main TRAINING function
# ==========================================
def train(self,
training_parameters,
training_features,
filename_saved_model,
# cooling schedule
validation_split=0.1,
learning_rates=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6],
batch_sizes=[1024, 1024, 1024, 1024, 1024],
gradient_accumulation_steps = [1, 1, 1, 1, 1],
# early stopping set up
patience_values = [100, 100, 100, 100, 100],
max_epochs = [1000, 1000, 1000, 1000, 1000],
):
r"""
Train the model
Parameters:
training_parameters (dict [numpy.ndarray]):
input parameters
training_features (numpy.ndarray):
true features for training
filename_saved_model (str):
filename tag where model will be saved
validation_split (float):
fraction of training data used for validation
learning_rates (list [float]):
learning rates for each step of learning schedule
batch_sizes (list [int]):
batch sizes for each step of learning schedule
gradient_accumulation_steps (list [int]):
batches for gradient accumulations for each step of learning schedule
patience_values (list [int]):
early stopping patience for each step of learning schedule
max_epochs (list [int]):
maximum number of epochs for each step of learning schedule
"""
# check correct number of steps
assert len(learning_rates)==len(batch_sizes)\
==len(gradient_accumulation_steps)==len(patience_values)==len(max_epochs), \
'Number of learning rates, batch sizes, gradient accumulation steps, patience values and max epochs are not matching!'
# training start info, if verbose
if self.verbose:
multiline_str = "Starting cosmopower_NN training, \n" \
f"using {int(100*validation_split)} per cent of training samples for validation. \n" \
f"Performing {len(learning_rates)} learning steps, with \n" \
f"{list(learning_rates)} learning rates \n" \
f"{list(batch_sizes)} batch sizes \n" \
f"{list(gradient_accumulation_steps)} gradient accumulation steps \n" \
f"{list(patience_values)} patience values \n" \
f"{list(max_epochs)} max epochs \n"
print(multiline_str)
# from dict to array
training_parameters = self.dict_to_ordered_arr_np(training_parameters)
# parameters standardisation
self.parameters_mean = np.mean(training_parameters, axis=0)
self.parameters_std = np.std(training_parameters, axis=0)
# features standardisation
self.features_mean = np.mean(training_features, axis=0)
self.features_std = np.std(training_features, axis=0)
# input parameters mean and std
self.parameters_mean = tf.constant(self.parameters_mean, dtype=dtype, name='parameters_mean')
self.parameters_std = tf.constant(self.parameters_std, dtype=dtype, name='parameters_std')
# (log)-spectra mean and std
self.features_mean = tf.constant(self.features_mean, dtype=dtype, name='features_mean')
self.features_std = tf.constant(self.features_std, dtype=dtype, name='features_std')
# training/validation split
n_validation = int(training_parameters.shape[0] * validation_split)
n_training = training_parameters.shape[0] - n_validation
# casting
training_parameters = tf.convert_to_tensor(training_parameters, dtype=dtype)
training_features = tf.convert_to_tensor(training_features, dtype=dtype)
# train using cooling/heating schedule for lr/batch-size
for i in range(len(learning_rates)):
print('learning rate = ' + str(learning_rates[i]) + ', batch size = ' + str(batch_sizes[i]))
# set learning rate
self.optimizer.lr = learning_rates[i]
# split into validation and training sub-sets
training_selection = tf.random.shuffle([True] * n_training + [False] * n_validation)
# create iterable dataset (given batch size)
training_data = tf.data.Dataset.from_tensor_slices((training_parameters[training_selection], training_features[training_selection])).shuffle(n_training).batch(batch_sizes[i])
# set up training loss
training_loss = [np.infty]
validation_loss = [np.infty]
best_loss = np.infty
early_stopping_counter = 0
# loop over epochs
with trange(max_epochs[i]) as t:
for epoch in t:
# loop over batches
for theta, feats in training_data:
# training step: check whether to accumulate gradients or not (only worth doing this for very large batch sizes)
if gradient_accumulation_steps[i] == 1:
loss = self.training_step(theta, feats)
else:
loss = self.training_step_with_accumulated_gradients(theta, feats, accumulation_steps=gradient_accumulation_steps[i])
# compute validation loss at the end of the epoch
validation_loss.append(self.compute_loss(training_parameters[~training_selection], training_features[~training_selection]).numpy())
# update the progressbar
t.set_postfix(loss=validation_loss[-1])
# early stopping condition
if validation_loss[-1] < best_loss:
best_loss = validation_loss[-1]
early_stopping_counter = 0
else:
early_stopping_counter += 1
if early_stopping_counter >= patience_values[i]:
self.update_emulator_parameters()
self.save(filename_saved_model)
print('Validation loss = ' + str(best_loss))
print('Model saved.')
break
self.update_emulator_parameters()
self.save(filename_saved_model)
print('Reached max number of epochs. Validation loss = ' + str(best_loss))
print('Model saved.')
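Before the per-method reference below, a minimal usage sketch may help orientation. This is a sketch only: the filename tag 'my_emulator' and the flat parameter values are hypothetical placeholders, and the package-level import is assumed.

import numpy as np
from cosmopower import cosmopower_NN

# load a previously trained emulator (filename tag is hypothetical)
cp_nn = cosmopower_NN(restore=True, restore_filename='my_emulator')

# batch of input parameters; keys must match cp_nn.parameters
params = {name: np.array([0.5]) for name in cp_nn.parameters}

log_spectra = cp_nn.predictions_np(params)      # (log)-spectra
spectra = cp_nn.ten_to_predictions_np(params)   # 10**(log)-spectra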
__init__(self, parameters=None, modes=None, parameters_mean=None, parameters_std=None, features_mean=None, features_std=None, n_hidden=[512, 512, 512], restore=False, restore_filename=None, trainable=True, optimizer=tf.keras.optimizers.Adam(), verbose=False) special

Constructor
Source code in cosmopower/cosmopower_NN.py
def __init__(self,
parameters=None,
modes=None,
parameters_mean=None,
parameters_std=None,
features_mean=None,
features_std=None,
n_hidden=[512,512,512],
restore=False,
restore_filename=None,
trainable=True,
optimizer=tf.keras.optimizers.Adam(),
verbose=False,
):
"""
Constructor
"""
# super
super(cosmopower_NN, self).__init__()
# restore
if restore is True:
self.restore(restore_filename)
# else set variables from input arguments
else:
# attributes
self.parameters = parameters
self.n_parameters = len(self.parameters)
self.modes = modes
self.n_modes = len(self.modes)
self.n_hidden = n_hidden
# architecture
self.architecture = [self.n_parameters] + self.n_hidden + [self.n_modes]
self.n_layers = len(self.architecture) - 1
# input parameters mean and std
self.parameters_mean_ = parameters_mean if parameters_mean is not None else np.zeros(self.n_parameters)
self.parameters_std_ = parameters_std if parameters_std is not None else np.ones(self.n_parameters)
# (log)-spectra mean and std
self.features_mean_ = features_mean if features_mean is not None else np.zeros(self.n_modes)
self.features_std_ = features_std if features_std is not None else np.ones(self.n_modes)
# input parameters mean and std
self.parameters_mean = tf.constant(self.parameters_mean_, dtype=dtype, name='parameters_mean')
self.parameters_std = tf.constant(self.parameters_std_, dtype=dtype, name='parameters_std')
# (log)-spectra mean and std
self.features_mean = tf.constant(self.features_mean_, dtype=dtype, name='features_mean')
self.features_std = tf.constant(self.features_std_, dtype=dtype, name='features_std')
# weights, biases and activation function parameters for each layer of the network
self.W = []
self.b = []
self.alphas = []
self.betas = []
for i in range(self.n_layers):
self.W.append(tf.Variable(tf.random.normal([self.architecture[i], self.architecture[i+1]], 0., 1e-3), name="W_" + str(i), trainable=trainable))
self.b.append(tf.Variable(tf.zeros([self.architecture[i+1]]), name = "b_" + str(i), trainable=trainable))
for i in range(self.n_layers-1):
self.alphas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "alphas_" + str(i), trainable=trainable))
self.betas.append(tf.Variable(tf.random.normal([self.architecture[i+1]]), name = "betas_" + str(i), trainable=trainable))
# restore weights if restore = True
if restore is True:
for i in range(self.n_layers):
self.W[i].assign(self.W_[i])
self.b[i].assign(self.b_[i])
for i in range(self.n_layers-1):
self.alphas[i].assign(self.alphas_[i])
self.betas[i].assign(self.betas_[i])
# optimizer
self.optimizer = optimizer
self.verbose= verbose
# print initialization info, if verbose
if self.verbose:
multiline_str = "\nInitialized cosmopower_NN model, \n" \
f"mapping {self.n_parameters} input parameters to {self.n_modes} output modes, \n" \
f"using {len(self.n_hidden)} hidden layers, \n" \
f"with {list(self.n_hidden)} nodes, respectively. \n"
print(multiline_str)
activation(self, x, alpha, beta)

Non-linear activation function

Parameters:

Name | Type | Description | Default |
---|---|---|---|
x | Tensor | linear output from previous layer | required |
alpha | Tensor | trainable parameter | required |
beta | Tensor | trainable parameter | required |

Returns:

Type | Description |
---|---|
Tensor | the result of applying the non-linear activation function to the linear output of the layer |
Source code in cosmopower/cosmopower_NN.py
def activation(self,
x,
alpha,
beta
):
r"""
Non-linear activation function
Parameters:
x (Tensor):
linear output from previous layer
alpha (Tensor):
trainable parameter
beta (Tensor):
trainable parameter
Returns:
Tensor:
the result of applying the non-linear activation function to the linear output of the layer
"""
return tf.multiply(tf.add(beta, tf.multiply(tf.sigmoid(tf.multiply(alpha, x)), tf.subtract(1.0, beta)) ), x)
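Written out, the return line above computes, element-wise and with $\sigma$ denoting the standard sigmoid,

$$
a(x) = \left[\beta + (1 - \beta)\,\sigma(\alpha x)\right] x ,
$$

so $\beta \to 1$ recovers a purely linear layer, while $\beta \to 0$ gives a sigmoid-gated linear unit whose sharpness is set by $\alpha$.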
compute_loss(self, training_parameters, training_features)

Root of the mean squared difference

Parameters:

Name | Type | Description | Default |
---|---|---|---|
training_parameters | Tensor | input parameters | required |
training_features | Tensor | true features | required |

Returns:

Type | Description |
---|---|
Tensor | root of the mean squared difference |
Source code in cosmopower/cosmopower_NN.py
@tf.function
def compute_loss(self,
training_parameters,
training_features
):
r"""
Root of the mean squared difference
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
Returns:
Tensor:
root of the mean squared difference
"""
return tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.predictions_tf(training_parameters), training_features)))
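As the composition of tf.sqrt and tf.reduce_mean shows, the returned loss is the root of the mean squared difference between emulated and true features,

$$
\mathcal{L} = \sqrt{\frac{1}{B\,n_{\rm modes}} \sum_{j=1}^{B} \sum_{k=1}^{n_{\rm modes}} \left(\hat{f}_{jk} - f_{jk}\right)^2} ,
$$

where $B$ is the batch size, $\hat{f}$ the predictions and $f$ the true features.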
compute_loss_and_gradients(self, training_parameters, training_features)

Computes the root of the mean squared difference and its gradients

Parameters:

Name | Type | Description | Default |
---|---|---|---|
training_parameters | Tensor | input parameters | required |
training_features | Tensor | true features | required |

Returns:

Type | Description |
---|---|
loss (Tensor) | root of the mean squared difference |
gradients (Tensor) | gradients |
Source code in cosmopower/cosmopower_NN.py
@tf.function
def compute_loss_and_gradients(self,
training_parameters,
training_features
):
r"""
Computes the root of the mean squared difference and its gradients
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
Returns:
loss (Tensor):
root of the mean squared difference
gradients (Tensor):
gradients
"""
# compute loss on the tape
with tf.GradientTape() as tape:
# loss
loss = tf.sqrt(tf.reduce_mean(tf.math.squared_difference(self.predictions_tf(training_parameters), training_features)))
# compute gradients
gradients = tape.gradient(loss, self.trainable_variables)
return loss, gradients
dict_to_ordered_arr_np(self, input_dict)

Sort input parameters

Parameters:

Name | Type | Description | Default |
---|---|---|---|
input_dict | dict [numpy.ndarray] | input dict of (arrays of) parameters to be sorted | required |

Returns:

Type | Description |
---|---|
numpy.ndarray | parameters sorted according to desired order |
Source code in cosmopower/cosmopower_NN.py
def dict_to_ordered_arr_np(self,
input_dict,
):
r"""
Sort input parameters
Parameters:
input_dict (dict [numpy.ndarray]):
input dict of (arrays of) parameters to be sorted
Returns:
numpy.ndarray:
parameters sorted according to desired order
"""
if self.parameters is not None:
return np.stack([input_dict[k] for k in self.parameters], axis=1)
else:
return np.stack([input_dict[k] for k in input_dict], axis=1)
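A small sketch of the ordering behaviour; the parameter names and values below are hypothetical:

import numpy as np

# two samples of two hypothetical parameters
input_dict = {'omega_b': np.array([0.022, 0.023]),
              'h': np.array([0.67, 0.70])}

# with self.parameters = ['h', 'omega_b'], the method stacks the
# arrays column-wise in that order:
arr = np.stack([input_dict[k] for k in ['h', 'omega_b']], axis=1)
print(arr.shape)   # (2, 2); column 0 is 'h', column 1 is 'omega_b'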
forward_pass_np(self, parameters_arr)

Forward pass through the network to predict the output, fully implemented in Numpy

Parameters:

Name | Type | Description | Default |
---|---|---|---|
parameters_arr | numpy.ndarray | array of input parameters | required |

Returns:

Type | Description |
---|---|
numpy.ndarray | output predictions |
Source code in cosmopower/cosmopower_NN.py
def forward_pass_np(self,
parameters_arr
):
r"""
Forward pass through the network to predict the output,
fully implemented in Numpy
Parameters:
parameters_arr (numpy.ndarray):
array of input parameters
Returns:
numpy.ndarray:
output predictions
"""
# forward pass through the network
act = []
layers = [(parameters_arr - self.parameters_mean_)/self.parameters_std_]
for i in range(self.n_layers-1):
# linear network operation
act.append(np.dot(layers[-1], self.W_[i]) + self.b_[i])
# pass through activation function
layers.append((self.betas_[i] + (1.-self.betas_[i])*1./(1.+np.exp(-self.alphas_[i]*act[-1])))*act[-1])
# final (linear) layer -> (standardised) predictions
layers.append(np.dot(layers[-1], self.W_[-1]) + self.b_[-1])
# rescale and output
return layers[-1]*self.features_std_ + self.features_mean_
predictions_np(self, parameters_dict)

Predictions given input parameters collected in a dict. Fully implemented in Numpy. Calls forward_pass_np after ordering the input parameter dict.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
parameters_dict | dict [numpy.ndarray] | dictionary of (arrays of) parameters | required |

Returns:

Type | Description |
---|---|
numpy.ndarray | output predictions |
Source code in cosmopower/cosmopower_NN.py
def predictions_np(self,
parameters_dict
):
r"""
Predictions given input parameters collected in a dict.
Fully implemented in Numpy. Calls ``forward_pass_np``
after ordering the input parameter dict
Parameters:
parameters_dict (dict [numpy.ndarray]):
dictionary of (arrays of) parameters
Returns:
numpy.ndarray:
output predictions
"""
parameters_arr = self.dict_to_ordered_arr_np(parameters_dict)
return self.forward_pass_np(parameters_arr)
predictions_tf(self, parameters_tensor)

Prediction given tensor of input parameters, fully implemented in TensorFlow

Parameters:

Name | Type | Description | Default |
---|---|---|---|
parameters_tensor | Tensor | input parameters | required |

Returns:

Type | Description |
---|---|
Tensor | output predictions |
Source code in cosmopower/cosmopower_NN.py
@tf.function
def predictions_tf(self,
parameters_tensor
):
r"""
Prediction given tensor of input parameters,
fully implemented in TensorFlow
Parameters:
parameters_tensor (Tensor):
input parameters
Returns:
Tensor:
output predictions
"""
outputs = []
layers = [tf.divide(tf.subtract(parameters_tensor, self.parameters_mean), self.parameters_std)]
for i in range(self.n_layers - 1):
# linear network operation
outputs.append(tf.add(tf.matmul(layers[-1], self.W[i]), self.b[i]))
# non-linear activation function
layers.append(self.activation(outputs[-1], self.alphas[i], self.betas[i]))
# linear output layer
layers.append(tf.add(tf.matmul(layers[-1], self.W[-1]), self.b[-1]))
# rescale -> output predictions
return tf.add(tf.multiply(layers[-1], self.features_std), self.features_mean)
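Because the forward pass is built entirely from differentiable TensorFlow operations, derivatives of the emulated spectra with respect to the input parameters are available through automatic differentiation. A sketch, assuming cp_nn is a trained instance with five input parameters and float32 precision (the parameter values are illustrative):

import tensorflow as tf

theta = tf.constant([[0.022, 0.12, 0.67, 3.05, 0.96]], dtype=tf.float32)

with tf.GradientTape() as tape:
    tape.watch(theta)
    spectra = cp_nn.predictions_tf(theta)

# Jacobian of the (log)-spectra w.r.t. the inputs,
# shape (batch, n_modes, n_parameters)
jacobian = tape.batch_jacobian(spectra, theta)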
restore(self, filename)

Load pre-trained model

Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | filename tag (without suffix) where model was saved | required |
Source code in cosmopower/cosmopower_NN.py
def restore(self,
filename
):
r"""
Load pre-trained model
Parameters:
filename (str):
filename tag (without suffix) where model was saved
"""
# load attributes
with open(filename + ".pkl", 'rb') as f:
self.W_, self.b_, self.alphas_, self.betas_, \
self.parameters_mean_, self.parameters_std_, \
self.features_mean_, self.features_std_, \
self.n_parameters, self.parameters, \
self.n_modes, self.modes, \
self.n_hidden, self.n_layers, self.architecture = pickle.load(f)
save(self, filename)

Save network parameters

Parameters:

Name | Type | Description | Default |
---|---|---|---|
filename | str | filename tag (without suffix) where model will be saved | required |
Source code in cosmopower/cosmopower_NN.py
def save(self,
filename
):
r"""
Save network parameters
Parameters:
filename (str):
filename tag (without suffix) where model will be saved
"""
# attributes
attributes = [self.W_,
self.b_,
self.alphas_,
self.betas_,
self.parameters_mean_,
self.parameters_std_,
self.features_mean_,
self.features_std_,
self.n_parameters,
self.parameters,
self.n_modes,
self.modes,
self.n_hidden,
self.n_layers,
self.architecture]
# save attributes to file
with open(filename + ".pkl", 'wb') as f:
pickle.dump(attributes, f)
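A sketch of the save/restore round trip, with a hypothetical filename tag. Note that save pickles the numpy-backed attributes (W_, b_, and so on), so update_emulator_parameters should be called first to snapshot the current TensorFlow variables:

# snapshot TF variables into their numpy counterparts, then pickle
cp_nn.update_emulator_parameters()
cp_nn.save('my_emulator')       # writes my_emulator.pkl

# later, in a new session: restore=True routes through restore()
cp_nn_loaded = cosmopower_NN(restore=True,
                             restore_filename='my_emulator')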
ten_to_predictions_np(self, parameters_dict)

10^predictions given input parameters collected in a dict. Fully implemented in Numpy. It raises 10 to the output from forward_pass_np.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
parameters_dict | dict [numpy.ndarray] | dictionary of (arrays of) parameters | required |

Returns:

Type | Description |
---|---|
numpy.ndarray | 10^output predictions |
Source code in cosmopower/cosmopower_NN.py
def ten_to_predictions_np(self,
parameters_dict
):
r"""
10^predictions given input parameters collected in a dict.
Fully implemented in Numpy. It raises 10 to the output
from ``forward_pass_np``
Parameters:
parameters_dict (dict [numpy.ndarray]):
dictionary of (arrays of) parameters
Returns:
numpy.ndarray:
10^output predictions
"""
return 10.**self.predictions_np(parameters_dict)
ten_to_predictions_tf(self, parameters_tensor)

10^predictions given tensor of input parameters, fully implemented in TensorFlow. It raises 10 to the output of predictions_tf.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
parameters_tensor | Tensor | input parameters | required |

Returns:

Type | Description |
---|---|
Tensor | 10^output predictions |
Source code in cosmopower/cosmopower_NN.py
@tf.function
def ten_to_predictions_tf(self,
parameters_tensor
):
r"""
10^predictions given tensor of input parameters,
fully implemented in TensorFlow. It raises 10 to the output
of ``predictions_tf``
Parameters:
parameters_tensor (Tensor):
input parameters
Returns:
Tensor:
10^output predictions
"""
return tf.pow(10., self.predictions_tf(parameters_tensor))
train(self, training_parameters, training_features, filename_saved_model, validation_split=0.1, learning_rates=[0.01, 0.001, 0.0001, 1e-05, 1e-06], batch_sizes=[1024, 1024, 1024, 1024, 1024], gradient_accumulation_steps=[1, 1, 1, 1, 1], patience_values=[100, 100, 100, 100, 100], max_epochs=[1000, 1000, 1000, 1000, 1000])

Train the model

Parameters:

Name | Type | Description | Default |
---|---|---|---|
training_parameters | dict [numpy.ndarray] | input parameters | required |
training_features | numpy.ndarray | true features for training | required |
filename_saved_model | str | filename tag where model will be saved | required |
validation_split | float | fraction of training data used for validation | 0.1 |
learning_rates | list [float] | learning rates for each step of the learning schedule | [0.01, 0.001, 0.0001, 1e-05, 1e-06] |
batch_sizes | list [int] | batch sizes for each step of the learning schedule | [1024, 1024, 1024, 1024, 1024] |
gradient_accumulation_steps | list [int] | number of gradient-accumulation sub-batches for each step of the learning schedule | [1, 1, 1, 1, 1] |
patience_values | list [int] | early stopping patience for each step of the learning schedule | [100, 100, 100, 100, 100] |
max_epochs | list [int] | maximum number of epochs for each step of the learning schedule | [1000, 1000, 1000, 1000, 1000] |
Source code in cosmopower/cosmopower_NN.py
def train(self,
training_parameters,
training_features,
filename_saved_model,
# cooling schedule
validation_split=0.1,
learning_rates=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6],
batch_sizes=[1024, 1024, 1024, 1024, 1024],
gradient_accumulation_steps = [1, 1, 1, 1, 1],
# early stopping set up
patience_values = [100, 100, 100, 100, 100],
max_epochs = [1000, 1000, 1000, 1000, 1000],
):
r"""
Train the model
Parameters:
training_parameters (dict [numpy.ndarray]):
input parameters
training_features (numpy.ndarray):
true features for training
filename_saved_model (str):
filename tag where model will be saved
validation_split (float):
fraction of training data used for validation
learning_rates (list [float]):
learning rates for each step of learning schedule
batch_sizes (list [int]):
batch sizes for each step of learning schedule
gradient_accumulation_steps (list [int]):
batches for gradient accumulations for each step of learning schedule
patience_values (list [int]):
early stopping patience for each step of learning schedule
max_epochs (list [int]):
maximum number of epochs for each step of learning schedule
"""
# check correct number of steps
assert len(learning_rates)==len(batch_sizes)\
==len(gradient_accumulation_steps)==len(patience_values)==len(max_epochs), \
'Number of learning rates, batch sizes, gradient accumulation steps, patience values and max epochs are not matching!'
# training start info, if verbose
if self.verbose:
multiline_str = "Starting cosmopower_NN training, \n" \
f"using {int(100*validation_split)} per cent of training samples for validation. \n" \
f"Performing {len(learning_rates)} learning steps, with \n" \
f"{list(learning_rates)} learning rates \n" \
f"{list(batch_sizes)} batch sizes \n" \
f"{list(gradient_accumulation_steps)} gradient accumulation steps \n" \
f"{list(patience_values)} patience values \n" \
f"{list(max_epochs)} max epochs \n"
print(multiline_str)
# from dict to array
training_parameters = self.dict_to_ordered_arr_np(training_parameters)
# parameters standardisation
self.parameters_mean = np.mean(training_parameters, axis=0)
self.parameters_std = np.std(training_parameters, axis=0)
# features standardisation
self.features_mean = np.mean(training_features, axis=0)
self.features_std = np.std(training_features, axis=0)
# input parameters mean and std
self.parameters_mean = tf.constant(self.parameters_mean, dtype=dtype, name='parameters_mean')
self.parameters_std = tf.constant(self.parameters_std, dtype=dtype, name='parameters_std')
# (log)-spectra mean and std
self.features_mean = tf.constant(self.features_mean, dtype=dtype, name='features_mean')
self.features_std = tf.constant(self.features_std, dtype=dtype, name='features_std')
# training/validation split
n_validation = int(training_parameters.shape[0] * validation_split)
n_training = training_parameters.shape[0] - n_validation
# casting
training_parameters = tf.convert_to_tensor(training_parameters, dtype=dtype)
training_features = tf.convert_to_tensor(training_features, dtype=dtype)
# train using cooling/heating schedule for lr/batch-size
for i in range(len(learning_rates)):
print('learning rate = ' + str(learning_rates[i]) + ', batch size = ' + str(batch_sizes[i]))
# set learning rate
self.optimizer.lr = learning_rates[i]
# split into validation and training sub-sets
training_selection = tf.random.shuffle([True] * n_training + [False] * n_validation)
# create iterable dataset (given batch size)
training_data = tf.data.Dataset.from_tensor_slices((training_parameters[training_selection], training_features[training_selection])).shuffle(n_training).batch(batch_sizes[i])
# set up training loss
training_loss = [np.infty]
validation_loss = [np.infty]
best_loss = np.infty
early_stopping_counter = 0
# loop over epochs
with trange(max_epochs[i]) as t:
for epoch in t:
# loop over batches
for theta, feats in training_data:
# training step: check whether to accumulate gradients or not (only worth doing this for very large batch sizes)
if gradient_accumulation_steps[i] == 1:
loss = self.training_step(theta, feats)
else:
loss = self.training_step_with_accumulated_gradients(theta, feats, accumulation_steps=gradient_accumulation_steps[i])
# compute validation loss at the end of the epoch
validation_loss.append(self.compute_loss(training_parameters[~training_selection], training_features[~training_selection]).numpy())
# update the progressbar
t.set_postfix(loss=validation_loss[-1])
# early stopping condition
if validation_loss[-1] < best_loss:
best_loss = validation_loss[-1]
early_stopping_counter = 0
else:
early_stopping_counter += 1
if early_stopping_counter >= patience_values[i]:
self.update_emulator_parameters()
self.save(filename_saved_model)
print('Validation loss = ' + str(best_loss))
print('Model saved.')
break
self.update_emulator_parameters()
self.save(filename_saved_model)
print('Reached max number of epochs. Validation loss = ' + str(best_loss))
print('Model saved.')
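An end-to-end training sketch under stated assumptions: the parameter names, sample counts, mode range, and filename tag are illustrative placeholders, and random numbers stand in for actual (log)-spectra:

import numpy as np
from cosmopower import cosmopower_NN

# illustrative training set: 10000 samples, 3 parameters, 500 modes
training_parameters = {'omega_b': np.random.uniform(0.020, 0.024, 10000),
                       'omega_cdm': np.random.uniform(0.10, 0.14, 10000),
                       'h': np.random.uniform(0.6, 0.8, 10000)}
training_features = np.random.rand(10000, 500)   # stand-in (log)-spectra

cp_nn = cosmopower_NN(parameters=list(training_parameters.keys()),
                      modes=np.arange(2, 502),
                      verbose=True)

cp_nn.train(training_parameters=training_parameters,
            training_features=training_features,
            filename_saved_model='my_emulator')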
training_step(self, training_parameters, training_features)

Minimize loss

Parameters:

Name | Type | Description | Default |
---|---|---|---|
training_parameters | Tensor | input parameters | required |
training_features | Tensor | true features | required |

Returns:

Type | Description |
---|---|
loss (Tensor) | root of the mean squared difference |
Source code in cosmopower/cosmopower_NN.py
def training_step(self,
training_parameters,
training_features
):
r"""
Minimize loss
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
Returns:
loss (Tensor):
root of the mean squared difference
"""
# compute loss and gradients
loss, gradients = self.compute_loss_and_gradients(training_parameters, training_features)
# apply gradients
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
return loss
training_step_with_accumulated_gradients(self, training_parameters, training_features, accumulation_steps=10)

Minimize loss, breaking the calculation into accumulated gradients over sub-batches

Parameters:

Name | Type | Description | Default |
---|---|---|---|
training_parameters | Tensor | input parameters | required |
training_features | Tensor | true features | required |
accumulation_steps | int | number of accumulated gradients | 10 |

Returns:

Type | Description |
---|---|
accumulated_loss (Tensor) | root of the mean squared difference, weighted-averaged over sub-batches |
Source code in cosmopower/cosmopower_NN.py
def training_step_with_accumulated_gradients(self,
training_parameters,
training_features,
accumulation_steps=10):
r"""
Minimize loss, breaking calculation into accumulated gradients
Parameters:
training_parameters (Tensor):
input parameters
training_features (Tensor):
true features
accumulation_steps (int):
number of accumulated gradients
Returns:
accumulated_loss (Tensor):
root of the mean squared difference, weighted-averaged over sub-batches
"""
# create dataset to do sub-calculations over
dataset = tf.data.Dataset.from_tensor_slices((training_parameters, training_features)).batch(int(training_features.shape[0]/accumulation_steps))
# initialize gradients and loss (to zero)
accumulated_gradients = [tf.Variable(tf.zeros_like(variable), trainable=False) for variable in self.trainable_variables]
accumulated_loss = tf.Variable(0., trainable=False)
# loop over sub-batches
for training_parameters_, training_features_, in dataset:
# calculate loss and gradients
loss, gradients = self.compute_loss_and_gradients(training_parameters_, training_features_)
# update the accumulated gradients and loss
for i in range(len(accumulated_gradients)):
accumulated_gradients[i].assign_add(gradients[i]*training_features_.shape[0]/training_features.shape[0])
accumulated_loss.assign_add(loss*training_features_.shape[0]/training_features.shape[0])
# apply accumulated gradients
self.optimizer.apply_gradients(zip(accumulated_gradients, self.trainable_variables))
return accumulated_loss
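Each sub-batch contributes in proportion to its share of the full batch, so the accumulated quantities are batch-size-weighted averages,

$$
g_{\rm acc} = \sum_b \frac{n_b}{N}\, g_b , \qquad \mathcal{L}_{\rm acc} = \sum_b \frac{n_b}{N}\, \mathcal{L}_b ,
$$

with $n_b$ the size of sub-batch $b$ and $N$ the full batch size. Because the loss contains a square root, this weighted sum of sub-batch gradients approximates, rather than exactly reproduces, the single large-batch gradient; the pay-off is a much smaller peak memory footprint, which is why the comment in train suggests accumulation only for very large batch sizes.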
update_emulator_parameters(self)

Update emulator parameters before saving them
Source code in cosmopower/cosmopower_NN.py
def update_emulator_parameters(self):
r"""
Update emulator parameters before saving them
"""
# put network parameters to numpy arrays
self.W_ = [self.W[i].numpy() for i in range(self.n_layers)]
self.b_ = [self.b[i].numpy() for i in range(self.n_layers)]
self.alphas_ = [self.alphas[i].numpy() for i in range(self.n_layers-1)]
self.betas_ = [self.betas[i].numpy() for i in range(self.n_layers-1)]
# put mean and std parameters to numpy arrays
self.parameters_mean_ = self.parameters_mean.numpy()
self.parameters_std_ = self.parameters_std.numpy()
self.features_mean_ = self.features_mean.numpy()
self.features_std_ = self.features_std.numpy()