-
-
Save lhenkelm/84b09d238824d7a4037b363731f72c8b to your computer and use it in GitHub Desktop.
fully vectorized yield_stdev implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
from typing import Optional, Mapping, Any, Tuple, List | |
import numpy as np | |
import pyhf | |
# all the locally-defined symbols the copy-pasted versions make use of | |
from cabinetry.model_utils import _channel_boundary_indices, ak, log | |
def _prepare_yield_variations(model, parameter_variations):
    """Evaluate model yields for a batch of parameter variations.

    Args:
        model: pyhf model whose ``batch_size`` equals its number of
            parameters, so all variations are evaluated in one call.
        parameter_variations: array of shape ``(npars, npars)``; each row is
            one full parameter vector (typically the best-fit point with a
            single parameter shifted).

    Returns:
        awkward array indexed as [channel, parameter, bin]; appended after
        the per-channel entries is one extra "channel" per region holding
        the total yield summed over that region's bins.

    Raises:
        ValueError: if the model batch size or the variations shape does not
            match the number of model parameters.
    """
    npars = model.config.npars
    # explicit raises instead of assert: assert is stripped under "python -O"
    if model.batch_size != npars:
        raise ValueError(
            f"model batch_size {model.batch_size} must equal npars {npars}"
        )
    if parameter_variations.shape != (npars, npars):
        raise ValueError(
            f"parameter_variations must have shape {(npars, npars)}"
        )
    # outer dim. (axis 0) of the combined yields holds different variations
    combined_yield_vars = pyhf.tensorlib.to_numpy(
        model.expected_data(
            parameter_variations,
            include_auxdata=False,
        ),
    )
    # indices where to split to separate all bins into regions
    region_split_indices = _channel_boundary_indices(model)
    # now we have a list of channels, with elements shaped (n_pars, n_bins)
    yield_vars = np.split(combined_yield_vars, region_split_indices, axis=1)
    # per-channel totals shaped (n_pars, 1); keepdims replaces the original
    # np.array([...]).reshape(-1, 1) round-trip with the same result
    total_channel_yields = (
        np.sum(chan_yields, axis=1, keepdims=True) for chan_yields in yield_vars
    )
    # now we have channels [axis 0], pars [axis 1], bins [axis 2]
    variations_ak = ak.from_iter(
        itertools.chain(
            yield_vars,
            total_channel_yields,
        )
    )
    return variations_ak
def yield_stdev(
    model: pyhf.pdf.Model,
    parameters: np.ndarray,
    uncertainty: np.ndarray,
    corr_mat: np.ndarray,
    model_kwargs: Optional[Mapping[str, Any]] = None,
) -> Tuple[List[List[float]], List[float]]:
    """Calculate the symmetrized yield standard deviation per bin and channel.

    Varies every model parameter up and down by its uncertainty, propagates
    the symmetrized effect through the (possibly correlated) covariance, and
    returns standard deviations of the expected yields.

    Args:
        model: pyhf model for which to evaluate yields.
        parameters: central parameter values, shape ``(npars,)``.
        uncertainty: per-parameter uncertainties, shape ``(npars,)``.
        corr_mat: parameter correlation matrix, shape ``(npars, npars)``.
        model_kwargs: extra keyword arguments forwarded to ``pyhf.Model`` if
            the model needs to be rebuilt with a matching batch size.

    Returns:
        Tuple of (standard deviations per bin, nested by channel;
        standard deviations of the total yield per channel).
    """
    if model_kwargs is None:
        model_kwargs = {}
    # rebuild the model with batch_size == npars so that all parameter
    # variations can be evaluated in a single expected_data call
    if model.batch_size != model.config.npars:
        model = pyhf.Model(
            model.spec,
            poi_name=model.config.poi_name,
            modifier_settings=model.config.modifier_settings,
            batch_size=model.config.npars,
            **model_kwargs,
        )
    # one row per parameter; astype(float) already returns a copy, so the
    # previous extra .copy() was redundant
    pars_square = np.tile(parameters.astype(float), (model.config.npars, 1))
    # hoisted: the same diagonal shift matrix was previously built twice
    shifts = np.diagflat(uncertainty)
    up_variations_ak = _prepare_yield_variations(model, pars_square + shifts)
    down_variations_ak = _prepare_yield_variations(model, pars_square - shifts)
    # symmetrized effect of each parameter on each yield
    sym_uncs = (up_variations_ak - down_variations_ak) / 2
    # if there are off-diagonal correlations, compute full covariance
    if np.count_nonzero(corr_mat - np.diagflat(np.ones_like(parameters))) > 0:
        # possible optimizations missing here now:
        # - skipping staterror-staterror combinations (orthogonal)
        # - taking advantage of correlation matrix symmetry
        # - (optional) skipping combinations with correlations below threshold
        cov = corr_mat[np.newaxis, ..., np.newaxis] * sym_uncs[:, np.newaxis, ...]
        #     channels pars x pars bins          channels pars      pars x bins
        cov = sym_uncs[..., np.newaxis, :] * cov
        #     channels x pars pars bins
        # cov is now channels [axis 0], pars [axis 1], pars [axis 2], bins [axis 3]
        total_variance = ak.sum(ak.sum(cov, axis=2), axis=1)
    else:  # only need to sum up diagonal
        total_variance = ak.sum(np.power(sym_uncs, 2), axis=1)
    # now we have channels [axis 0] and bins [axis 1]
    n_channels = len(model.config.channels)
    # convert to standard deviations per bin and per channel; the trailing
    # entries of total_variance are the per-channel totals appended by
    # _prepare_yield_variations
    total_stdev_per_bin = np.sqrt(total_variance[:n_channels])
    total_stdev_per_channel = ak.flatten(np.sqrt(total_variance[n_channels:]))
    log.debug(f"total stdev is {total_stdev_per_bin}")
    log.debug(f"total stdev per channel is {total_stdev_per_channel}")
    # convert to lists
    return ak.to_list(total_stdev_per_bin), ak.to_list(total_stdev_per_channel)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment