Skip to content

Instantly share code, notes, and snippets.

@lhenkelm
Created February 3, 2022 13:27
Show Gist options
  • Save lhenkelm/84b09d238824d7a4037b363731f72c8b to your computer and use it in GitHub Desktop.
fully vectorized yield_stdev implementation
import itertools
from typing import Optional, Mapping, Any, Tuple, List
import numpy as np
import pyhf
# all the locally-defined symbols the copy-pasted versions make use of
from cabinetry.model_utils import _channel_boundary_indices, ak, log
def _prepare_yield_variations(
    model: pyhf.pdf.Model, parameter_variations: np.ndarray
) -> ak.Array:
    """Evaluate model yields for a whole batch of parameter variations at once.

    Args:
        model: model batched such that ``batch_size == npars``, so every row
            of ``parameter_variations`` is evaluated in a single call
        parameter_variations: array of shape ``(npars, npars)``, one varied
            parameter vector per row

    Returns:
        awkward array indexed as [channel, parameter variation, bin]; after
        the ``n_channels`` per-bin entries, one extra single-bin "channel"
        per real channel holds the yields summed over that channel's bins
    """
    assert model.batch_size == model.config.npars
    assert parameter_variations.shape == (model.config.npars,) * 2
    # outer dim. (axis 0) of the combined yields holds different variations
    combined_yield_vars = pyhf.tensorlib.to_numpy(
        model.expected_data(
            parameter_variations,
            include_auxdata=False,
        ),
    )
    # indices where to split to separate all bins into regions
    region_split_indices = _channel_boundary_indices(model)
    # now we have a list of channels, with elements shaped (n_pars, n_bins)
    yield_vars = np.split(combined_yield_vars, region_split_indices, axis=1)
    # per-channel totals kept 2D as (n_pars, 1) via keepdims, so they append
    # below as single-bin "channels"
    total_channel_yields = (
        np.sum(chan_yields, axis=1, keepdims=True) for chan_yields in yield_vars
    )
    # now we have channels [axis 0], pars [axis 1], bins [axis 2]
    variations_ak = ak.from_iter(
        itertools.chain(
            yield_vars,
            total_channel_yields,
        )
    )
    return variations_ak
def yield_stdev(
    model: pyhf.pdf.Model,
    parameters: np.ndarray,
    uncertainty: np.ndarray,
    corr_mat: np.ndarray,
    model_kwargs: Optional[Mapping[str, Any]] = None,
) -> Tuple[List[List[float]], List[float]]:
    """Calculate the yield standard deviation per bin and per channel.

    Parameters are varied up and down by one standard deviation each; the
    resulting yield shifts are symmetrized and combined via the parameter
    correlation matrix.

    Args:
        model: model for which to calculate yield uncertainties
        parameters: central parameter values
        uncertainty: per-parameter uncertainties (same order as ``parameters``)
        corr_mat: parameter correlation matrix
        model_kwargs: extra keyword arguments forwarded to ``pyhf.Model`` if
            the model has to be rebuilt with a different batch size

    Returns:
        tuple of (per-bin standard deviations grouped by channel,
        total per-channel standard deviations)
    """
    if model_kwargs is None:
        model_kwargs = {}
    # rebuild the model with batch_size == npars so that all parameter
    # variations can be evaluated in a single expected_data call
    if model.batch_size != model.config.npars:
        model = pyhf.Model(
            model.spec,
            poi_name=model.config.poi_name,
            modifier_settings=model.config.modifier_settings,
            batch_size=model.config.npars,
            **model_kwargs,
        )
    # one copy of the central parameter vector per parameter to vary
    pars_square = np.tile(parameters.copy().astype(float), (model.config.npars, 1))
    # hoisted: the same diagonal shift matrix is used for up and down
    unc_diag = np.diagflat(uncertainty)
    up_variations_ak = _prepare_yield_variations(model, pars_square + unc_diag)
    down_variations_ak = _prepare_yield_variations(model, pars_square - unc_diag)
    # symmetrized per-parameter yield uncertainties
    sym_uncs = (up_variations_ak - down_variations_ak) / 2
    # if there are off-diagonal correlations, compute the full covariance
    if np.count_nonzero(corr_mat - np.diagflat(np.ones_like(parameters))) > 0:
        # possible optimizations missing here now:
        # - skipping staterror-staterror combinations (orthogonal)
        # - taking advantage of correlation matrix symmetry
        # - (optional) skipping combinations with correlations below threshold
        cov = corr_mat[np.newaxis, ..., np.newaxis] * sym_uncs[:, np.newaxis, ...]
        #     channels pars x pars bins           channels pars      pars x bins
        cov = sym_uncs[..., np.newaxis, :] * cov
        #     channels x pars      pars bins
        # cov is now channels [axis 0], pars [axis 1], pars [axis 2], bins [axis 3]
        total_variance = ak.sum(ak.sum(cov, axis=2), axis=1)
    else:  # uncorrelated parameters: only need to sum up the diagonal
        total_variance = ak.sum(np.power(sym_uncs, 2), axis=1)
    # now we have channels [axis 0] and bins [axis 1]; the last n_channels
    # entries are the single-bin per-channel totals appended by the helper
    n_channels = len(model.config.channels)
    # convert to standard deviations per bin and per channel
    total_stdev_per_bin = np.sqrt(total_variance[:n_channels])
    total_stdev_per_channel = ak.flatten(np.sqrt(total_variance[n_channels:]))
    log.debug(f"total stdev is {total_stdev_per_bin}")
    log.debug(f"total stdev per channel is {total_stdev_per_channel}")
    # convert to plain Python lists
    total_stdev_per_bin = ak.to_list(total_stdev_per_bin)
    total_stdev_per_channel = ak.to_list(total_stdev_per_channel)
    return total_stdev_per_bin, total_stdev_per_channel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment