Source code for bayesian_neural_network

# vim: foldmethod=marker

#  Imports {{{ #
from collections import deque
from enum import Enum
import logging
from time import time
import numpy as np
import tensorflow as tf

from tensorflow_mcmc.tests.utils.theano_bnn import (BaseModel,
                                                    zero_mean_unit_var_normalization,
                                                    zero_mean_unit_var_unnormalization)

from tensorflow_mcmc.bnn_priors import LogVariancePrior, WeightPrior

from tensorflow_mcmc.sampling.sghmc import SGHMCSampler
from tensorflow_mcmc.sampling.sgld import SGLDSampler
from tensorflow_mcmc.sampling.relativistic_sghmc import RelativisticSGHMCSampler


[docs]class SamplingMethod(Enum): """ Enumeration type for all sampling methods we support. """ SGHMC = "SGHMC" SGLD = "SGLD" # will automatically generate tests for these when running "make test" autotested = ( SGHMC, SGLD, ) @staticmethod
[docs] def is_supported(sampling_method): """ Static method that returns true if `val` is a supported sampling method (e.g. there is an entry for it in `SamplingMethod` enum). Examples ---------- Supported sampling methods give `True`: >>> SamplingMethod.is_supported(SamplingMethod.SGHMC) True Other input types give `False`: >>> SamplingMethod.is_supported(0) False >>> SamplingMethod.is_supported("test") False """ return sampling_method in SamplingMethod
@staticmethod
[docs] def get_sampler(sampling_method, **sampler_args): """TODO: Docstring for get_sampler. Parameters ---------- sampling_method : SamplingMethod Enum corresponding to sampling method to return a sampler for. **sampler_args : dict Keyword arguments that contain all input arguments to the desired the constructor of the sampler for the specified `sampling_method`. Returns ------- sampler : Subclass of `sampling.MCMCSampler` A sampler instance that implements the specified `sampling_method` and is initialized with inputs `sampler_args`. """ if sampling_method == SamplingMethod.SGHMC: sampler = SGHMCSampler( batch_generator=sampler_args["batch_generator"], seed=sampler_args["seed"], cost_fun=sampler_args["cost_fun"], params=sampler_args["params"], epsilon=sampler_args["epsilon"], mdecay=sampler_args["mdecay"], scale_grad=sampler_args["scale_grad"], session=sampler_args["session"], burn_in_steps=sampler_args["burn_in_steps"] ) elif sampling_method == SamplingMethod.SGLD: sampler = SGLDSampler( batch_generator=sampler_args["batch_generator"], seed=sampler_args["seed"], cost_fun=sampler_args["cost_fun"], params=sampler_args["params"], epsilon=sampler_args["epsilon"], scale_grad=sampler_args["scale_grad"], session=sampler_args["session"], burn_in_steps=sampler_args["burn_in_steps"] ) elif sampling_method == SamplingMethod.RelativisticSGHMC: raise NotImplementedError() else: raise ValueError() return sampler
def get_default_net(inputs, seed=None): from tensorflow.contrib.layers import variance_scaling_initializer as HeNormal fc_layer_1 = tf.layers.dense( inputs, units=50, activation=tf.tanh, kernel_initializer=HeNormal(factor=1.0, dtype=tf.float64, seed=seed), bias_initializer=tf.zeros_initializer(dtype=tf.float64), name="fc_layer_1" ) fc_layer_2 = tf.layers.dense( fc_layer_1, units=50, activation=tf.tanh, kernel_initializer=HeNormal(factor=1.0, dtype=tf.float64, seed=seed), bias_initializer=tf.zeros_initializer(dtype=tf.float64), name="fc_layer_2" ) fc_layer_3 = tf.layers.dense( fc_layer_2, units=50, activation=tf.tanh, kernel_initializer=HeNormal(factor=1.0, dtype=tf.float64, seed=seed), bias_initializer=tf.zeros_initializer(dtype=tf.float64), name="fc_layer_3" ) layer_4 = tf.layers.dense( fc_layer_3, units=1, activation=None, # linear activation kernel_initializer=HeNormal(factor=1.0, dtype=tf.float64, seed=seed), bias_initializer=tf.zeros_initializer(dtype=tf.float64), name="fc_layer_4" ) output_bias = tf.Variable( [[np.log(1e-3)]], dtype=tf.float64, name="output_bias" ) output = tf.concat( [layer_4, tf.ones_like(layer_4, dtype=tf.float64) * output_bias], axis=1, name="Network_Output" ) return output def generate_batches(X, y, x_placeholder, y_placeholder, batch_size=20, seed=None): """ Infinite generator of random minibatches for a dataset. For general reference on (infinite) generators, see: https://www.python.org/dev/peps/pep-0255/ Parameters ---------- X: np.ndarray (N, D) Training data points/features y : np.ndarray (N, 1) Training data labels x_placeholder : tensorflow.Placeholder Placeholder for batches of data from `X`. y_placeholder : tensorflow.Placeholder Placeholder for batches of data from `y`. batch_size : int, optional Number of datapoints to put into a batch. seed: int, optional Random seed to use during batch generation. Defaults to `None`. Yields ------- batch_dict: dict A dictionary that maps `x_placeholder` and `y_placeholder` to `batch_size` sized minibatches of data (numpy.ndarrays) from the dataset `X`, `y`. Examples ------- Simple batch extraction example: >>> import numpy as np >>> import tensorflow as tf >>> N, D = 100, 3 # 100 datapoints with 3 features each >>> X = np.asarray([np.random.uniform(-10, 10, D) for _ in range(N)]) >>> y = np.asarray([np.random.choice([0., 1.]) for _ in range(N)]) >>> X.shape, y.shape ((100, 3), (100,)) >>> x_placeholder, y_placeholder = tf.placeholder(dtype=tf.float64), tf.placeholder(dtype=tf.float64) >>> batch_size = 20 >>> gen = generate_batches(X, y, x_placeholder, y_placeholder, batch_size) >>> batch_dict = next(gen) # extract a batch >>> set(batch_dict.keys()) == set((x_placeholder, y_placeholder)) True >>> batch_dict[x_placeholder].shape, batch_dict[y_placeholder].shape ((20, 3), (20, 1)) Batch extraction resizes batch size if dataset is too small: >>> import numpy as np >>> import tensorflow as tf >>> N, D = 10, 3 # 10 datapoints with 3 features each >>> X = np.asarray([np.random.uniform(-10, 10, D) for _ in range(N)]) >>> y = np.asarray([np.random.choice([0., 1.]) for _ in range(N)]) >>> X.shape, y.shape ((10, 3), (10,)) >>> x_placeholder, y_placeholder = tf.placeholder(dtype=tf.float64), tf.placeholder(dtype=tf.float64) >>> batch_size = 20 >>> gen = generate_batches(X, y, x_placeholder, y_placeholder, batch_size) >>> batch_dict = next(gen) # extract a batch >>> set(batch_dict.keys()) == set((x_placeholder, y_placeholder)) True >>> batch_dict[x_placeholder].shape, batch_dict[y_placeholder].shape ((10, 3), (10, 1)) In this case, the batches contain exactly all datapoints: >>> np.allclose(batch_dict[x_placeholder], X), np.allclose(batch_dict[y_placeholder].reshape(N,), y) (True, True) """ # Sanitize inputs assert(isinstance(batch_size, int)), "generate_batches: batch size must be an integer." assert(batch_size > 0), "generate_batches: batch size must be greater than zero." assert(seed is None or isinstance(seed, int)), "generate_batches: seed must be an integer or `None`" assert(y.shape[0] == X.shape[0]), "Not exactly one label per datapoint!" n_examples = X.shape[0] if seed is not None: np.random.seed(seed) # Check if we have enough data points to form a minibatch # otherwise set the batchsize equal to the number of input points initial_batch_size = batch_size # print(batch_size) batch_size = min(initial_batch_size, n_examples) # print(batch_size) if initial_batch_size != batch_size: logging.error("Not enough datapoints to form a minibatch. " "Batchsize was set to {}".format(batch_size)) while True: # `np.random.randint` is end-exclusive => for n_examples == batch_size, start == 0 holds start = np.random.randint(0, (n_examples - batch_size + 1)) minibatch_x = X[start:start + batch_size] minibatch_y = y[start:start + batch_size, None] feed_dict = { x_placeholder: minibatch_x, y_placeholder: minibatch_y.reshape(-1, 1) } yield feed_dict # }}} Imports #
[docs]class BayesianNeuralNetwork(object):
[docs] def __init__(self, sampling_method=SamplingMethod.SGHMC, n_nets=100, learning_rate=1e-3, mdecay=5e-2, n_iters=50000, batch_size=20, burn_in_steps=1000, sample_steps=100, normalize_input=True, normalize_output=True, seed=None, get_net=get_default_net, session=None): """ Bayesian Neural Networks use Bayesian methods to estimate the posterior distribution of a neural network's weights. This allows to also predict uncertainties for test points and thus makes Bayesian Neural Networks suitable for Bayesian optimization. This module uses stochastic gradient MCMC methods to sample from the posterior distribution. See [1] for more details. [1] J. T. Springenberg, A. Klein, S. Falkner, F. Hutter Bayesian Optimization with Robust Bayesian Neural Networks. In Advances in Neural Information Processing Systems 29 (2016). Parameters ---------- sampling_method : SamplingMethod, optional Method used to sample networks for this BNN. Defaults to `SamplingMethod.SGHMC`. n_nets: int, optional Number of nets to sample during training (and use to predict). Defaults to `100`. learning_rate: float, optional Learning rate to use during sampling. Defaults to `1e-3`. mdecay: float, optional Momentum decay per time-step (parameter for SGHMCSampler). Defaults to `0.05`. n_iters: int, optional Total number of iterations of the sampler to perform. Defaults to `50000` batch_size: int, optional Number of datapoints to include in each minibatch. Defaults to `20` datapoints per minibatch. burn_in_steps: int, optional Number of burn-in steps to perform Defaults to `1000`. sample_steps: int, optional Number of sample steps to perform. Defaults to `100`. normalize_input: bool, optional Specifies whether or not input data should be normalized. Defaults to `True` normalize_output: bool, optional Specifies whether or not outputs should be normalized. Defaults to `True` seed: int, optional Random seed to use in this BNN. Defaults to `None`. get_net: callable, optional Callable that returns a network specification. Expected inputs are a `tensorflow.Placeholder` object that serves as feedable input to the network and an integer random seed. Expected return value is the networks final output. Defaults to `get_default_net`. session: tensorflow.Session, optional A `tensorflow.Session` object used to delegate computations performed in this network over to `tensorflow`. Defaults to `None` which indicates we should start a fresh `tensorflow.Session`. """ # XXX: Raise readable errors for all sanitizations when they fail # Sanitize inputs assert(isinstance(n_nets, int)) assert(isinstance(learning_rate, float)) assert(isinstance(mdecay, float)) assert(isinstance(n_iters, int)) assert(n_iters > 0) assert(isinstance(batch_size, int)) assert(batch_size > 0) assert(isinstance(burn_in_steps, int)) assert(burn_in_steps >= 0) assert(isinstance(sample_steps, int)) assert(sample_steps >= 0) assert(type(normalize_input) == bool) assert(type(normalize_output) == bool) assert(seed is None or isinstance(seed, int)) assert(hasattr(get_net, "__call__")) assert(session is None or isinstance(session, tf.Session)) if not SamplingMethod.is_supported(sampling_method): raise ValueError( "'BayesianNeuralNetwork.__init__' received unsupported input " "for parameter 'sampling_method'. Input was: {input}.\n" "Supported sampling methods are enumerated in " "'SamplingMethod' enum type.".format(input=sampling_method) ) self.sampling_method = sampling_method self.get_net = get_net self.normalize_input = normalize_input self.normalize_output = normalize_output self.n_nets = n_nets self.n_iters = n_iters self.batch_size = batch_size self.learning_rate = learning_rate self.mdecay = mdecay self.burn_in_steps = burn_in_steps self.sample_steps = sample_steps self.samples = deque(maxlen=n_nets) self.seed = seed self.session = session if not self.session: self.session = tf.Session() self.variance_prior = LogVariancePrior(mean=1e-6, var=0.01) self.weight_prior = WeightPrior()
[docs] def negative_log_likelihood(self, X, Y): """ Compute the negative log likelihood of the current network parameters with respect to inputs `X` with labels `Y`. Parameters ---------- X : tensorflow.Placeholder Placeholder for input datapoints. Y : tensorflow.Placeholder Placeholder for input labels. Returns ------- neg_log_like: tensorflow.Tensor Negative log likelihood of the current network parameters with respect to inputs `X` with labels `Y`. mse: tensorflow.Tensor Mean squared error of the current network parameters with respect to inputs `X` with labels `Y`. """ self.net_output = self.get_net(inputs=X, seed=self.seed) f_mean = tf.reshape(self.net_output[:, 0], shape=(-1, 1)) f_log_var = tf.reshape(self.net_output[:, 1], shape=(-1, 1)) f_var_inv = 1. / (tf.exp(f_log_var) + 1e-16) mse = tf.square(Y - f_mean) log_like = tf.reduce_sum( tf.reduce_sum(-mse * (0.5 * f_var_inv) - 0.5 * f_log_var, axis=1) ) # scale by batch size to make this work nicely with the updaters above log_like = log_like / tf.constant(self.batch_size, dtype=tf.float64) # scale the priors by the dataset size for the same reason n_examples = tf.constant(self.X.shape[0], tf.float64, name="n_examples") # prior for the variance log_like += self.variance_prior.log_like(f_log_var) / n_examples # prior for the weights log_like += (self.weight_prior.log_like(tf.trainable_variables()) / n_examples) return -log_like, tf.reduce_mean(mse)
@BaseModel._check_shapes_predict def train(self, X, y, *args, **kwargs): """ Train our Bayesian Neural Network using input datapoints `X` with corresponding labels `y`. Parameters ---------- X : numpy.ndarray (N, D) Input training datapoints. y : numpy.ndarray (N,) Input training labels. """ # XXX: Some changes are necessary to allow multiple successive calls # to train: Proposal = clear the whole preexisting graph? # Advantages over only setting up once is that we can use the same object # on different function successively which is a lot more flexible # XXX: We might also want to move session construction here then start_time = time() self.is_trained = False self.X, self.y = X, y if self.normalize_input: self.X, self.x_mean, self.x_std = zero_mean_unit_var_normalization(self.X) if self.normalize_output: self.y, self.y_mean, self.y_std = zero_mean_unit_var_normalization(self.y) n_datapoints, n_inputs = X.shape # set up placeholders for data minibatches self.X_Minibatch = tf.placeholder(shape=(None, n_inputs), dtype=tf.float64, name="X_Minibatch") self.Y_Minibatch = tf.placeholder(dtype=tf.float64, name="Y_Minibatch") # set up tensors for negative log likelihood and mean squared error Nll, Mse = self.negative_log_likelihood( X=self.X_Minibatch, Y=self.Y_Minibatch ) self.network_params = tf.trainable_variables() # remove any leftover samples from previous "train" calls self.samples.clear() self.sampler = SamplingMethod.get_sampler( self.sampling_method, batch_generator=generate_batches( X=self.X, x_placeholder=self.X_Minibatch, y=self.y, y_placeholder=self.Y_Minibatch, batch_size=self.batch_size, seed=self.seed ), seed=self.seed, cost_fun=lambda *_: Nll, # BNN costs do not need params as input params=self.network_params, epsilon=self.learning_rate, mdecay=self.mdecay, scale_grad=n_datapoints, session=self.session, burn_in_steps=self.burn_in_steps ) self.session.run(tf.global_variables_initializer()) logging.info("Starting sampling") def log_full_training_error(is_sampling: bool): """ Compute the error of our last sampled network parameters on the full training dataset and use `logging.info` to log it. The boolean flag `sampling` is used to determine whether we are already collecting sampled networks, in which case additional info is logged using `logging.info`. Parameters ---------- is_sampling : bool Boolean flag that specifies if we are already collecting samples or if we are still doing burn-in steps. If set to `True` we will also log the total number of samples collected thus far. """ total_nll, total_mse = self.session.run( [Nll, Mse], feed_dict={ self.X_Minibatch: self.X, self.Y_Minibatch: self.y.reshape(-1, 1) } ) t = time() - start_time if is_sampling: logging.info("Iter {:8d} : NLL = {:.4e} MSE = {:.4e} " "Time = {:5.2f}".format(i, float(total_nll), float(total_mse), t)) else: logging.info("Iter {:8d} : NLL = {:.4e} MSE = {:.4e} " "Samples = {} Time = {:5.2f}".format( i, float(total_nll), float(total_mse), len(self.samples), t)) logging_intervals = {"burn-in": 512, "sampling": self.sample_steps} for i in range(self.n_iters): # boolean that tracks whether our sampler is doing burn-in steps burning_in = i <= self.burn_in_steps _, nll_value = next(self.sampler) if burning_in and i % logging_intervals["burn-in"] == 0: log_full_training_error(is_sampling=False) if not burning_in and i % logging_intervals["sampling"] == 0: log_full_training_error(is_sampling=True) # collect sample param_values = self.session.run(self.network_params) self.samples.append(param_values) if len(self.samples) == self.n_nets: # collected enough sample networks, stop iterating break self.is_trained = True
[docs] def compute_network_output(self, params, input_data): """ Compute and return the output of the network when parameterized with `params` on `input_data`. Parameters ---------- params : list of ndarray objects List of parameter values (ndarray) for each tensorflow.Variable parameter of our network. input_data : ndarray (N, D) Input points to compute the network output for. Returns ------- network_output: ndarray (N,) Output of the network parameterized with `params` on the given `input_data` points. """ feed_dict = dict(zip(self.network_params, params)) feed_dict[self.X_Minibatch] = input_data return self.session.run(self.net_output, feed_dict=feed_dict)
@BaseModel._check_shapes_predict def predict(self, X_test, return_individual_predictions=False, *args, **kwargs): """ Returns the predictive mean and variance of the objective function at the given test points. Parameters ---------- X_test: np.ndarray (N, D) Input test datapoints. return_individual_predictions: bool If set to `True` than the individual predictions of all samples are returned. Returns ---------- mean: np.array(N,) predictive mean variance: np.array(N,) predictive variance """ if not self.is_trained: logging.error("Model is not trained!") return # Normalize input if self.normalize_input: X_, _, _ = zero_mean_unit_var_normalization( X_test, self.x_mean, self.x_std ) else: X_ = X_test f_out = [] theta_noise = [] for sample in self.samples: out = self.compute_network_output(params=sample, input_data=X_) f_out.append(out[:, 0]) theta_noise.append(np.exp(out[:, 1])) f_out = np.asarray(f_out) theta_noise = np.asarray(theta_noise) if return_individual_predictions: if self.normalize_output: f_out = zero_mean_unit_var_unnormalization( f_out, self.y_mean, self.y_std ) theta_noise *= self.y_std**2 return f_out, theta_noise m = np.mean(f_out, axis=0) # Total variance # v = np.mean(f_out ** 2 + theta_noise, axis=0) - m ** 2 v = np.mean((f_out - m) ** 2, axis=0) if self.normalize_output: m = zero_mean_unit_var_unnormalization(m, self.y_mean, self.y_std) v *= self.y_std ** 2 return m, v