# Mod for pylearn2/models/mlp.py: makes tied biases (tied_b=True) the default
# for ConvRectifiedLinear layers.
"""
Multilayer Perceptron
"""
__authors__ = "Ian Goodfellow"
__copyright__ = "Copyright 2012-2013, Universite de Montreal"
__credits__ = ["Ian Goodfellow", "David Warde-Farley"]
__license__ = "3-clause BSD"
__maintainer__ = "Ian Goodfellow"
import math
import sys
import warnings
import numpy as np
from theano import config
from theano.compat.python2x import OrderedDict
from theano.gof.op import get_debug_values
from theano.printing import Print
from theano.sandbox.rng_mrg import MRG_RandomStreams
import theano.tensor as T
from pylearn2.costs.mlp import Default
from pylearn2.expr.probabilistic_max_pooling import max_pool_channels
from pylearn2.linear import conv2d
from pylearn2.linear.matrixmul import MatrixMul
from pylearn2.models.model import Model
from pylearn2.monitor import get_monitor_doc
from pylearn2.expr.nnet import pseudoinverse_softmax_numpy
from pylearn2.space import CompositeSpace
from pylearn2.space import Conv2DSpace
from pylearn2.space import Space
from pylearn2.space import VectorSpace
from pylearn2.utils import function
from pylearn2.utils import py_integer_types
from pylearn2.utils import safe_union
from pylearn2.utils import safe_zip
from pylearn2.utils import sharedX
from pylearn2.utils import wraps
warnings.warn("MLP changing the recursion limit.")
# We need this to be high enough that the big theano graphs we make
# when doing max pooling via subtensors don't cause python to complain.
# python intentionally declares stack overflow well before the stack
# segment is actually exceeded. But we can't make this value too big
# either, or we'll get seg faults when the python interpreter really
# does go over the stack segment.
# IG encountered seg faults on eos3 (a machine at LISA labo) when using
# 50000 so for now it is set to 40000.
# I think the actual safe recursion limit can't be predicted in advance
# because you don't know how big of a stack frame each function will
# make, so there is not really a "correct" way to do this. Really the
# python interpreter should provide an option to raise the error
# precisely when you're going to exceed the stack segment.
sys.setrecursionlimit(40000)
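# BadInputSpaceError is raised/caught below (in MLP._update_layer_input_spaces
# and ConvRectifiedLinear.set_input_space) but is not defined in this excerpt.
# A minimal definition, assumed to match pylearn2's own, is added here so that
# those references resolve:
class BadInputSpaceError(TypeError):
    """Raised when a layer is given an input space it does not support."""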
class Layer(Model):
"""
Abstract class. A Layer of an MLP.
May only belong to one MLP.
Notes
-----
This is not currently a Block because as far as I know the Block interface
assumes every input is a single matrix. It doesn't support using Spaces to
work with composite inputs, stacked multichannel image inputs, etc. If the
Block interface were upgraded to be that flexible, then we could make this
a block.
"""
# When applying dropout to a layer's input, use this for masked values.
# Usually this will be 0, but certain kinds of layers may want to override
# this behaviour.
dropout_input_mask_value = 0.
def get_mlp(self):
"""
Returns the MLP that this layer belongs to, or None
if it has not been assigned to an MLP yet.
"""
if hasattr(self, 'mlp'):
return self.mlp
return None
def set_mlp(self, mlp):
"""
Assigns this layer to an MLP.
Parameters
----------
mlp : WRITEME
"""
assert self.get_mlp() is None
self.mlp = mlp
def get_monitoring_channels(self):
"""
.. todo::
WRITEME
"""
return OrderedDict()
def get_monitoring_channels_from_state(self, state, target=None):
"""
.. todo::
WRITEME
"""
return OrderedDict()
def fprop(self, state_below):
"""
Does the forward prop transformation for this layer.
state_below is a minibatch of states for the previous layer.
Parameters
----------
state_below : WRITEME
"""
raise NotImplementedError(str(type(self))+" does not implement fprop.")
def cost(self, Y, Y_hat):
"""
The cost of outputting Y_hat when the true output is Y. Y_hat is
assumed to be the output of the same layer's fprop, and the
implementation may do things like look at the ancestors of Y_hat in the
theano graph. This is useful for, e.g., computing numerically stable
log probabilities as the cost when Y_hat is the probability.
Parameters
----------
Y : WRITEME
Y_hat : WRITEME
Returns
-------
WRITEME
"""
raise NotImplementedError(str(type(self)) +
" does not implement mlp.Layer.cost.")
def cost_from_cost_matrix(self, cost_matrix):
"""
The final scalar cost computed from the cost matrix.
Parameters
----------
cost_matrix : WRITEME
Examples
--------
>>> # C = model.cost_matrix(Y, Y_hat)
>>> # Do something with C like setting some values to 0
>>> # cost = model.cost_from_cost_matrix(C)
"""
raise NotImplementedError(str(type(self)) +
" does not implement "
"mlp.Layer.cost_from_cost_matrix.")
def cost_matrix(self, Y, Y_hat):
"""
The element wise cost of outputting Y_hat when the true output is Y.
Parameters
----------
Y : WRITEME
Y_hat : WRITEME
Returns
-------
WRITEME
"""
raise NotImplementedError(str(type(self)) +
" does not implement mlp.Layer.cost_matrix")
def get_weights(self):
"""
.. todo::
WRITEME
"""
raise NotImplementedError
def set_weights(self, weights):
"""
.. todo::
WRITEME
"""
raise NotImplementedError
def get_biases(self):
"""
.. todo::
WRITEME
"""
raise NotImplementedError
def set_biases(self, biases):
"""
.. todo::
WRITEME
"""
raise NotImplementedError
def get_weights_format(self):
"""
.. todo::
WRITEME
"""
raise NotImplementedError
def get_weight_decay(self, coeff):
"""
.. todo::
WRITEME
"""
raise NotImplementedError
def get_l1_weight_decay(self, coeff):
"""
.. todo::
WRITEME
"""
raise NotImplementedError
def set_input_space(self, space):
"""
.. todo::
WRITEME
Notes
-----
This usually resets parameters.
"""
raise NotImplementedError
class MLP(Layer):
"""
A multilayer perceptron.
Note that it's possible for an entire MLP to be a single layer of a larger
MLP.
"""
def __init__(self, layers, batch_size=None, input_space=None,
nvis=None, seed=None):
"""
Parameters
----------
layers : list
A list of Layer objects. The final layer specifies the output space
of this MLP.
batch_size : int, optional
If specified, must be a positive integer. Mostly useful if
one of your layers involves a Theano op like convolution that
requires a hard-coded batch size.
input_space : Space object, optional
A Space specifying the kind of input the MLP accepts. If None,
the input space is specified by nvis.
nvis : int, optional
Number of "visible units" (input units). Equivalent to specifying
`input_space=VectorSpace(dim=nvis)`.
seed : int or list of int, optional
Seed for the MLP's numpy random number generator. Defaults to
[2013, 1, 4].
"""
super(MLP, self).__init__()
if seed is None:
seed = [2013, 1, 4]
self.seed = seed
self.setup_rng()
assert isinstance(layers, list)
assert all(isinstance(layer, Layer) for layer in layers)
assert len(layers) >= 1
self.layer_names = set()
for layer in layers:
assert layer.get_mlp() is None
if layer.layer_name in self.layer_names:
raise ValueError("MLP.__init__ given two or more layers "
"with same name: " + layer.layer_name)
layer.set_mlp(self)
self.layer_names.add(layer.layer_name)
self.layers = layers
self.batch_size = batch_size
self.force_batch_size = batch_size
assert input_space is not None or nvis is not None
if nvis is not None:
input_space = VectorSpace(nvis)
self.input_space = input_space
self._update_layer_input_spaces()
self.freeze_set = set([])
def f(x):
if x is None:
return None
return 1. / x
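# Illustrative sketch (not part of the original module): wiring up a small MLP
# with a Sigmoid hidden layer and a Softmax output layer. The layer names,
# sizes, and input dimension below are hypothetical.
#
#     mlp = MLP(nvis=784,
#               layers=[Sigmoid(layer_name='h0', dim=500, irange=0.05),
#                       Softmax(layer_name='y', n_classes=10, irange=0.05)])
#     X = T.matrix('X')
#     Y_hat = mlp.fprop(X)  # symbolic class probabilities, shape (batch, 10)
#
# The last layer determines the MLP's output space; each layer's input space
# is set from the previous layer's output space by _update_layer_input_spaces.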
def setup_rng(self):
"""
.. todo::
WRITEME
"""
self.rng = np.random.RandomState(self.seed)
@wraps(Layer.get_default_cost)
def get_default_cost(self):
return Default()
@wraps(Layer.get_output_space)
def get_output_space(self):
return self.layers[-1].get_output_space()
def _update_layer_input_spaces(self):
"""
Tells each layer what its input space should be.
Notes
-----
This usually resets the layer's parameters!
"""
layers = self.layers
try:
layers[0].set_input_space(self.input_space)
except BadInputSpaceError, e:
raise TypeError("Layer 0 (" + str(layers[0]) + " of type " +
str(type(layers[0])) +
") does not support the MLP's "
+ "specified input space (" +
str(self.input_space) +
" of type " + str(type(self.input_space)) +
"). Original exception: " + str(e))
for i in xrange(1, len(layers)):
layers[i].set_input_space(layers[i-1].get_output_space())
def add_layers(self, layers):
"""
Add new layers on top of the existing hidden layers
Parameters
----------
layers : WRITEME
"""
existing_layers = self.layers
assert len(existing_layers) > 0
for layer in layers:
assert layer.get_mlp() is None
layer.set_mlp(self)
layer.set_input_space(existing_layers[-1].get_output_space())
existing_layers.append(layer)
assert layer.layer_name not in self.layer_names
self.layer_names.add(layer.layer_name)
def freeze(self, parameter_set):
"""
.. todo::
WRITEME
"""
self.freeze_set = self.freeze_set.union(parameter_set)
@wraps(Layer.get_monitoring_channels)
def get_monitoring_channels(self, data):
X, Y = data
state = X
rval = OrderedDict()
for layer in self.layers:
ch = layer.get_monitoring_channels()
for key in ch:
value = ch[key]
doc = get_monitor_doc(value)
if doc is None:
doc = str(type(layer)) + ".get_monitoring_channels did" + \
" not provide any further documentation for" + \
" this channel."
doc = 'This channel came from a layer called "' + \
layer.layer_name + '" of an MLP.\n' + doc
value.__doc__ = doc
rval[layer.layer_name+'_'+key] = value
state = layer.fprop(state)
args = [state]
if layer is self.layers[-1]:
args.append(Y)
ch = layer.get_monitoring_channels_from_state(*args)
if not isinstance(ch, OrderedDict):
raise TypeError(str((type(ch), layer.layer_name)))
for key in ch:
value = ch[key]
doc = get_monitor_doc(value)
if doc is None:
doc = str(type(layer)) + \
".get_monitoring_channels_from_state did" + \
" not provide any further documentation for" + \
" this channel."
doc = 'This channel came from a layer called "' + \
layer.layer_name + '" of an MLP.\n' + doc
value.__doc__ = doc
rval[layer.layer_name+'_'+key] = value
return rval
@wraps(Layer.get_monitoring_data_specs)
def get_monitoring_data_specs(self):
"""
Notes
-----
In this case, we want the inputs and targets.
"""
space = CompositeSpace((self.get_input_space(),
self.get_output_space()))
source = (self.get_input_source(), self.get_target_source())
return (space, source)
@wraps(Layer.get_params)
def get_params(self):
rval = []
for layer in self.layers:
for param in layer.get_params():
if param.name is None:
print type(layer)
layer_params = layer.get_params()
assert not isinstance(layer_params, set)
for param in layer_params:
if param not in rval:
rval.append(param)
rval = [elem for elem in rval if elem not in self.freeze_set]
assert all([elem.name is not None for elem in rval])
return rval
@wraps(Model.set_batch_size)
def set_batch_size(self, batch_size):
self.batch_size = batch_size
self.force_batch_size = batch_size
for layer in self.layers:
layer.set_batch_size(batch_size)
@wraps(Layer.censor_updates)
def censor_updates(self, updates):
for layer in self.layers:
layer.censor_updates(updates)
@wraps(Layer.get_lr_scalers)
def get_lr_scalers(self):
rval = OrderedDict()
params = self.get_params()
for layer in self.layers:
contrib = layer.get_lr_scalers()
assert isinstance(contrib, OrderedDict)
# No two layers can contend to scale a parameter
assert not any([key in rval for key in contrib])
# Don't try to scale anything that's not a parameter
assert all([key in params for key in contrib])
rval.update(contrib)
assert all([isinstance(val, float) for val in rval.values()])
return rval
@wraps(Layer.get_weights)
def get_weights(self):
return self.layers[0].get_weights()
@wraps(Layer.get_weights_view_shape)
def get_weights_view_shape(self):
return self.layers[0].get_weights_view_shape()
@wraps(Layer.get_weights_format)
def get_weights_format(self):
return self.layers[0].get_weights_format()
@wraps(Layer.get_weights_topo)
def get_weights_topo(self):
return self.layers[0].get_weights_topo()
def dropout_fprop(self, state_below, default_input_include_prob=0.5,
input_include_probs=None, default_input_scale=2.,
input_scales=None, per_example=True):
"""
Returns the output of the MLP, when applying dropout to the input and
intermediate layers. Each input to each layer is randomly included or
excluded for each example. The probability of inclusion is independent
for each input and each example. Each layer uses
`default_input_include_prob` unless that layer's name appears as a key
in input_include_probs, in which case the input inclusion probability
is given by the corresponding value.
Each feature is also multiplied by a scale factor. The scale factor for
each layer's input scale is determined by the same scheme as the input
probabilities.
Parameters
----------
state_below : WRITEME
The input to the MLP
default_input_include_prob : WRITEME
input_include_probs : WRITEME
default_input_scale : WRITEME
input_scales : WRITEME
per_example : bool, optional
Sample a different mask value for every example in a batch.
Defaults to `True`. If `False`, sample one mask per mini-batch.
"""
warnings.warn("dropout doesn't use fixed_var_descr so it won't work "
"with algorithms that make more than one theano "
"function call per batch, such as BGD. Implementing "
"fixed_var descr could increase the memory usage "
"though.")
if input_include_probs is None:
input_include_probs = {}
if input_scales is None:
input_scales = {}
self._validate_layer_names(list(input_include_probs.keys()))
self._validate_layer_names(list(input_scales.keys()))
theano_rng = MRG_RandomStreams(max(self.rng.randint(2 ** 15), 1))
for layer in self.layers:
layer_name = layer.layer_name
if layer_name in input_include_probs:
include_prob = input_include_probs[layer_name]
else:
include_prob = default_input_include_prob
if layer_name in input_scales:
scale = input_scales[layer_name]
else:
scale = default_input_scale
state_below = self.apply_dropout(
state=state_below,
include_prob=include_prob,
theano_rng=theano_rng,
scale=scale,
mask_value=layer.dropout_input_mask_value,
input_space=layer.get_input_space(),
per_example=per_example
)
state_below = layer.fprop(state_below)
return state_below
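# Illustrative sketch (not part of the original module): calling dropout_fprop
# at training time with a larger keep probability for the first hidden layer.
# The layer name 'h0' is hypothetical.
#
#     Y_hat_dropped = mlp.dropout_fprop(
#         X,
#         default_input_include_prob=0.5,
#         default_input_scale=2.,
#         input_include_probs={'h0': 0.8},
#         input_scales={'h0': 1. / 0.8})
#
# The scale is conventionally 1 / include_prob so that the expected value of
# each masked input matches its value without dropout (the defaults 0.5 and
# 2. follow this convention).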
def masked_fprop(self, state_below, mask, masked_input_layers=None,
default_input_scale=2., input_scales=None):
"""
Forward propagate through the network with a dropout mask
determined by an integer (the binary representation of
which is used to generate the mask).
Parameters
----------
state_below : tensor_like
The (symbolic) output state of the layer below.
mask : int
An integer indexing possible binary masks. It should be
< 2 ** get_total_input_dimension(masked_input_layers)
and greater than or equal to 0.
masked_input_layers : list, optional
A list of layer names to mask. If `None`, the input to all layers
(including the first hidden layer) is masked.
default_input_scale : float, optional
The amount to scale inputs in masked layers that do not appear in
`input_scales`. Defaults to 2.
input_scales : dict, optional
A dictionary mapping layer names to floating point numbers
indicating how much to scale input to a given layer.
Returns
-------
masked_output : tensor_like
The output of the forward propagation of the masked network.
"""
if input_scales is not None:
self._validate_layer_names(input_scales)
else:
input_scales = {}
if masked_input_layers is not None:
self._validate_layer_names(masked_input_layers)
else:
masked_input_layers = self.layer_names
if any(n not in masked_input_layers for n in input_scales):
layers = [n for n in input_scales if n not in masked_input_layers]
raise ValueError("input scales provided for layers not masked: %s" %
", ".join(layers))
num_inputs = self.get_total_input_dimension(masked_input_layers)
assert mask >= 0, "Mask must be a non-negative integer."
if mask > 0 and math.log(mask, 2) >= num_inputs:
raise ValueError("mask value of %d too large; only %d "
"inputs to layers (%s)" %
(mask, num_inputs,
", ".join(masked_input_layers)))
def binary_string(x, length, dtype):
"""
Create the binary representation of an integer `x`, padded to
`length`, with dtype `dtype`.
Parameters
----------
x : int
The integer to convert.
length : int
The number of bits in the representation.
dtype : str or numpy dtype
The dtype of the returned array.
Returns
-------
A 1-D numpy array of length `length` containing the bits of `x`,
least significant bit first.
"""
s = np.empty(length, dtype=dtype)
for i in range(length - 1, -1, -1):
if x // (2 ** i) == 1:
s[i] = 1
else:
s[i] = 0
x = x % (2 ** i)
return s
remaining_mask = mask
for layer in self.layers:
if layer.layer_name in masked_input_layers:
scale = input_scales.get(layer.layer_name,
default_input_scale)
n_inputs = layer.get_input_space().get_total_dimension()
layer_dropout_mask = remaining_mask & (2 ** n_inputs - 1)
remaining_mask >>= n_inputs
mask = binary_string(layer_dropout_mask, n_inputs,
'uint8')
shape = layer.get_input_space().get_origin_batch(1).shape
s_mask = T.as_tensor_variable(mask).reshape(shape)
if layer.dropout_input_mask_value == 0:
state_below = state_below * s_mask * scale
else:
state_below = T.switch(s_mask, state_below * scale,
layer.dropout_input_mask_value)
state_below = layer.fprop(state_below)
return state_below
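# Worked example of the mask encoding consumed above (illustrative, not part
# of the original module): suppose two masked layers whose input spaces have
# 3 and 2 total dimensions, so num_inputs = 5 and mask must lie in
# [0, 2 ** 5). For mask = 0b10110 = 22, the first masked layer takes the low
# 3 bits (0b110), the second takes the remaining bits (0b10), and each chunk
# is expanded by binary_string into a 0/1 vector (least significant bit
# first) that multiplies that layer's input.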
def _validate_layer_names(self, layers):
"""
.. todo::
WRITEME
"""
if any(layer not in self.layer_names for layer in layers):
unknown_names = [layer for layer in layers
if layer not in self.layer_names]
raise ValueError("MLP has no layer(s) named %s" %
", ".join(unknown_names))
def get_total_input_dimension(self, layers):
"""
Get the total number of inputs to the layers whose
names are listed in `layers`. Used for computing the
total number of dropout masks.
Parameters
----------
layers : WRITEME
Returns
-------
WRITEME
"""
self._validate_layer_names(layers)
total = 0
for layer in self.layers:
if layer.layer_name in layers:
total += layer.get_input_space().get_total_dimension()
return total
@wraps(Layer.fprop)
def fprop(self, state_below, return_all=False):
rval = self.layers[0].fprop(state_below)
rlist = [rval]
for layer in self.layers[1:]:
rval = layer.fprop(rval)
rlist.append(rval)
if return_all:
return rlist
return rval
def apply_dropout(self, state, include_prob, scale, theano_rng,
input_space, mask_value=0, per_example=True):
"""
WRITEME
Parameters
----------
state: WRITEME
include_prob : WRITEME
scale : WRITEME
theano_rng : WRITEME
input_space : WRITEME
mask_value : WRITEME
per_example : bool, optional
Sample a different mask value for every example in a batch.
Defaults to `True`. If `False`, sample one mask per mini-batch.
"""
if include_prob in [None, 1.0, 1]:
return state
assert scale is not None
if isinstance(state, tuple):
return tuple(self.apply_dropout(substate, include_prob, scale,
theano_rng, component,
mask_value=mask_value,
per_example=per_example)
for substate, component in
safe_zip(state, input_space.components))
# TODO: all of this assumes that if it's not a tuple, it's
# a dense tensor. It hasn't been tested with sparse types.
# A method to format the mask (or any other values) as
# the given symbolic type should be added to the Spaces
# interface.
if per_example:
mask = theano_rng.binomial(p=include_prob, size=state.shape,
dtype=state.dtype)
else:
batch = input_space.get_origin_batch(1)
mask = theano_rng.binomial(p=include_prob, size=batch.shape,
dtype=state.dtype)
rebroadcast = T.Rebroadcast(*zip(xrange(batch.ndim),
[s == 1 for s in batch.shape]))
mask = rebroadcast(mask)
if mask_value == 0:
return state * mask * scale
else:
return T.switch(mask, state * scale, mask_value)
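# Numeric sketch of apply_dropout for the default mask_value == 0
# (illustrative): with include_prob = 0.5 and scale = 2., each unit is kept
# with probability 0.5 and multiplied by 2 when kept, so the expected value
# of mask * scale * state equals state. With per_example=False a single mask
# with batch dimension 1 is sampled and rebroadcast across the whole
# minibatch.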
@wraps(Layer.cost)
def cost(self, Y, Y_hat):
return self.layers[-1].cost(Y, Y_hat)
@wraps(Layer.cost_matrix)
def cost_matrix(self, Y, Y_hat):
return self.layers[-1].cost_matrix(Y, Y_hat)
@wraps(Layer.cost_from_cost_matrix)
def cost_from_cost_matrix(self, cost_matrix):
return self.layers[-1].cost_from_cost_matrix(cost_matrix)
def cost_from_X(self, data):
"""
Computes self.cost, but takes data=(X, Y) rather than Y_hat as an
argument.
This is just a wrapper around self.cost that computes Y_hat by
calling Y_hat = self.fprop(X)
Parameters
----------
data : WRITEME
"""
self.cost_from_X_data_specs()[0].validate(data)
X, Y = data
Y_hat = self.fprop(X)
return self.cost(Y, Y_hat)
def cost_from_X_data_specs(self):
"""
Returns the data specs needed by cost_from_X.
This is useful if cost_from_X is used in a MethodCost.
"""
space = CompositeSpace((self.get_input_space(),
self.get_output_space()))
source = (self.get_input_source(), self.get_target_source())
return (space, source)
def __str__(self):
"""
Summarizes the MLP by printing the size and format of the input to all
layers. Feel free to add reasonably concise info as needed.
"""
rval = []
for layer in self.layers:
rval.append(layer.layer_name)
input_space = layer.get_input_space()
rval.append('\tInput space: ' + str(input_space))
rval.append('\tTotal input dimension: ' +
str(input_space.get_total_dimension()))
rval = '\n'.join(rval)
return rval
class Softmax(Layer):
"""
.. todo::
WRITEME
"""
def __init__(self, n_classes, layer_name, irange=None,
istdev=None,
sparse_init=None, W_lr_scale=None,
b_lr_scale=None, max_row_norm=None,
no_affine=False,
max_col_norm=None, init_bias_target_marginals=None):
"""
.. todo::
WRITEME
"""
super(Softmax, self).__init__()
if isinstance(W_lr_scale, str):
W_lr_scale = float(W_lr_scale)
self.__dict__.update(locals())
del self.self
del self.init_bias_target_marginals
assert isinstance(n_classes, py_integer_types)
self.output_space = VectorSpace(n_classes)
if not no_affine:
self.b = sharedX(np.zeros((n_classes,)), name='softmax_b')
if init_bias_target_marginals:
marginals = init_bias_target_marginals.y.mean(axis=0)
assert marginals.ndim == 1
b = pseudoinverse_softmax_numpy(marginals).astype(self.b.dtype)
assert b.ndim == 1
assert b.dtype == self.b.dtype
self.b.set_value(b)
else:
assert init_bias_target_marginals is None
@wraps(Layer.get_lr_scalers)
def get_lr_scalers(self):
rval = OrderedDict()
if self.W_lr_scale is not None:
assert isinstance(self.W_lr_scale, float)
rval[self.W] = self.W_lr_scale
if not hasattr(self, 'b_lr_scale'):
self.b_lr_scale = None
if self.b_lr_scale is not None:
assert isinstance(self.b_lr_scale, float)
rval[self.b] = self.b_lr_scale
return rval
@wraps(Layer.get_monitoring_channels)
def get_monitoring_channels(self):
if self.no_affine:
return OrderedDict()
W = self.W
assert W.ndim == 2
sq_W = T.sqr(W)
row_norms = T.sqrt(sq_W.sum(axis=1))
col_norms = T.sqrt(sq_W.sum(axis=0))
return OrderedDict([('row_norms_min', row_norms.min()),
('row_norms_mean', row_norms.mean()),
('row_norms_max', row_norms.max()),
('col_norms_min', col_norms.min()),
('col_norms_mean', col_norms.mean()),
('col_norms_max', col_norms.max()), ])
@wraps(Layer.get_monitoring_channels_from_state)
def get_monitoring_channels_from_state(self, state, target=None):
mx = state.max(axis=1)
rval = OrderedDict([('mean_max_class', mx.mean()),
('max_max_class', mx.max()),
('min_max_class', mx.min())])
if target is not None:
y_hat = T.argmax(state, axis=1)
y = T.argmax(target, axis=1)
misclass = T.neq(y, y_hat).mean()
misclass = T.cast(misclass, config.floatX)
rval['misclass'] = misclass
rval['nll'] = self.cost(Y_hat=state, Y=target)
return rval
@wraps(Layer.set_input_space)
def set_input_space(self, space):
self.input_space = space
if not isinstance(space, Space):
raise TypeError("Expected Space, got " +
str(space)+" of type "+str(type(space)))
self.input_dim = space.get_total_dimension()
self.needs_reformat = not isinstance(space, VectorSpace)
if self.no_affine:
desired_dim = self.n_classes
assert self.input_dim == desired_dim
else:
desired_dim = self.input_dim
self.desired_space = VectorSpace(desired_dim)
if not self.needs_reformat:
assert self.desired_space == self.input_space
rng = self.mlp.rng
if self.no_affine:
self._params = []
else:
if self.irange is not None:
assert self.istdev is None
assert self.sparse_init is None
W = rng.uniform(-self.irange,
self.irange,
(self.input_dim, self.n_classes))
elif self.istdev is not None:
assert self.sparse_init is None
W = rng.randn(self.input_dim, self.n_classes) * self.istdev
else:
assert self.sparse_init is not None
W = np.zeros((self.input_dim, self.n_classes))
for i in xrange(self.n_classes):
for j in xrange(self.sparse_init):
idx = rng.randint(0, self.input_dim)
while W[idx, i] != 0.:
idx = rng.randint(0, self.input_dim)
W[idx, i] = rng.randn()
self.W = sharedX(W, 'softmax_W')
self._params = [self.b, self.W]
@wraps(Layer.get_weights_topo)
def get_weights_topo(self):
if not isinstance(self.input_space, Conv2DSpace):
raise NotImplementedError()
desired = self.W.get_value().T
ipt = self.desired_space.format_as(desired, self.input_space)
rval = Conv2DSpace.convert_numpy(ipt,
self.input_space.axes,
('b', 0, 1, 'c'))
return rval
@wraps(Layer.get_weights)
def get_weights(self):
if not isinstance(self.input_space, VectorSpace):
raise NotImplementedError()
return self.W.get_value()
@wraps(Layer.set_weights)
def set_weights(self, weights):
self.W.set_value(weights)
@wraps(Layer.set_biases)
def set_biases(self, biases):
self.b.set_value(biases)
@wraps(Layer.get_biases)
def get_biases(self):
return self.b.get_value()
@wraps(Layer.get_weights_format)
def get_weights_format(self):
return ('v', 'h')
@wraps(Layer.fprop)
def fprop(self, state_below):
self.input_space.validate(state_below)
if self.needs_reformat:
state_below = self.input_space.format_as(state_below,
self.desired_space)
self.desired_space.validate(state_below)
assert state_below.ndim == 2
if not hasattr(self, 'no_affine'):
self.no_affine = False
if self.no_affine:
Z = state_below
else:
assert self.W.ndim == 2
b = self.b
Z = T.dot(state_below, self.W) + b
rval = T.nnet.softmax(Z)
for value in get_debug_values(rval):
if self.mlp.batch_size is not None:
assert value.shape[0] == self.mlp.batch_size
return rval
@wraps(Layer.cost)
def cost(self, Y, Y_hat):
assert hasattr(Y_hat, 'owner')
owner = Y_hat.owner
assert owner is not None
op = owner.op
if isinstance(op, Print):
assert len(owner.inputs) == 1
Y_hat, = owner.inputs
owner = Y_hat.owner
op = owner.op
assert isinstance(op, T.nnet.Softmax)
z, = owner.inputs
assert z.ndim == 2
z = z - z.max(axis=1).dimshuffle(0, 'x')
log_prob = z - T.log(T.exp(z).sum(axis=1).dimshuffle(0, 'x'))
# we use sum and not mean because this is really one variable per row
log_prob_of = (Y * log_prob).sum(axis=1)
assert log_prob_of.ndim == 1
rval = log_prob_of.mean()
return - rval
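# The cost above works on the softmax pre-activation z rather than on Y_hat.
# Restating the two lines of arithmetic as an equation:
#
#     log softmax(z)_i = (z_i - max_j z_j) - log sum_k exp(z_k - max_j z_j)
#
# Subtracting the row-wise max before exponentiating keeps the expression
# numerically stable, which is why the code digs z out of Y_hat's owner
# instead of simply computing T.log(Y_hat).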
@wraps(Layer.cost_matrix)
def cost_matrix(self, Y, Y_hat):
assert hasattr(Y_hat, 'owner')
owner = Y_hat.owner
assert owner is not None
op = owner.op
if isinstance(op, Print):
assert len(owner.inputs) == 1
Y_hat, = owner.inputs
owner = Y_hat.owner
op = owner.op
assert isinstance(op, T.nnet.Softmax)
z, = owner.inputs
assert z.ndim == 2
z = z - z.max(axis=1).dimshuffle(0, 'x')
log_prob = z - T.log(T.exp(z).sum(axis=1).dimshuffle(0, 'x'))
# no reduction here: cost_matrix returns the elementwise costs
log_prob_of = (Y * log_prob)
return -log_prob_of
@wraps(Layer.get_weight_decay)
def get_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
return coeff * T.sqr(self.W).sum()
@wraps(Layer.get_l1_weight_decay)
def get_l1_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
W = self.W
return coeff * abs(W).sum()
@wraps(Layer.censor_updates)
def censor_updates(self, updates):
if self.no_affine:
return
if self.max_row_norm is not None:
W = self.W
if W in updates:
updated_W = updates[W]
row_norms = T.sqrt(T.sum(T.sqr(updated_W), axis=1))
desired_norms = T.clip(row_norms, 0, self.max_row_norm)
scales = desired_norms / (1e-7 + row_norms)
updates[W] = updated_W * scales.dimshuffle(0, 'x')
if self.max_col_norm is not None:
assert self.max_row_norm is None
W = self.W
if W in updates:
updated_W = updates[W]
col_norms = T.sqrt(T.sum(T.sqr(updated_W), axis=0))
desired_norms = T.clip(col_norms, 0, self.max_col_norm)
updates[W] = updated_W * (desired_norms / (1e-7 + col_norms))
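# Illustrative note on the constraint above: each column of the updated W is
# rescaled by min(1, max_col_norm / ||column||), i.e. columns whose norm
# exceeds max_col_norm are projected back onto the ball of radius
# max_col_norm, while columns already inside it are left essentially
# unchanged (up to the 1e-7 guard against division by zero). The row-norm
# branch does the same thing per row.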
class SoftmaxPool(Layer):
"""
A hidden layer that uses the softmax function to do max pooling over groups
of units. When the pooling size is 1, this reduces to a standard sigmoidal
MLP layer.
"""
def __init__(self,
detector_layer_dim,
layer_name,
pool_size=1,
irange=None,
sparse_init=None,
sparse_stdev=1.,
include_prob=1.0,
init_bias=0.,
W_lr_scale=None,
b_lr_scale=None,
mask_weights=None,
max_col_norm=None):
"""
Parameters
----------
detector_layer_dim : WRITEME
layer_name : WRITEME
pool_size : WRITEME
irange : WRITEME
sparse_init : WRITEME
sparse_stdev : WRITEME
include_prob : float
Probability of including a weight element in the set of weights \
initialized to U(-irange, irange). If not included it is \
initialized to 0.
init_bias : WRITEME
W_lr_scale : WRITEME
b_lr_scale : WRITEME
mask_weights : WRITEME
max_col_norm : WRITEME
"""
super(SoftmaxPool, self).__init__()
self.__dict__.update(locals())
del self.self
self.b = sharedX(np.zeros((self.detector_layer_dim,)) + init_bias,
name=(layer_name + '_b'))
@wraps(Layer.get_lr_scalers)
def get_lr_scalers(self):
if not hasattr(self, 'W_lr_scale'):
self.W_lr_scale = None
if not hasattr(self, 'b_lr_scale'):
self.b_lr_scale = None
rval = OrderedDict()
if self.W_lr_scale is not None:
W, = self.transformer.get_params()
rval[W] = self.W_lr_scale
if self.b_lr_scale is not None:
rval[self.b] = self.b_lr_scale
return rval
@wraps(Layer.set_input_space)
def set_input_space(self, space):
self.input_space = space
if isinstance(space, VectorSpace):
self.requires_reformat = False
self.input_dim = space.dim
else:
self.requires_reformat = True
self.input_dim = space.get_total_dimension()
self.desired_space = VectorSpace(self.input_dim)
if not (self.detector_layer_dim % self.pool_size == 0):
raise ValueError("detector_layer_dim = %d, pool_size = %d. "
"Should be divisible but remainder is %d" %
(self.detector_layer_dim,
self.pool_size,
self.detector_layer_dim % self.pool_size))
self.h_space = VectorSpace(self.detector_layer_dim)
self.pool_layer_dim = self.detector_layer_dim / self.pool_size
self.output_space = VectorSpace(self.pool_layer_dim)
rng = self.mlp.rng
if self.irange is not None:
assert self.sparse_init is None
W = rng.uniform(-self.irange,
self.irange,
(self.input_dim, self.detector_layer_dim)) * \
(rng.uniform(0., 1., (self.input_dim, self.detector_layer_dim))
< self.include_prob)
else:
assert self.sparse_init is not None
W = np.zeros((self.input_dim, self.detector_layer_dim))
def mask_rejects(idx, i):
if self.mask_weights is None:
return False
return self.mask_weights[idx, i] == 0.
for i in xrange(self.detector_layer_dim):
assert self.sparse_init <= self.input_dim
for j in xrange(self.sparse_init):
idx = rng.randint(0, self.input_dim)
while W[idx, i] != 0 or mask_rejects(idx, i):
idx = rng.randint(0, self.input_dim)
W[idx, i] = rng.randn()
W *= self.sparse_stdev
W = sharedX(W)
W.name = self.layer_name + '_W'
self.transformer = MatrixMul(W)
W, = self.transformer.get_params()
assert W.name is not None
if self.mask_weights is not None:
expected_shape = (self.input_dim, self.detector_layer_dim)
if expected_shape != self.mask_weights.shape:
raise ValueError("Expected mask with shape " +
str(expected_shape) +
" but got " +
str(self.mask_weights.shape))
self.mask = sharedX(self.mask_weights)
@wraps(Layer.censor_updates)
def censor_updates(self, updates):
# Patch old pickle files
if not hasattr(self, 'mask_weights'):
self.mask_weights = None
if self.mask_weights is not None:
W, = self.transformer.get_params()
if W in updates:
updates[W] = updates[W] * self.mask
if self.max_col_norm is not None:
W, = self.transformer.get_params()
if W in updates:
updated_W = updates[W]
col_norms = T.sqrt(T.sum(T.sqr(updated_W), axis=0))
desired_norms = T.clip(col_norms, 0, self.max_col_norm)
updates[W] = updated_W * (desired_norms / (1e-7 + col_norms))
@wraps(Layer.get_params)
def get_params(self):
assert self.b.name is not None
W, = self.transformer.get_params()
assert W.name is not None
rval = self.transformer.get_params()
assert not isinstance(rval, set)
rval = list(rval)
assert self.b not in rval
rval.append(self.b)
return rval
@wraps(Layer.get_weight_decay)
def get_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
W, = self.transformer.get_params()
return coeff * T.sqr(W).sum()
@wraps(Layer.get_l1_weight_decay)
def get_l1_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
W, = self.transformer.get_params()
return coeff * abs(W).sum()
@wraps(Layer.get_weights)
def get_weights(self):
if self.requires_reformat:
# This is not really an unimplemented case.
# We actually don't know how to format the weights
# in design space. We got the data in topo space
# and we don't have access to the dataset
raise NotImplementedError()
W, = self.transformer.get_params()
return W.get_value()
@wraps(Layer.set_weights)
def set_weights(self, weights):
W, = self.transformer.get_params()
W.set_value(weights)
@wraps(Layer.set_biases)
def set_biases(self, biases):
"""
.. todo::
WRITEME
"""
self.b.set_value(biases)
@wraps(Layer.get_biases)
def get_biases(self):
return self.b.get_value()
@wraps(Layer.get_weights_format)
def get_weights_format(self):
return ('v', 'h')
@wraps(Layer.get_weights_view_shape)
def get_weights_view_shape(self):
total = self.detector_layer_dim
cols = self.pool_size
if cols == 1:
# Let the PatchViewer decide how to arrange the units
# when they're not pooled
raise NotImplementedError()
# When they are pooled, make each pooling unit have one row
rows = total / cols
return rows, cols
@wraps(Layer.get_weights_topo)
def get_weights_topo(self):
if not isinstance(self.input_space, Conv2DSpace):
raise NotImplementedError()
W, = self.transformer.get_params()
W = W.T
W = W.reshape((self.detector_layer_dim,
self.input_space.shape[0],
self.input_space.shape[1],
self.input_space.num_channels))
W = Conv2DSpace.convert(W, self.input_space.axes, ('b', 0, 1, 'c'))
return function([], W)()
@wraps(Layer.get_monitoring_channels)
def get_monitoring_channels(self):
W, = self.transformer.get_params()
assert W.ndim == 2
sq_W = T.sqr(W)
row_norms = T.sqrt(sq_W.sum(axis=1))
col_norms = T.sqrt(sq_W.sum(axis=0))
return OrderedDict([('row_norms_min', row_norms.min()),
('row_norms_mean', row_norms.mean()),
('row_norms_max', row_norms.max()),
('col_norms_min', col_norms.min()),
('col_norms_mean', col_norms.mean()),
('col_norms_max', col_norms.max()), ])
@wraps(Layer.get_monitoring_channels_from_state)
def get_monitoring_channels_from_state(self, state):
P = state
rval = OrderedDict()
if self.pool_size == 1:
vars_and_prefixes = [(P, '')]
else:
vars_and_prefixes = [(P, 'p_')]
for var, prefix in vars_and_prefixes:
v_max = var.max(axis=0)
v_min = var.min(axis=0)
v_mean = var.mean(axis=0)
v_range = v_max - v_min
# max_x.mean_u is "the mean over *u*nits of the max over
# e*x*amples". The x and u are included in the name because
# otherwise it's hard to remember which axis is which when
# reading the monitor. I use inner.outer rather than
# outer_of_inner or something like that because I want mean_x.*
# to appear next to each other in the alphabetical list, as
# these are commonly plotted together.
for key, val in [('max_x.max_u', v_max.max()),
('max_x.mean_u', v_max.mean()),
('max_x.min_u', v_max.min()),
('min_x.max_u', v_min.max()),
('min_x.mean_u', v_min.mean()),
('min_x.min_u', v_min.min()),
('range_x.max_u', v_range.max()),
('range_x.mean_u', v_range.mean()),
('range_x.min_u', v_range.min()),
('mean_x.max_u', v_mean.max()),
('mean_x.mean_u', v_mean.mean()),
('mean_x.min_u', v_mean.min())]:
rval[prefix+key] = val
return rval
@wraps(Layer.fprop)
def fprop(self, state_below):
self.input_space.validate(state_below)
if self.requires_reformat:
state_below = self.input_space.format_as(state_below,
self.desired_space)
z = self.transformer.lmul(state_below) + self.b
if self.layer_name is not None:
z.name = self.layer_name + '_z'
p, h = max_pool_channels(z, self.pool_size)
p.name = self.layer_name + '_p_'
return p
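# Illustrative note (not part of the original module): max_pool_channels
# partitions the detector_layer_dim units into consecutive, non-overlapping
# groups of pool_size channels and pools within each group, so the returned p
# lives in a VectorSpace of dimension detector_layer_dim / pool_size (the
# pool_layer_dim computed in set_input_space). With pool_size == 1 every
# group is a single unit and the layer reduces to an ordinary sigmoidal
# layer, as the class docstring notes.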
class Linear(Layer):
"""
.. todo::
WRITEME
"""
def __init__(self,
dim,
layer_name,
irange=None,
istdev=None,
sparse_init=None,
sparse_stdev=1.,
include_prob=1.0,
init_bias=0.,
W_lr_scale=None,
b_lr_scale=None,
mask_weights=None,
max_row_norm=None,
max_col_norm=None,
min_col_norm=None,
softmax_columns=False,
copy_input=0,
use_abs_loss=False,
use_bias=True):
"""
Parameters
----------
dim : WRITEME
layer_name : WRITEME
irange : WRITEME
istdev : WRITEME
sparse_init : WRITEME
sparse_stdev : WRITEME
include_prob : float
Probability of including a weight element in the set of weights \
initialized to U(-irange, irange). If not included it is \
initialized to 0.
init_bias : WRITEME
W_lr_scale : WRITEME
b_lr_scale : WRITEME
mask_weights : WRITEME
max_row_norm : WRITEME
max_col_norm : WRITEME
min_col_norm : WRITEME
softmax_columns : WRITEME
copy_input : WRITEME
use_abs_loss : WRITEME
use_bias : WRITEME
"""
super(Linear, self).__init__()
if use_bias and init_bias is None:
init_bias = 0.
self.__dict__.update(locals())
del self.self
if use_bias:
self.b = sharedX(np.zeros((self.dim,)) + init_bias,
name=(layer_name + '_b'))
else:
assert b_lr_scale is None
# init_bias is ignored when use_bias is False
@wraps(Layer.get_lr_scalers)
def get_lr_scalers(self):
if not hasattr(self, 'W_lr_scale'):
self.W_lr_scale = None
if not hasattr(self, 'b_lr_scale'):
self.b_lr_scale = None
rval = OrderedDict()
if self.W_lr_scale is not None:
W, = self.transformer.get_params()
rval[W] = self.W_lr_scale
if self.b_lr_scale is not None:
rval[self.b] = self.b_lr_scale
return rval
@wraps(Layer.set_input_space)
def set_input_space(self, space):
self.input_space = space
if isinstance(space, VectorSpace):
self.requires_reformat = False
self.input_dim = space.dim
else:
self.requires_reformat = True
self.input_dim = space.get_total_dimension()
self.desired_space = VectorSpace(self.input_dim)
self.output_space = VectorSpace(self.dim +
self.copy_input * self.input_dim)
rng = self.mlp.rng
if self.irange is not None:
assert self.istdev is None
assert self.sparse_init is None
W = rng.uniform(-self.irange,
self.irange,
(self.input_dim, self.dim)) * \
(rng.uniform(0., 1., (self.input_dim, self.dim))
< self.include_prob)
elif self.istdev is not None:
assert self.sparse_init is None
W = rng.randn(self.input_dim, self.dim) * self.istdev
else:
assert self.sparse_init is not None
W = np.zeros((self.input_dim, self.dim))
def mask_rejects(idx, i):
if self.mask_weights is None:
return False
return self.mask_weights[idx, i] == 0.
for i in xrange(self.dim):
assert self.sparse_init <= self.input_dim
for j in xrange(self.sparse_init):
idx = rng.randint(0, self.input_dim)
while W[idx, i] != 0 or mask_rejects(idx, i):
idx = rng.randint(0, self.input_dim)
W[idx, i] = rng.randn()
W *= self.sparse_stdev
W = sharedX(W)
W.name = self.layer_name + '_W'
self.transformer = MatrixMul(W)
W, = self.transformer.get_params()
assert W.name is not None
if self.mask_weights is not None:
expected_shape = (self.input_dim, self.dim)
if expected_shape != self.mask_weights.shape:
raise ValueError("Expected mask with shape " +
str(expected_shape)+" but got " +
str(self.mask_weights.shape))
self.mask = sharedX(self.mask_weights)
@wraps(Layer.censor_updates)
def censor_updates(self, updates):
if self.mask_weights is not None:
W, = self.transformer.get_params()
if W in updates:
updates[W] = updates[W] * self.mask
if self.max_row_norm is not None:
W, = self.transformer.get_params()
if W in updates:
updated_W = updates[W]
row_norms = T.sqrt(T.sum(T.sqr(updated_W), axis=1))
desired_norms = T.clip(row_norms, 0, self.max_row_norm)
scales = desired_norms / (1e-7 + row_norms)
updates[W] = updated_W * scales.dimshuffle(0, 'x')
if self.max_col_norm is not None or self.min_col_norm is not None:
assert self.max_row_norm is None
if self.max_col_norm is not None:
max_col_norm = self.max_col_norm
if self.min_col_norm is None:
self.min_col_norm = 0
W, = self.transformer.get_params()
if W in updates:
updated_W = updates[W]
col_norms = T.sqrt(T.sum(T.sqr(updated_W), axis=0))
if self.max_col_norm is None:
max_col_norm = col_norms.max()
desired_norms = T.clip(col_norms,
self.min_col_norm,
max_col_norm)
updates[W] = updated_W * desired_norms / (1e-7 + col_norms)
@wraps(Layer.get_params)
def get_params(self):
W, = self.transformer.get_params()
assert W.name is not None
rval = self.transformer.get_params()
assert not isinstance(rval, set)
rval = list(rval)
if self.use_bias:
assert self.b.name is not None
assert self.b not in rval
rval.append(self.b)
return rval
@wraps(Layer.get_weight_decay)
def get_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
W, = self.transformer.get_params()
return coeff * T.sqr(W).sum()
@wraps(Layer.get_l1_weight_decay)
def get_l1_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
W, = self.transformer.get_params()
return coeff * abs(W).sum()
@wraps(Layer.get_weights)
def get_weights(self):
if self.requires_reformat:
# This is not really an unimplemented case.
# We actually don't know how to format the weights
# in design space. We got the data in topo space
# and we don't have access to the dataset
raise NotImplementedError()
W, = self.transformer.get_params()
W = W.get_value()
if self.softmax_columns:
P = np.exp(W)
Z = np.exp(W).sum(axis=0)
rval = P / Z
return rval
return W
@wraps(Layer.set_weights)
def set_weights(self, weights):
W, = self.transformer.get_params()
W.set_value(weights)
@wraps(Layer.set_biases)
def set_biases(self, biases):
self.b.set_value(biases)
@wraps(Layer.get_biases)
def get_biases(self):
"""
.. todo::
WRITEME
"""
return self.b.get_value()
@wraps(Layer.get_weights_format)
def get_weights_format(self):
return ('v', 'h')
@wraps(Layer.get_weights_topo)
def get_weights_topo(self):
if not isinstance(self.input_space, Conv2DSpace):
raise NotImplementedError()
W, = self.transformer.get_params()
W = W.T
W = W.reshape((self.dim, self.input_space.shape[0],
self.input_space.shape[1],
self.input_space.num_channels))
W = Conv2DSpace.convert(W, self.input_space.axes, ('b', 0, 1, 'c'))
return function([], W)()
@wraps(Layer.get_monitoring_channels)
def get_monitoring_channels(self):
W, = self.transformer.get_params()
assert W.ndim == 2
sq_W = T.sqr(W)
row_norms = T.sqrt(sq_W.sum(axis=1))
col_norms = T.sqrt(sq_W.sum(axis=0))
return OrderedDict([('row_norms_min', row_norms.min()),
('row_norms_mean', row_norms.mean()),
('row_norms_max', row_norms.max()),
('col_norms_min', col_norms.min()),
('col_norms_mean', col_norms.mean()),
('col_norms_max', col_norms.max()), ])
@wraps(Layer.get_monitoring_channels_from_state)
def get_monitoring_channels_from_state(self, state, target=None):
rval = OrderedDict()
mx = state.max(axis=0)
mean = state.mean(axis=0)
mn = state.min(axis=0)
rg = mx - mn
rval['range_x_max_u'] = rg.max()
rval['range_x_mean_u'] = rg.mean()
rval['range_x_min_u'] = rg.min()
rval['max_x_max_u'] = mx.max()
rval['max_x_mean_u'] = mx.mean()
rval['max_x_min_u'] = mx.min()
rval['mean_x_max_u'] = mean.max()
rval['mean_x_mean_u'] = mean.mean()
rval['mean_x_min_u'] = mean.min()
rval['min_x_max_u'] = mn.max()
rval['min_x_mean_u'] = mn.mean()
rval['min_x_min_u'] = mn.min()
return rval
def _linear_part(self, state_below):
"""
.. todo::
WRITEME
"""
# TODO: Refactor More Better(tm)
self.input_space.validate(state_below)
if self.requires_reformat:
state_below = self.input_space.format_as(state_below,
self.desired_space)
# Support old pickle files
if not hasattr(self, 'softmax_columns'):
self.softmax_columns = False
if self.softmax_columns:
W, = self.transformer.get_params()
W = W.T
W = T.nnet.softmax(W)
W = W.T
z = T.dot(state_below, W)
if self.use_bias:
z += self.b
else:
z = self.transformer.lmul(state_below)
if self.use_bias:
z += self.b
if self.layer_name is not None:
z.name = self.layer_name + '_z'
if self.copy_input:
z = T.concatenate((z, state_below), axis=1)
return z
@wraps(Layer.fprop)
def fprop(self, state_below):
# TODO: Refactor More Better(tm)
p = self._linear_part(state_below)
return p
@wraps(Layer.cost)
def cost(self, Y, Y_hat):
return self.cost_from_cost_matrix(self.cost_matrix(Y, Y_hat))
@wraps(Layer.cost_from_cost_matrix)
def cost_from_cost_matrix(self, cost_matrix):
return cost_matrix.sum(axis=1).mean()
@wraps(Layer.cost_matrix)
def cost_matrix(self, Y, Y_hat):
if(self.use_abs_loss):
return T.abs_(Y - Y_hat)
else:
return T.sqr(Y - Y_hat)
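# Worked example of the default cost (illustrative): with use_abs_loss=False
# the cost matrix is the elementwise squared error (Y - Y_hat) ** 2;
# cost_from_cost_matrix then sums over output dimensions and averages over
# the batch. For Y = [[0., 1.]] and Y_hat = [[0.5, 0.5]] the cost is
# 0.25 + 0.25 = 0.5.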
class Tanh(Linear):
"""
A layer that performs an affine transformation of its (vectorial)
input followed by a hyperbolic tangent elementwise nonlinearity.
"""
@wraps(Layer.fprop)
def fprop(self, state_below):
p = self._linear_part(state_below)
p = T.tanh(p)
return p
@wraps(Layer.cost)
def cost(self, *args, **kwargs):
raise NotImplementedError()
class Sigmoid(Linear):
"""
A layer that performs an affine transformation of its (vectorial)
input followed by a logistic sigmoid elementwise nonlinearity.
"""
def __init__(self, monitor_style='detection', **kwargs):
"""
.. todo::
WRITEME properly
monitor_style: a string, either 'detection' or 'classification';
'detection' by default.
if 'detection':
get_monitor_from_state makes no assumptions about the
target, and reports info about how good the model is at
detecting positive bits.
This will monitor precision, recall, and F1 score
based on a detection threshold of 0.5. Note that
these quantities are computed *per-minibatch* and
averaged together. Unless your entire monitoring
dataset fits in one minibatch, this is not the same
as the true F1 score, etc., and will usually
seriously overestimate your performance.
if 'classification':
get_monitor_from_state assumes the target is a one-hot
class indicator, even though you're training the model
as k independent sigmoids. Gives info on how good the
argmax is as a classifier.
"""
super(Sigmoid, self).__init__(**kwargs)
assert monitor_style in ['classification', 'detection']
self.monitor_style = monitor_style
@wraps(Layer.fprop)
def fprop(self, state_below):
p = self._linear_part(state_below)
p = T.nnet.sigmoid(p)
return p
def kl(self, Y, Y_hat):
"""
.. todo::
WRITEME properly
Returns a batch (vector) of
mean across units of KL divergence for each example
KL(P || Q) where P is defined by Y and Q is defined by Y_hat
Currently Y must be purely binary. If it's not, you'll still
get the right gradient, but the value in the monitoring channel
will be wrong.
Y_hat must be generated by fprop, i.e., it must be a symbolic
sigmoid.
p log p - p log q + (1-p) log (1-p) - (1-p) log (1-q)
For binary p, some terms drop out:
- p log q - (1-p) log (1-q)
- p log sigmoid(z) - (1-p) log sigmoid(-z)
p softplus(-z) + (1-p) softplus(z)
"""
# Pull out the argument to the sigmoid
assert hasattr(Y_hat, 'owner')
owner = Y_hat.owner
assert owner is not None
op = owner.op
if not hasattr(op, 'scalar_op'):
raise ValueError("Expected Y_hat to be generated by an Elemwise "
"op, got "+str(op)+" of type "+str(type(op)))
assert isinstance(op.scalar_op, T.nnet.sigm.ScalarSigmoid)
z, = owner.inputs
term_1 = Y * T.nnet.softplus(-z)
term_2 = (1 - Y) * T.nnet.softplus(z)
total = term_1 + term_2
ave = total.mean(axis=1)
assert ave.ndim == 1
return ave
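# The softplus form above follows from the identities
#
#     -log(sigmoid(z))     = log(1 + exp(-z)) = softplus(-z)
#     -log(1 - sigmoid(z)) = log(1 + exp(z))  = softplus(z)
#
# so for binary targets the per-unit cross-entropy
# -Y log(Y_hat) - (1 - Y) log(1 - Y_hat) equals
# Y * softplus(-z) + (1 - Y) * softplus(z), which is term_1 + term_2 computed
# directly on the pre-sigmoid input z for numerical stability.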
@wraps(Layer.cost)
def cost(self, Y, Y_hat):
"""
.. todo::
WRITEME properly
mean across units, mean across batch of KL divergence
KL(P || Q) where P is defined by Y and Q is defined by Y_hat
Currently Y must be purely binary. If it's not, you'll still
get the right gradient, but the value in the monitoring channel
will be wrong.
Y_hat must be generated by fprop, i.e., it must be a symbolic
sigmoid.
p log p - p log q + (1-p) log (1-p) - (1-p) log (1-q)
For binary p, some terms drop out:
- p log q - (1-p) log (1-q)
- p log sigmoid(z) - (1-p) log sigmoid(-z)
p softplus(-z) + (1-p) softplus(z)
"""
total = self.kl(Y=Y, Y_hat=Y_hat)
ave = total.mean()
return ave
def get_detection_channels_from_state(self, state, target):
"""
.. todo::
WRITEME
"""
rval = OrderedDict()
y_hat = state > 0.5
y = target > 0.5
wrong_bit = T.cast(T.neq(y, y_hat), state.dtype)
rval['01_loss'] = wrong_bit.mean()
rval['kl'] = self.cost(Y_hat=state, Y=target)
y = T.cast(y, state.dtype)
y_hat = T.cast(y_hat, state.dtype)
tp = (y * y_hat).sum()
fp = ((1-y) * y_hat).sum()
precision = tp / T.maximum(1., tp + fp)
recall = tp / T.maximum(1., y.sum())
rval['precision'] = precision
rval['recall'] = recall
rval['f1'] = 2. * precision * recall / T.maximum(1, precision + recall)
tp = (y * y_hat).sum(axis=0)
fp = ((1-y) * y_hat).sum(axis=0)
precision = tp / T.maximum(1., tp + fp)
rval['per_output_precision.max'] = precision.max()
rval['per_output_precision.mean'] = precision.mean()
rval['per_output_precision.min'] = precision.min()
recall = tp / T.maximum(1., y.sum(axis=0))
rval['per_output_recall.max'] = recall.max()
rval['per_output_recall.mean'] = recall.mean()
rval['per_output_recall.min'] = recall.min()
f1 = 2. * precision * recall / T.maximum(1, precision + recall)
rval['per_output_f1.max'] = f1.max()
rval['per_output_f1.mean'] = f1.mean()
rval['per_output_f1.min'] = f1.min()
return rval
@wraps(Layer.get_monitoring_channels_from_state)
def get_monitoring_channels_from_state(self, state, target=None):
rval = super(Sigmoid, self).get_monitoring_channels_from_state(state,
target)
if target is not None:
if self.monitor_style == 'detection':
rval.update(self.get_detection_channels_from_state(state,
target))
else:
assert self.monitor_style == 'classification'
# Threshold Y_hat at 0.5.
prediction = T.gt(state, 0.5)
# If even one feature is wrong for a given training example,
# it's considered incorrect, so we max over columns.
incorrect = T.neq(target, prediction).max(axis=1)
rval['misclass'] = T.cast(incorrect, config.floatX).mean()
return rval
class RectifiedLinear(Linear):
"""
Rectified linear MLP layer (Glorot, Bordes and Bengio, 2011).
"""
def __init__(self, left_slope=0.0, **kwargs):
"""
.. todo::
WRITEME
"""
super(RectifiedLinear, self).__init__(**kwargs)
self.left_slope = left_slope
@wraps(Layer.fprop)
def fprop(self, state_below):
p = self._linear_part(state_below)
p = p * (p > 0.) + self.left_slope * p * (p < 0.)
return p
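# Illustrative note: the expression above is an elementwise leaky rectifier,
# f(x) = x for x > 0 and f(x) = left_slope * x for x < 0 (and 0 at x == 0).
# With the default left_slope = 0.0 it is the standard rectifier max(0, x).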
@wraps(Layer.cost)
def cost(self, *args, **kwargs):
raise NotImplementedError()
class Softplus(Linear):
"""
Softplus MLP layer
"""
def __init__(self, **kwargs):
"""
Initializes an MLP layer using the softplus nonlinearity
h = log(1 + exp(Wx + b))
"""
super(Softplus, self).__init__(**kwargs)
@wraps(Layer.fprop)
def fprop(self, state_below):
p = self._linear_part(state_below)
p = T.nnet.softplus(p)
return p
@wraps(Layer.cost)
def cost(self, *args, **kwargs):
raise NotImplementedError()
class SpaceConverter(Layer):
"""
.. todo::
WRITEME
"""
def __init__(self, layer_name, output_space):
"""
.. todo::
WRITEME
"""
super(SpaceConverter, self).__init__()
self.__dict__.update(locals())
del self.self
self._params = []
@wraps(Layer.set_input_space)
def set_input_space(self, space):
self.input_space = space
@wraps(Layer.fprop)
def fprop(self, state_below):
return self.input_space.format_as(state_below, self.output_space)
class ConvRectifiedLinear(Layer):
"""
.. todo::
WRITEME
"""
def __init__(self,
output_channels,
kernel_shape,
pool_shape,
pool_stride,
layer_name,
irange=None,
border_mode='valid',
sparse_init=None,
include_prob=1.0,
init_bias=0.,
W_lr_scale=None,
b_lr_scale=None,
left_slope=0.0,
max_kernel_norm=None,
pool_type='max',
detector_normalization=None,
output_normalization=None,
tied_b=True,
kernel_stride=(1, 1)):
"""
.. todo::
WRITEME properly
output_channels: The number of output channels the layer should have.
kernel_shape: The shape of the convolution kernel.
pool_shape: The shape of the spatial max pooling. A two-tuple of ints.
pool_stride: The stride of the spatial max pooling. Also must be
square.
layer_name: A name for this layer that will be prepended to
monitoring channels related to this layer.
irange: if specified, initializes each weight randomly in
U(-irange, irange)
border_mode: A string indicating the size of the output:
full - The output is the full discrete linear convolution of the
inputs.
valid - The output consists only of those elements that do not rely
on the zero-padding. (Default)
include_prob: probability of including a weight element in the set
of weights initialized to U(-irange, irange). If not
included it is initialized to 0.
init_bias: All biases are initialized to this number
W_lr_scale: The learning rate on the weights for this layer is
multiplied by this scaling factor
b_lr_scale: The learning rate on the biases for this layer is
multiplied by this scaling factor
left_slope: **TODO**
max_kernel_norm: If specifed, each kernel is constrained to have at
most this norm.
pool_type: The type of the pooling operation performed after the
convolution. Default pooling type is max-pooling.
detector_normalization, output_normalization:
if specified, should be a callable object. The state of the
network is optionally replaced with normalization(state) at each
of the following points in processing:
detector: the detector units can be normalized prior to the
spatial pooling
output: the output of the layer, after spatial pooling, can
be normalized as well
kernel_stride: The stride of the convolution kernel. A two-tuple of
ints.
"""
super(ConvRectifiedLinear, self).__init__()
if (irange is None) and (sparse_init is None):
raise AssertionError("You should specify either irange or "
"sparse_init when calling the constructor of "
"ConvRectifiedLinear.")
elif (irange is not None) and (sparse_init is not None):
raise AssertionError("You should specify either irange or "
"sparse_init when calling the constructor of "
"ConvRectifiedLinear and not both.")
self.__dict__.update(locals())
del self.self
@wraps(Layer.get_lr_scalers)
def get_lr_scalers(self):
if not hasattr(self, 'W_lr_scale'):
self.W_lr_scale = None
if not hasattr(self, 'b_lr_scale'):
self.b_lr_scale = None
rval = OrderedDict()
if self.W_lr_scale is not None:
W, = self.transformer.get_params()
rval[W] = self.W_lr_scale
if self.b_lr_scale is not None:
rval[self.b] = self.b_lr_scale
return rval
@wraps(Layer.set_input_space)
def set_input_space(self, space):
self.input_space = space
if not isinstance(space, Conv2DSpace):
raise BadInputSpaceError("ConvRectifiedLinear.set_input_space "
"expected a Conv2DSpace, got " +
str(space) + " of type " +
str(type(space)))
rng = self.mlp.rng
if self.border_mode == 'valid':
output_shape = [(self.input_space.shape[0]-self.kernel_shape[0]) /
self.kernel_stride[0] + 1,
(self.input_space.shape[1]-self.kernel_shape[1]) /
self.kernel_stride[1] + 1]
elif self.border_mode == 'full':
output_shape = [(self.input_space.shape[0]+self.kernel_shape[0]) /
self.kernel_stride[0] - 1,
(self.input_space.shape[1]+self.kernel_shape[1]) /
self.kernel_stride[1] - 1]
self.detector_space = Conv2DSpace(shape=output_shape,
num_channels=self.output_channels,
axes=('b', 'c', 0, 1))
if self.irange is not None:
assert self.sparse_init is None
self.transformer = conv2d.make_random_conv2D(
irange=self.irange,
input_space=self.input_space,
output_space=self.detector_space,
kernel_shape=self.kernel_shape,
batch_size=self.mlp.batch_size,
subsample=self.kernel_stride,
border_mode=self.border_mode,
rng=rng)
elif self.sparse_init is not None:
self.transformer = conv2d.make_sparse_random_conv2D(
num_nonzero=self.sparse_init,
input_space=self.input_space,
output_space=self.detector_space,
kernel_shape=self.kernel_shape,
batch_size=self.mlp.batch_size,
subsample=self.kernel_stride,
border_mode=self.border_mode,
rng=rng)
W, = self.transformer.get_params()
W.name = 'W'
if self.tied_b:
self.b = sharedX(np.zeros((self.detector_space.num_channels)) +
self.init_bias)
else:
self.b = sharedX(self.detector_space.get_origin() + self.init_bias)
self.b.name = 'b'
print 'Input shape: ', self.input_space.shape
print 'Detector space: ', self.detector_space.shape
assert self.pool_type in ['max', 'mean']
dummy_batch_size = self.mlp.batch_size
if dummy_batch_size is None:
dummy_batch_size = 2
dummy_detector = sharedX(
self.detector_space.get_origin_batch(dummy_batch_size))
if self.pool_type == 'max':
dummy_p = max_pool(bc01=dummy_detector,
pool_shape=self.pool_shape,
pool_stride=self.pool_stride,
image_shape=self.detector_space.shape)
elif self.pool_type == 'mean':
dummy_p = mean_pool(bc01=dummy_detector,
pool_shape=self.pool_shape,
pool_stride=self.pool_stride,
image_shape=self.detector_space.shape)
dummy_p = dummy_p.eval()
self.output_space = Conv2DSpace(shape=[dummy_p.shape[2],
dummy_p.shape[3]],
num_channels=self.output_channels,
axes=('b', 'c', 0, 1))
print 'Output space: ', self.output_space.shape
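# Worked example of the shape arithmetic above (illustrative; all sizes are
# hypothetical): with a 32x32 input, kernel_shape=(5, 5), kernel_stride=(1, 1)
# and border_mode='valid', the detector space is (32 - 5) / 1 + 1 = 28 per
# side, i.e. 28x28 per output channel. Pooling that with pool_shape=(2, 2)
# and pool_stride=(2, 2) yields a 14x14 output space, which is the same
# number the dummy max_pool/mean_pool evaluation above infers numerically.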
@wraps(Layer.censor_updates)
def censor_updates(self, updates):
"""
.. todo::
WRITEME
"""
if self.max_kernel_norm is not None:
W, = self.transformer.get_params()
if W in updates:
updated_W = updates[W]
row_norms = T.sqrt(T.sum(T.sqr(updated_W), axis=(1, 2, 3)))
desired_norms = T.clip(row_norms, 0, self.max_kernel_norm)
scales = desired_norms / (1e-7 + row_norms)
updates[W] = updated_W * scales.dimshuffle(0, 'x', 'x', 'x')
@wraps(Layer.get_params)
def get_params(self):
"""
.. todo::
WRITEME
"""
assert self.b.name is not None
W, = self.transformer.get_params()
assert W.name is not None
rval = self.transformer.get_params()
assert not isinstance(rval, set)
rval = list(rval)
assert self.b not in rval
rval.append(self.b)
return rval
@wraps(Layer.get_weight_decay)
def get_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
W, = self.transformer.get_params()
return coeff * T.sqr(W).sum()
@wraps(Layer.get_l1_weight_decay)
def get_l1_weight_decay(self, coeff):
if isinstance(coeff, str):
coeff = float(coeff)
assert isinstance(coeff, float) or hasattr(coeff, 'dtype')
W, = self.transformer.get_params()
return coeff * abs(W).sum()
@wraps(Layer.set_weights)
def set_weights(self, weights):
W, = self.transformer.get_params()
W.set_value(weights)
@wraps(Layer.set_biases)
def set_biases(self, biases):
self.b.set_value(biases)
@wraps(Layer.get_biases)
def get_biases(self):
return self.b.get_value()
@wraps(Layer.get_weights_format)
def get_weights_format(self):
return ('v', 'h')
@wraps(Layer.get_weights_topo)
def get_weights_topo(self):
outp, inp, rows, cols = range(4)
raw = self.transformer._filters.get_value()
return np.transpose(raw, (outp, rows, cols, inp))
@wraps(Layer.get_monitoring_channels)
def get_monitoring_channels(self):
W, = self.transformer.get_params()
assert W.ndim == 4
sq_W = T.sqr(W)
row_norms = T.sqrt(sq_W.sum(axis=(1, 2, 3)))
return OrderedDict([('kernel_norms_min', row_norms.min()),
('kernel_norms_mean', row_norms.mean()),
('kernel_norms_max', row_norms.max()), ])
@wraps(Layer.fprop)
def fprop(self, state_below):
self.input_space.validate(state_below)
z = self.transformer.lmul(state_below)
if not hasattr(self, 'tied_b'):
self.tied_b = False
if self.tied_b:
b = self.b.dimshuffle('x', 0, 'x', 'x')
else:
b = self.b.dimshuffle('x', 0, 1, 2)
z = z + b
if self.layer_name is not None:
z.name = self.layer_name + '_z'
d = z * (z > 0.) + self.left_slope * z * (z < 0.)
self.detector_space.validate(d)
if not hasattr(self, 'detector_normalization'):
self.detector_normalization = None
if self.detector_normalization:
d = self.detector_normalization(d)
assert self.pool_type in ['max', 'mean']
if self.pool_type == 'max':
p = max_pool(bc01=d,
pool_shape=self.pool_shape,
pool_stride=self.pool_stride,
image_shape=self.detector_space.shape)
elif self.pool_type == 'mean':
p = mean_pool(bc01=d,
pool_shape=self.pool_shape,
pool_stride=self.pool_stride,
image_shape=self.detector_space.shape)
self.output_space.validate(p)
if not hasattr(self, 'output_normalization'):
self.output_normalization = None
if self.output_normalization:
p = self.output_normalization(p)
return p
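
# A minimal, standalone restatement (illustrative only; the helper name and
# the example shapes in this comment are assumptions) of the detector-space
# shape arithmetic used in ConvRectifiedLinear.set_input_space above. For
# example, a 32x32 input with a 5x5 kernel, stride (1, 1) and 'valid' borders
# gives a 28x28 detector space, while 'full' borders give 36x36.
def _conv_detector_shape(image_shape, kernel_shape, kernel_stride,
                         border_mode):
    if border_mode == 'valid':
        return [(image_shape[i] - kernel_shape[i]) // kernel_stride[i] + 1
                for i in (0, 1)]
    elif border_mode == 'full':
        return [(image_shape[i] + kernel_shape[i]) // kernel_stride[i] - 1
                for i in (0, 1)]
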
def max_pool(bc01, pool_shape, pool_stride, image_shape):
"""
.. todo::
WRITEME properly
Theano's max pooling op only support pool_stride = pool_shape
so here we have a graph that does max pooling with strides
bc01: minibatch in format (batch size, channels, rows, cols)
pool_shape: shape of the pool region (rows, cols)
pool_stride: strides between pooling regions (row stride, col stride)
image_shape: avoid doing some of the arithmetic in theano
"""
mx = None
r, c = image_shape
pr, pc = pool_shape
rs, cs = pool_stride
assert pr <= r
assert pc <= c
# Compute index in pooled space of last needed pool
# (needed = each input pixel must appear in at least one pool)
def last_pool(im_shp, p_shp, p_strd):
rval = int(np.ceil(float(im_shp - p_shp) / p_strd))
assert p_strd * rval + p_shp >= im_shp
assert p_strd * (rval - 1) + p_shp < im_shp
return rval
# Compute starting row of the last pool
last_pool_r = last_pool(image_shape[0],
pool_shape[0],
pool_stride[0]) * pool_stride[0]
# Compute number of rows needed in image for all indexes to work out
required_r = last_pool_r + pr
last_pool_c = last_pool(image_shape[1],
pool_shape[1],
pool_stride[1]) * pool_stride[1]
required_c = last_pool_c + pc
for bc01v in get_debug_values(bc01):
assert not np.any(np.isinf(bc01v))
assert bc01v.shape[2] == image_shape[0]
assert bc01v.shape[3] == image_shape[1]
wide_infinity = T.alloc(T.constant(-np.inf, dtype=config.floatX),
bc01.shape[0],
bc01.shape[1],
required_r,
required_c)
name = bc01.name
if name is None:
name = 'anon_bc01'
bc01 = T.set_subtensor(wide_infinity[:, :, 0:r, 0:c], bc01)
bc01.name = 'infinite_padded_' + name
for row_within_pool in xrange(pool_shape[0]):
row_stop = last_pool_r + row_within_pool + 1
for col_within_pool in xrange(pool_shape[1]):
col_stop = last_pool_c + col_within_pool + 1
cur = bc01[:,
:,
row_within_pool:row_stop:rs,
col_within_pool:col_stop:cs]
cur.name = ('max_pool_cur_' + bc01.name + '_' +
str(row_within_pool) + '_' + str(col_within_pool))
if mx is None:
mx = cur
else:
mx = T.maximum(mx, cur)
mx.name = ('max_pool_mx_' + bc01.name + '_' +
str(row_within_pool) + '_' + str(col_within_pool))
mx.name = 'max_pool('+name+')'
for mxv in get_debug_values(mx):
assert not np.any(np.isnan(mxv))
assert not np.any(np.isinf(mxv))
return mx
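
# A minimal, illustrative sketch of how max_pool can be compiled and run on a
# small bc01 batch. The `_demo_max_pool` name and the shapes below are
# arbitrary assumptions chosen for the example, not part of the module's API.
def _demo_max_pool():
    x = T.tensor4('x')
    pooled = max_pool(bc01=x, pool_shape=(2, 2), pool_stride=(2, 2),
                      image_shape=(6, 6))
    f = function([x], pooled)
    batch = np.random.uniform(size=(2, 3, 6, 6)).astype(config.floatX)
    # With a 6x6 image, 2x2 pools and stride 2, the result has shape
    # (2, 3, 3, 3).
    return f(batch)
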
def max_pool_c01b(c01b, pool_shape, pool_stride, image_shape):
"""
.. todo::
WRITEME properly
Like max_pool but with input using axes ('c', 0, 1, 'b')
(Alex Krizhevsky format)
"""
mx = None
r, c = image_shape
pr, pc = pool_shape
rs, cs = pool_stride
assert pr > 0
assert pc > 0
assert pr <= r
assert pc <= c
# Compute index in pooled space of last needed pool
# (needed = each input pixel must appear in at least one pool)
def last_pool(im_shp, p_shp, p_strd):
rval = int(np.ceil(float(im_shp - p_shp) / p_strd))
assert p_strd * rval + p_shp >= im_shp
assert p_strd * (rval - 1) + p_shp < im_shp
return rval
# Compute starting row of the last pool
last_pool_r = last_pool(image_shape[0],
pool_shape[0],
pool_stride[0]) * pool_stride[0]
# Compute number of rows needed in image for all indexes to work out
required_r = last_pool_r + pr
last_pool_c = last_pool(image_shape[1],
pool_shape[1],
pool_stride[1]) * pool_stride[1]
required_c = last_pool_c + pc
for c01bv in get_debug_values(c01b):
assert not np.any(np.isinf(c01bv))
assert c01bv.shape[1] == r
assert c01bv.shape[2] == c
    wide_infinity = T.alloc(T.constant(-np.inf, dtype=config.floatX),
                            c01b.shape[0],
                            required_r,
                            required_c,
                            c01b.shape[3])
    name = c01b.name
    if name is None:
        name = 'anon_c01b'
    c01b = T.set_subtensor(wide_infinity[:, 0:r, 0:c, :], c01b)
    c01b.name = 'infinite_padded_' + name
for row_within_pool in xrange(pool_shape[0]):
row_stop = last_pool_r + row_within_pool + 1
for col_within_pool in xrange(pool_shape[1]):
col_stop = last_pool_c + col_within_pool + 1
cur = c01b[:,
row_within_pool:row_stop:rs,
col_within_pool:col_stop:cs,
:]
cur.name = ('max_pool_cur_' + c01b.name + '_' +
str(row_within_pool) + '_' + str(col_within_pool))
if mx is None:
mx = cur
else:
mx = T.maximum(mx, cur)
mx.name = ('max_pool_mx_' + c01b.name + '_' +
str(row_within_pool)+'_'+str(col_within_pool))
mx.name = 'max_pool('+name+')'
for mxv in get_debug_values(mx):
assert not np.any(np.isnan(mxv))
assert not np.any(np.isinf(mxv))
return mx
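
# A minimal, illustrative sketch of max_pool_c01b; note that channels come
# first and the batch axis comes last. The `_demo_max_pool_c01b` name and the
# shapes below are arbitrary assumptions chosen for the example.
def _demo_max_pool_c01b():
    x = T.tensor4('x')
    pooled = max_pool_c01b(c01b=x, pool_shape=(3, 3), pool_stride=(2, 2),
                           image_shape=(7, 7))
    f = function([x], pooled)
    batch = np.random.uniform(size=(3, 7, 7, 2)).astype(config.floatX)
    # With a 7x7 image, 3x3 pools and stride 2, the result has shape
    # (3, 3, 3, 2).
    return f(batch)
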
def mean_pool(bc01, pool_shape, pool_stride, image_shape):
"""
.. todo::
WRITEME properly
bc01: minibatch in format (batch size, channels, rows, cols)
pool_shape: shape of the pool region (rows, cols)
pool_stride: strides between pooling regions (row stride, col stride)
image_shape: avoid doing some of the arithmetic in theano
"""
mx = None
r, c = image_shape
pr, pc = pool_shape
rs, cs = pool_stride
# Compute index in pooled space of last needed pool
# (needed = each input pixel must appear in at least one pool)
def last_pool(im_shp, p_shp, p_strd):
rval = int(np.ceil(float(im_shp - p_shp) / p_strd))
assert p_strd * rval + p_shp >= im_shp
assert p_strd * (rval - 1) + p_shp < im_shp
return rval
# Compute starting row of the last pool
last_pool_r = last_pool(image_shape[0],
pool_shape[0],
pool_stride[0]) * pool_stride[0]
# Compute number of rows needed in image for all indexes to work out
required_r = last_pool_r + pr
last_pool_c = last_pool(image_shape[1],
pool_shape[1],
pool_stride[1]) * pool_stride[1]
required_c = last_pool_c + pc
for bc01v in get_debug_values(bc01):
assert not np.any(np.isinf(bc01v))
assert bc01v.shape[2] == image_shape[0]
assert bc01v.shape[3] == image_shape[1]
    # Pad with zeros rather than -inf: padded cells then contribute nothing
    # to the running sum, and bc01_count below keeps them out of the divisor,
    # so border pools average only over real pixels.
    wide_zero = T.alloc(T.constant(0., dtype=config.floatX),
                        bc01.shape[0],
                        bc01.shape[1],
                        required_r,
                        required_c)
    name = bc01.name
    if name is None:
        name = 'anon_bc01'
    bc01 = T.set_subtensor(wide_zero[:, :, 0:r, 0:c], bc01)
    bc01.name = 'zero_padded_' + name
    # Create a 'mask' used to keep count of the number of real (non-padded)
    # elements summed into each pooled position
    wide_zero_count = T.alloc(0, bc01.shape[0], bc01.shape[1], required_r,
                              required_c)
    bc01_count = T.set_subtensor(wide_zero_count[:, :, 0:r, 0:c], 1)
for row_within_pool in xrange(pool_shape[0]):
row_stop = last_pool_r + row_within_pool + 1
for col_within_pool in xrange(pool_shape[1]):
col_stop = last_pool_c + col_within_pool + 1
cur = bc01[:,
:,
row_within_pool:row_stop:rs,
col_within_pool:col_stop:cs]
cur.name = ('mean_pool_cur_' + bc01.name + '_' +
str(row_within_pool) + '_' + str(col_within_pool))
cur_count = bc01_count[:,
:,
row_within_pool:row_stop:rs,
col_within_pool:col_stop:cs]
if mx is None:
mx = cur
count = cur_count
else:
mx = mx + cur
count = count + cur_count
mx.name = ('mean_pool_mx_' + bc01.name + '_' +
str(row_within_pool) + '_' + str(col_within_pool))
mx /= count
mx.name = 'mean_pool('+name+')'
for mxv in get_debug_values(mx):
assert not np.any(np.isnan(mxv))
assert not np.any(np.isinf(mxv))
return mx
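
# A minimal, illustrative sketch of mean_pool; the `_demo_mean_pool` name and
# the shapes below are arbitrary assumptions. With a 6x6 image, 2x2 pools and
# stride 2, every pool covers exactly four real pixels, so this reduces to
# plain 2x2 average pooling.
def _demo_mean_pool():
    x = T.tensor4('x')
    pooled = mean_pool(bc01=x, pool_shape=(2, 2), pool_stride=(2, 2),
                       image_shape=(6, 6))
    f = function([x], pooled)
    batch = np.random.uniform(size=(2, 3, 6, 6)).astype(config.floatX)
    # The result has shape (2, 3, 3, 3).
    return f(batch)
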
def WeightDecay(*args, **kwargs):
"""
.. todo::
WRITEME
"""
warnings.warn("pylearn2.models.mlp.WeightDecay has moved to "
"pylearn2.costs.mlp.WeightDecay")
from pylearn2.costs.mlp import WeightDecay as WD
return WD(*args, **kwargs)
def L1WeightDecay(*args, **kwargs):
"""
.. todo::
WRITEME
"""
warnings.warn("pylearn2.models.mlp.L1WeightDecay has moved to "
"pylearn2.costs.mlp.WeightDecay")
from pylearn2.costs.mlp import L1WeightDecay as L1WD
return L1WD(*args, **kwargs)
class LinearGaussian(Linear):
"""
A Linear layer augmented with a precision vector, for modeling
conditionally Gaussian data
"""
def __init__(self, init_beta, min_beta, max_beta, beta_lr_scale, **kwargs):
"""
.. todo::
WRITEME
"""
super(LinearGaussian, self).__init__(**kwargs)
self.__dict__.update(locals())
del self.self
del self.kwargs
@wraps(Layer.set_input_space)
def set_input_space(self, space):
super(LinearGaussian, self).set_input_space(space)
assert isinstance(self.output_space, VectorSpace)
self.beta = sharedX(self.output_space.get_origin() + self.init_beta,
'beta')
@wraps(Linear.get_monitoring_channels)
def get_monitoring_channels(self):
rval = super(LinearGaussian, self).get_monitoring_channels()
assert isinstance(rval, OrderedDict)
rval['beta_min'] = self.beta.min()
rval['beta_mean'] = self.beta.mean()
rval['beta_max'] = self.beta.max()
return rval
@wraps(Linear.get_monitoring_channels_from_state)
def get_monitoring_channels_from_state(self, state, target=None):
        rval = super(LinearGaussian,
                     self).get_monitoring_channels_from_state(state, target)
        if target is not None:
rval['mse'] = T.sqr(state - target).mean()
return rval
@wraps(Linear.cost)
def cost(self, Y, Y_hat):
return (0.5 * T.dot(T.sqr(Y-Y_hat), self.beta).mean() -
0.5 * T.log(self.beta).sum())
@wraps(Layer.censor_updates)
def censor_updates(self, updates):
super(LinearGaussian, self).censor_updates(updates)
if self.beta in updates:
updates[self.beta] = T.clip(updates[self.beta],
self.min_beta,
self.max_beta)
@wraps(Layer.get_lr_scalers)
def get_lr_scalers(self):
rval = super(LinearGaussian, self).get_lr_scalers()
if self.beta_lr_scale is not None:
rval[self.beta] = self.beta_lr_scale
return rval
@wraps(Layer.get_params)
def get_params(self):
return super(LinearGaussian, self).get_params() + [self.beta]
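
# A minimal numpy re-statement (illustrative only; the helper name is an
# assumption and nothing above uses it) of LinearGaussian.cost: the mean over
# examples of the negative log-likelihood of a diagonal Gaussian with
# per-dimension precisions `beta`, dropping the constant
# 0.5 * dim * log(2 * pi) term. Y and Y_hat are assumed to be (batch, dim)
# arrays and beta a (dim,) array.
def _gaussian_nll_reference(Y, Y_hat, beta):
    return (0.5 * np.dot(np.square(Y - Y_hat), beta).mean() -
            0.5 * np.log(beta).sum())
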
def beta_from_design(design, min_var=1e-6, max_var=1e6):
"""
.. todo::
WRITEME
"""
return 1. / np.clip(design.var(axis=0), min_var, max_var)
def beta_from_targets(dataset, **kwargs):
"""
.. todo::
WRITEME
"""
return beta_from_design(dataset.y, **kwargs)
def beta_from_features(dataset, **kwargs):
"""
.. todo::
WRITEME
"""
return beta_from_design(dataset.X, **kwargs)
def mean_of_targets(dataset):
"""
.. todo::
WRITEME
"""
return dataset.y.mean(axis=0)
class PretrainedLayer(Layer):
"""
A layer whose weights are initialized, and optionally fixed,
based on prior training.
"""
def __init__(self, layer_name, layer_content, freeze_params=False):
"""
.. todo::
WRITEME properly
layer_content: A Model that implements "upward_pass", such as an
RBM or an Autoencoder
        freeze_params: If True, regard layer_content's parameters as fixed
If False, they become parameters of this layer and can be
fine-tuned to optimize the MLP's cost function.
"""
super(PretrainedLayer, self).__init__()
self.__dict__.update(locals())
del self.self
@wraps(Layer.set_input_space)
def set_input_space(self, space):
assert self.get_input_space() == space
@wraps(Layer.get_params)
def get_params(self):
if self.freeze_params:
return []
return self.layer_content.get_params()
@wraps(Layer.get_input_space)
def get_input_space(self):
return self.layer_content.get_input_space()
@wraps(Layer.get_output_space)
def get_output_space(self):
return self.layer_content.get_output_space()
@wraps(Layer.fprop)
def fprop(self, state_below):
return self.layer_content.upward_pass(state_below)
class CompositeLayer(Layer):
"""
A Layer that runs several simpler layers in parallel.
"""
def __init__(self, layer_name, layers):
"""
.. todo::
WRITEME properly
layers: a list or tuple of Layers.
"""
super(CompositeLayer, self).__init__()
self.__dict__.update(locals())
del self.self
@wraps(Layer.set_input_space)
def set_input_space(self, space):
self.input_space = space
for layer in self.layers:
layer.set_input_space(space)
self.output_space = CompositeSpace(tuple(layer.get_output_space()
for layer in self.layers))
@wraps(Layer.get_params)
def get_params(self):
rval = []
for layer in self.layers:
rval = safe_union(layer.get_params(), rval)
return rval
@wraps(Layer.fprop)
def fprop(self, state_below):
return tuple(layer.fprop(state_below) for layer in self.layers)
@wraps(Layer.cost)
def cost(self, Y, Y_hat):
return sum(layer.cost(Y_elem, Y_hat_elem)
for layer, Y_elem, Y_hat_elem in
safe_zip(self.layers, Y, Y_hat))
@wraps(Layer.set_mlp)
def set_mlp(self, mlp):
super(CompositeLayer, self).set_mlp(mlp)
for layer in self.layers:
layer.set_mlp(mlp)
class FlattenerLayer(Layer):
"""
A wrapper around a different layer that flattens
the original layer's output.
The cost works by unflattening the target and then
calling the wrapped Layer's cost.
This is mostly intended for use with CompositeLayer as the wrapped
Layer, and is mostly useful as a workaround for theano not having
a TupleVariable with which to represent a composite target.
There are obvious memory, performance, and readability issues with doing
this, so really it would be better for theano to support TupleTypes.
See pylearn2.sandbox.tuple_var and the theano-dev e-mail thread
"TupleType".
"""
def __init__(self, raw_layer):
"""
.. todo::
WRITEME
"""
super(FlattenerLayer, self).__init__()
self.__dict__.update(locals())
del self.self
self.layer_name = raw_layer.layer_name
@wraps(Layer.set_input_space)
def set_input_space(self, space):
self.raw_layer.set_input_space(space)
total_dim = self.raw_layer.get_output_space().get_total_dimension()
self.output_space = VectorSpace(total_dim)
@wraps(Layer.get_params)
def get_params(self):
return self.raw_layer.get_params()
@wraps(Layer.fprop)
def fprop(self, state_below):
raw = self.raw_layer.fprop(state_below)
return self.raw_layer.get_output_space().format_as(raw,
self.output_space)
@wraps(Layer.cost)
def cost(self, Y, Y_hat):
raw_space = self.raw_layer.get_output_space()
target_space = self.output_space
raw_Y = target_space.format_as(Y, raw_space)
if isinstance(raw_space, CompositeSpace):
# Pick apart the Join that our fprop used to make Y_hat
assert hasattr(Y_hat, 'owner')
owner = Y_hat.owner
assert owner is not None
assert str(owner.op) == 'Join'
# first input to join op is the axis
raw_Y_hat = tuple(owner.inputs[1:])
else:
# To implement this generally, we'll need to give Spaces an
# undo_format or something. You can't do it with format_as
# in the opposite direction because Layer.cost needs to be
# able to assume that Y_hat is the output of fprop
raise NotImplementedError()
raw_space.validate(raw_Y_hat)
return self.raw_layer.cost(raw_Y, raw_Y_hat)
@wraps(Layer.set_mlp)
def set_mlp(self, mlp):
super(FlattenerLayer, self).set_mlp(mlp)
self.raw_layer.set_mlp(mlp)
@wraps(Layer.get_weights)
def get_weights(self):
return self.raw_layer.get_weights()
def generate_dropout_mask(mlp, default_include_prob=0.5,
input_include_probs=None, rng=(2013, 5, 17)):
"""
Generate a dropout mask (as an integer) given inclusion
probabilities.
Parameters
----------
mlp : object
An MLP object.
default_include_prob : float, optional
The probability of including an input to a hidden \
layer, for layers not listed in `input_include_probs`. \
Default is 0.5.
input_include_probs : dict, optional
A dictionary mapping layer names to probabilities \
        of input inclusion for that layer. Default is `None`, \
        in which case `default_include_prob` is used for all \
        layers.
rng : RandomState object or seed, optional
A `numpy.random.RandomState` object or a seed used to \
create one.
Returns
-------
mask : int
An integer indexing a dropout mask for the network, \
drawn with the appropriate probability given the \
inclusion probabilities.
"""
if input_include_probs is None:
input_include_probs = {}
if not hasattr(rng, 'uniform'):
rng = np.random.RandomState(rng)
total_units = 0
mask = 0
for layer in mlp.layers:
if layer.layer_name in input_include_probs:
p = input_include_probs[layer.layer_name]
else:
p = default_include_prob
for _ in xrange(layer.get_input_space().get_total_dimension()):
mask |= int(rng.uniform() < p) << total_units
total_units += 1
return mask
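
# A minimal, illustrative helper (the `_unpack_dropout_mask` name and the
# `n_units` argument are assumptions, not part of the module's API): decode
# the integer mask produced above back into per-unit booleans, least
# significant bit first, where n_units is the summed input dimension over
# mlp.layers.
def _unpack_dropout_mask(mask, n_units):
    return [bool((mask >> i) & 1) for i in xrange(n_units)]
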
def sampled_dropout_average(mlp, inputs, num_masks,
default_input_include_prob=0.5,
input_include_probs=None,
default_input_scale=2.,
input_scales=None,
                            rng=(2013, 5, 17),
per_example=False):
"""
Take the geometric mean over a number of randomly sampled
dropout masks for an MLP with softmax outputs.
Parameters
----------
mlp : object
An MLP object.
inputs : tensor_like
A Theano variable representing a minibatch appropriate \
for fpropping through the MLP.
num_masks : int
The number of masks to sample.
default_input_include_prob : float, optional
The probability of including an input to a hidden \
layer, for layers not listed in `input_include_probs`. \
Default is 0.5.
input_include_probs : dict, optional
A dictionary mapping layer names to probabilities \
        of input inclusion for that layer. Default is `None`, \
        in which case `default_input_include_prob` is used for all \
        layers.
default_input_scale : float, optional
The amount to scale input in dropped out layers.
input_scales : dict, optional
A dictionary mapping layer names to constants by \
which to scale the input.
rng : RandomState object or seed, optional
A `numpy.random.RandomState` object or a seed used to \
create one.
per_example : boolean, optional
If `True`, generate a different mask for every single \
        test example, so you have `num_masks` masks per example \
        instead of `num_masks` networks total. If `False`, \
`num_masks` masks are fixed in the graph.
Returns
-------
geo_mean : tensor_like
A symbolic graph for the geometric mean prediction of \
all the networks.
"""
if input_include_probs is None:
input_include_probs = {}
if input_scales is None:
input_scales = {}
if not hasattr(rng, 'uniform'):
rng = np.random.RandomState(rng)
mlp._validate_layer_names(list(input_include_probs.keys()))
mlp._validate_layer_names(list(input_scales.keys()))
if per_example:
outputs = [mlp.dropout_fprop(inputs, default_input_include_prob,
input_include_probs,
default_input_scale,
input_scales)
for _ in xrange(num_masks)]
else:
masks = [generate_dropout_mask(mlp, default_input_include_prob,
input_include_probs, rng)
for _ in xrange(num_masks)]
outputs = [mlp.masked_fprop(inputs, mask, None,
default_input_scale, input_scales)
for mask in masks]
return geometric_mean_prediction(outputs)
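
# A minimal, illustrative sketch of sampled_dropout_average on a tiny network.
# The `_demo_*` name, the sizes, and the assumption that the MLP and Softmax
# classes defined earlier in this module accept these keyword arguments are
# illustrative only, not a prescribed recipe.
def _demo_sampled_dropout_average():
    mlp = MLP(nvis=4,
              layers=[Softmax(n_classes=3, layer_name='y', irange=0.05)])
    X = T.matrix('X')
    averaged = sampled_dropout_average(mlp, X, num_masks=5)
    f = function([X], averaged)
    data = np.random.uniform(size=(10, 4)).astype(config.floatX)
    # Returns a (10, 3) array of class probabilities, each row summing to 1.
    return f(data)
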
def exhaustive_dropout_average(mlp, inputs, masked_input_layers=None,
default_input_scale=2., input_scales=None):
"""
Take the geometric mean over all dropout masks of an
MLP with softmax outputs.
Parameters
----------
mlp : object
An MLP object.
inputs : tensor_like
A Theano variable representing a minibatch appropriate \
for fpropping through the MLP.
masked_input_layers : list, optional
A list of layer names whose input should be masked. \
Default is all layers (including the first hidden \
layer, i.e. mask the input).
default_input_scale : float, optional
The amount to scale input in dropped out layers.
input_scales : dict, optional
A dictionary mapping layer names to constants by \
which to scale the input.
Returns
-------
geo_mean : tensor_like
A symbolic graph for the geometric mean prediction \
of all exponentially many masked subnetworks.
Notes
-----
    This is obviously exponential in the size of the network;
    don't do this except for tiny toy networks.
"""
if masked_input_layers is None:
masked_input_layers = mlp.layer_names
mlp._validate_layer_names(masked_input_layers)
if input_scales is None:
input_scales = {}
mlp._validate_layer_names(input_scales.keys())
if any(key not in masked_input_layers for key in input_scales):
        not_in = [key for key in input_scales
                  if key not in masked_input_layers]
raise ValueError(", ".join(not_in) + " in input_scales"
" but not masked")
num_inputs = mlp.get_total_input_dimension(masked_input_layers)
outputs = [mlp.masked_fprop(inputs, mask, masked_input_layers,
default_input_scale, input_scales)
for mask in xrange(2 ** num_inputs)]
return geometric_mean_prediction(outputs)
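
# A minimal, illustrative sketch of exhaustive_dropout_average on a tiny
# network: with 4 masked input units there are only 2 ** 4 = 16 masks to
# enumerate. Constructor arguments are assumptions, as in the sketch above.
def _demo_exhaustive_dropout_average():
    mlp = MLP(nvis=4,
              layers=[Softmax(n_classes=3, layer_name='y', irange=0.05)])
    X = T.matrix('X')
    averaged = exhaustive_dropout_average(mlp, X)
    f = function([X], averaged)
    data = np.random.uniform(size=(6, 4)).astype(config.floatX)
    return f(data)
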
def geometric_mean_prediction(forward_props):
"""
    Take the geometric mean of a collection of softmax predictions,
    given the forward-propagation graphs that produced them.

    Parameters
    ----------
    forward_props : list
        A list of Theano graphs corresponding to forward \
        propagations through the network with different \
        dropout masks. Each graph must end in a softmax op.

    Returns
    -------
    geo_mean : tensor_like
        A symbolic graph for the geometric mean prediction \
        of all the given forward propagations.

    Notes
    -----
    The geometric mean of softmax outputs is computed by averaging
    the pre-softmax activations and applying a single softmax to the
    result.
"""
presoftmax = []
for out in forward_props:
assert isinstance(out.owner.op, T.nnet.Softmax)
assert len(out.owner.inputs) == 1
presoftmax.append(out.owner.inputs[0])
average = reduce(lambda x, y: x + y, presoftmax) / float(len(presoftmax))
return T.nnet.softmax(average)
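
# A minimal, illustrative sketch: because geometric_mean_prediction averages
# pre-softmax activations, feeding it two softmaxes of the same logits gives
# the same prediction as a single softmax. The `_demo_*` name and the shapes
# below are arbitrary assumptions.
def _demo_geometric_mean_prediction():
    z0 = T.matrix('z0')
    z1 = T.matrix('z1')
    avg = geometric_mean_prediction([T.nnet.softmax(z0), T.nnet.softmax(z1)])
    f = function([z0, z1], avg)
    logits = np.random.uniform(size=(5, 3)).astype(config.floatX)
    return f(logits, logits)
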
class BadInputSpaceError(TypeError):
"""
An error raised by an MLP layer when set_input_space is given an
object that is not one of the Spaces that layer supports.
"""