Skip to content

Instantly share code, notes, and snippets.

@chrispahm
Created April 5, 2023 14:30
import os
import sys
import types
# Presumably the name of the package bundled into this file -- TODO confirm
# against the bootstrap code that consumes it (not visible in this section).
PINLINED_DEFAULT_PACKAGE = 'formulaic'
# Presumably the module name under which the loader is registered -- TODO
# confirm against the bootstrap code that consumes it.
PINLINER_MODULE_NAME = 'pinliner_loader'
# Loader version; interpolated into the `inliner_importer_code` template below,
# where it becomes `InlinerImporter.version`.
loader_version = '0.2.1'
# NOTE(review): not referenced in the visible part of the file; looks like an
# override for whether the importer installs its excepthook -- confirm.
FORCE_EXC_HOOK = None
inliner_importer_code = '''
import imp
import marshal
import os
import struct
import sys
import types
class InlinerImporter(object):
    """
    Legacy (PEP 302 style) finder/loader serving modules from one bundle file.

    `data` maps fully qualified module names to 4-item records of
    `(is_package, start, end, timestamp)`, where `start` and `end` delimit the
    module source inside `datafile`.
    """

    version = '%(loader_version)s'

    def __init__(self, data, datafile, set_excepthook=True):
        """
        Args:
            data: Mapping of module name to `(is_package, start, end, ts)`.
            datafile: Path of the file holding the bundled sources.
            set_excepthook: When true, install `excepthook` as
                `sys.excepthook`.
        """
        self.data = data
        self.datafile = datafile
        if set_excepthook:
            sys.excepthook = self.excepthook

    @staticmethod
    def excepthook(type, value, traceback):
        """Print the exception using the standard `traceback` module."""
        import traceback as tb
        tb.print_exception(type, value, traceback)

    @staticmethod
    def _magic_number():
        """
        Return the interpreter's bytecode magic number.

        `importlib.util.MAGIC_NUMBER` is preferred because the `imp` module is
        deprecated and absent from recent interpreters; `imp` is only used as
        a fallback where `importlib.util` is unavailable.
        """
        try:
            from importlib.util import MAGIC_NUMBER
        except ImportError:
            return imp.get_magic()
        return MAGIC_NUMBER

    def find_module(self, fullname, path=None):
        """Return this importer if `fullname` is bundled, otherwise `None`."""
        if fullname in self.data:
            return self
        return None

    def get_source(self, fullname):
        """
        Return the source text of `fullname` read from the bundle file.

        NOTE(review): the file is opened in text mode while `start`/`end` are
        used as seek/read offsets; this presumably relies on the bundle being
        plain single-byte text -- confirm against the packer.
        """
        __, start, end, ts = self.data[fullname]
        with open(self.datafile) as datafile:
            datafile.seek(start)
            code = datafile.read(end - start)
        return code

    def get_code(self, fullname, filename):
        """
        Return the compiled code object for `fullname`.

        A private bytecode cache (`<fullname>.pyc` in the current working
        directory, laid out as magic number, 4-byte timestamp, marshalled
        code) is used when it matches the running interpreter and the bundled
        timestamp. Cache reads and writes are best-effort: any ordinary
        failure falls back to compiling from source.
        """
        py_ts = self.data[fullname][3]
        magic = self._magic_number()
        cache_path = fullname + '.pyc'
        try:
            with open(cache_path, 'rb') as pyc:
                pyc_magic = pyc.read(4)
                pyc_ts = struct.unpack('<I', pyc.read(4))[0]
                if pyc_magic == magic and pyc_ts == py_ts:
                    return marshal.load(pyc)
        except Exception:
            # Missing, truncated or stale cache: recompile below. Interrupts
            # (KeyboardInterrupt/SystemExit) are deliberately not swallowed.
            pass

        code = self.get_source(fullname)
        compiled_code = compile(code, filename, 'exec')

        try:
            with open(cache_path, 'wb') as pyc:
                pyc.write(magic)
                pyc.write(struct.pack('<I', py_ts))
                marshal.dump(compiled_code, pyc)
        except Exception:
            # The cache is an optimisation only (e.g. read-only directory).
            pass
        return compiled_code

    def load_module(self, fullname):
        """
        Create, register and execute the bundled module `fullname`.

        If the module is already present in `sys.modules` its entry is
        replaced, and it is left in place should execution fail; a freshly
        added entry is removed again on failure. The original exception is
        always re-raised.
        """
        exists = fullname in sys.modules

        module = types.ModuleType(fullname)
        module.__loader__ = self

        is_package = self.data[fullname][0]
        path = fullname.replace('.', os.path.sep)
        if is_package:
            module.__package__ = fullname
            module.__file__ = os.path.join(path, '__init__.py')
            module.__path__ = [path]
        else:
            # The parent package is everything before the last dot; this is
            # the empty string for a top-level module.
            module.__package__ = fullname.rpartition('.')[0]
            module.__file__ = path + '.py'

        sys.modules[fullname] = module

        try:
            compiled_code = self.get_code(fullname, module.__file__)
            exec(compiled_code, module.__dict__)
        except BaseException:
            if not exists:
                del sys.modules[fullname]
            raise

        return module
''' % {'loader_version': loader_version}
'''
from __future__ import annotations
import warnings
from collections import OrderedDict
from dataclasses import dataclass, field, replace
from typing import (
Any,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
TYPE_CHECKING,
)
from formulaic.materializers.base import EncodedTermStructure
from formulaic.parser.types import Structured, Term
from formulaic.utils.constraints import LinearConstraintSpec, LinearConstraints
from .formula import Formula, FormulaSpec
from .materializers import FormulaMaterializer, NAAction
if TYPE_CHECKING: # pragma: no cover
from .model_matrix import ModelMatrices, ModelMatrix
# Cached property was introduced in Python 3.8 (we currently support 3.7)
try:
from functools import cached_property
except ImportError: # pragma: no cover
from cached_property import cached_property
@dataclass(frozen=True)
class ModelSpec:
    """
    A container for the metadata used to generate a `ModelMatrix` instance.

    This object can also be used to create a `ModelMatrix` instance that
    respects the encoding choices made during the generation of this `ModelSpec`
    instance.

    Attributes:
        Configuration:
            formula: The formula for which the model matrix was (and/or will be)
                generated.
            materializer: The materializer used (and/or to be used) to
                materialize the formula into a matrix.
            materializer_params: Extra keyword arguments passed to the
                materializer when it is instantiated.
            ensure_full_rank: Whether to ensure that the generated matrix is
                "structurally" full-rank (features are not included which are
                known to violate full-rankness).
            na_action: The action to be taken if NA values are found in the
                data. Can be one of: "drop" (the default), "raise" or "ignore".
            output: The desired output type (as interpreted by the materializer;
                e.g. "pandas", "sparse", etc).

        State (these attributes are only populated during materialization):
            structure: The model matrix structure resulting from materialization.
            transform_state: The state of any stateful transformations that took
                place during factor evaluation.
            encoder_state: The state of any stateful transformations that took
                place during encoding.
    """

    @classmethod
    def from_spec(
        cls,
        spec: Union[FormulaSpec, ModelMatrix, ModelMatrices, ModelSpec, ModelSpecs],
        **attrs,
    ) -> Union[ModelSpec, ModelSpecs]:
        """
        Construct a `ModelSpec` (or `Structured[ModelSpec]`) instance for the
        nominated `spec`, setting and/or overriding any `ModelSpec` attributes
        present in `attrs`.

        Args:
            spec: The specification for which to generate a `ModelSpec`
                instance or structured set of `ModelSpec` instances.
            attrs: Any `ModelSpec` attributes to set and/or override on all
                generated `ModelSpec` instances.
        """
        # Imported here (rather than at module level) because `model_matrix`
        # is only imported under `TYPE_CHECKING` at the top of this module.
        from .model_matrix import ModelMatrix

        def prepare_model_spec(obj):
            # Model matrices contribute the spec that generated them.
            if isinstance(obj, ModelMatrix):
                obj = obj.model_spec
            if isinstance(obj, ModelSpec):
                return obj.update(**attrs)
            formula = Formula.from_spec(obj)
            # Structured formulae map onto a structured set of model specs.
            if not formula._has_root or formula._has_structure:
                return formula._map(prepare_model_spec, as_type=ModelSpecs)
            return ModelSpec(formula=formula, **attrs)

        if isinstance(spec, Formula) or not isinstance(spec, Structured):
            return prepare_model_spec(spec)
        return spec._map(prepare_model_spec, as_type=ModelSpecs)

    # Configuration attributes
    formula: Formula
    materializer: Optional[str] = None
    materializer_params: Optional[Dict[str, Any]] = None
    ensure_full_rank: bool = True
    na_action: NAAction = "drop"
    output: Optional[str] = None

    # State attributes
    structure: Optional[List[EncodedTermStructure]] = None
    transform_state: Dict = field(default_factory=dict)
    encoder_state: Dict = field(default_factory=dict)

    def __post_init__(self):
        # The dataclass is frozen, so normalised values are written straight
        # into the instance `__dict__`.
        self.__dict__["formula"] = Formula.from_spec(self.formula)

        if not self.formula._has_root or self.formula._has_structure:
            raise ValueError(
                "Nominated `Formula` instance has structure, which is not permitted when attaching to a `ModelSpec` instance."
            )

        # Materializer: anything that is not already a string name is resolved
        # to the registered name of the matching materializer.
        if self.materializer is not None and not isinstance(self.materializer, str):
            self.__dict__["materializer"] = FormulaMaterializer.for_materializer(
                self.materializer
            ).REGISTER_NAME

        self.__dict__["na_action"] = NAAction(self.na_action)

    # Derived features

    @cached_property
    def column_names(self) -> Sequence[str]:
        """
        The names associated with the columns of the generated model matrix.
        """
        return tuple(feature for row in self.structure for feature in row.columns)

    @property
    def feature_names(self) -> Sequence[str]:
        """
        A deprecated reference to `ModelSpec.column_names`. Will be removed in
        v1.0.0.
        """
        warnings.warn(
            "`ModelSpec.feature_names` is deprecated and will be removed in v1.0.0. Use `ModelSpec.column_names` instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the accessing code
        )
        return self.column_names

    @cached_property
    def column_indices(self) -> OrderedDict[str, int]:
        """
        An ordered mapping from column names to the column index in generated
        model matrices.
        """
        return OrderedDict((name, i) for i, name in enumerate(self.column_names))

    @property
    def feature_indices(self) -> OrderedDict[str, int]:
        """
        A deprecated reference to `ModelSpec.column_indices`. Will be removed in
        v1.0.0.
        """
        warnings.warn(
            "`ModelSpec.feature_indices` is deprecated and will be removed in v1.0.0. Use `ModelSpec.column_indices` instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the accessing code
        )
        return self.column_indices

    @property
    def terms(self) -> List[Term]:
        """
        The terms used to generate model matrices from this `ModelSpec`
        instance.
        """
        return self.formula.root

    @cached_property
    def term_indices(self) -> OrderedDict[Term, Tuple[int, ...]]:
        """
        An ordered mapping of `Term` instances to the generated column indices.

        Note: Since terms hash using their string representation, you can look
        up elements of this mapping using the string representation of the
        `Term`.
        """
        slices = OrderedDict()
        start = 0
        for row in self.structure:
            # Each structure row is indexed positionally: item 0 is the term
            # and item 2 holds its generated columns.
            end = start + len(row[2])
            slices[row[0]] = tuple(range(start, end))
            start = end
        return slices

    @cached_property
    def term_slices(self) -> OrderedDict[Term, slice]:
        """
        An ordered mapping of `Term` instances to a slice that when used on
        the columns of the model matrix will subsample the model matrix down to
        those corresponding to each term.

        Terms that generated no columns map to the empty slice `slice(0, 0)`.

        Note: Since terms hash using their string representation, you can look
        up elements of this mapping using the string representation of the
        `Term`.
        """
        return OrderedDict(
            (
                term,
                # A term may legitimately own zero columns; indexing its empty
                # tuple of indices would raise `IndexError`.
                slice(indices[0], indices[-1] + 1) if indices else slice(0, 0),
            )
            for term, indices in self.term_indices.items()
        )

    # Transforms

    def update(self, **kwargs):
        """
        Create a copy of this `ModelSpec` instance with the nominated attributes
        mutated.
        """
        return replace(self, **kwargs)

    def differentiate(
        self, *vars, use_sympy=False  # pylint: disable=redefined-builtin
    ):
        """
        EXPERIMENTAL: Take the gradient of this model spec. When used in a
        linear regression, evaluating a trained model on model matrices
        generated by this formula is equivalent to estimating the gradient of
        that fitted form with respect to `vars`.

        Args:
            vars: The variables with respect to which the gradient should be
                taken.
            use_sympy: Whether to use sympy to perform symbolic differentiation.

        Notes:
            This method is provisional and may be removed in any future major
            version.
        """
        return self.update(
            formula=self.formula.differentiate(*vars, use_sympy=use_sympy),
        )

    # Utility methods

    def get_model_matrix(
        self, data: Any, context: Optional[Mapping[str, Any]] = None, **attr_overrides
    ) -> ModelMatrix:
        """
        Build the model matrix (or matrices) realisation of this model spec for
        the nominated `data`.

        Args:
            data: The data for which to build the model matrices.
            context: An additional mapping object of names to make available
                when evaluating formula term factors.
            attr_overrides: Any `ModelSpec` attributes to override before
                constructing model matrices. This is shorthand for first
                running `ModelSpec.update(**attr_overrides)`.
        """
        if attr_overrides:
            return self.update(**attr_overrides).get_model_matrix(data, context=context)
        if self.materializer is None:
            materializer = FormulaMaterializer.for_data(data)
        else:
            materializer = FormulaMaterializer.for_materializer(self.materializer)
        return materializer(
            data, context=context, **(self.materializer_params or {})
        ).get_model_matrix(self)

    def get_linear_constraints(self, spec: LinearConstraintSpec) -> LinearConstraints:
        """
        Construct a `LinearConstraints` instance from a specification based on
        the structure of the model matrices associated with this model spec.

        Args:
            spec: The specification from which to derive the constraints. Refer
                to `LinearConstraints.from_spec` for more details.
        """
        return LinearConstraints.from_spec(spec, variable_names=self.column_names)

    def get_slice(self, columns_identifier: Union[int, str, Term, slice]) -> slice:
        """
        Generate a `slice` instance corresponding to the columns associated with
        the nominated `columns_identifier`.

        Args:
            columns_identifier: The identifier for which the slice should be
                generated. Can be one of:
                    - a `slice` instance (returned unchanged)
                    - an integer specifying a specific column index.
                    - a `Term` instance
                    - a string representation of a term
                    - a column name

        Raises:
            ValueError: If the identifier does not match any term or column.
        """
        if isinstance(columns_identifier, slice):
            return columns_identifier
        if isinstance(columns_identifier, int):
            return slice(columns_identifier, columns_identifier + 1)

        term_slices = self.term_slices
        if isinstance(columns_identifier, Term):
            if columns_identifier not in term_slices:
                raise ValueError(
                    f"Model matrices built using this spec do not include term: `{columns_identifier}`."
                )
            return term_slices[columns_identifier]
        # Term names take precedence over column names for other identifiers.
        if columns_identifier in term_slices:
            return term_slices[columns_identifier]

        column_indices = self.column_indices
        if columns_identifier in column_indices:
            idx = column_indices[columns_identifier]
            return slice(idx, idx + 1)

        raise ValueError(
            f"Model matrices built using this spec do not have any columns related to: `{repr(columns_identifier)}`."
        )

    # Only include dataclass fields when pickling (cached properties also live
    # in the instance `__dict__` and are dropped here).
    def __getstate__(self):
        return {
            k: v for k, v in self.__dict__.items() if k in self.__dataclass_fields__
        }
class ModelSpecs(Structured[ModelSpec]):
    """
    A structured container of `ModelSpec` instances which forwards a handful
    of `ModelSpec` convenience methods onto every spec it holds.
    """

    def _prepare_item(self, key: str, item: Any) -> Any:
        # Only `ModelSpec` instances may be stored in this container.
        if isinstance(item, ModelSpec):
            return item
        raise TypeError(
            "`ModelSpecs` instances expect all items to be instances of "
            f"`ModelSpec`. [Got: {repr(item)} of type {repr(type(item))} "
            f"for key {repr(key)}."
        )

    def get_model_matrix(
        self, data: Any, context: Optional[Mapping[str, Any]] = None, **attr_overrides
    ) -> ModelMatrices:
        """
        Structured counterpart of `ModelSpec.get_model_matrix(...)`.

        When every contained spec that names a materializer agrees on both the
        materializer and its parameters (or none names one), a single
        materializer instance generates all of the model matrices together.
        Otherwise each spec is materialized on its own.

        Args:
            data: The data for which to build the model matrices.
            context: An additional mapping of names to make available when
                evaluating formula term factors.
            attr_overrides: `ModelSpec` attributes to override before building
                the matrices; equivalent to first calling
                `ModelSpec.from_spec(model_specs, **attr_overrides)`.
        """
        from formulaic import ModelMatrices

        if attr_overrides:
            overridden = ModelSpec.from_spec(self, **attr_overrides)
            return overridden.get_model_matrix(data, context=context)

        # Walk the specs looking for a materializer/params combination that
        # conflicts with the one seen so far; specs that do not name a
        # materializer never conflict.
        jointly_generate = True
        shared_materializer, shared_params = None, None
        for spec in self._flatten():
            if not spec.materializer:
                continue
            if shared_materializer not in (
                None,
                spec.materializer,
            ) or shared_params not in (None, spec.materializer_params):
                jointly_generate = False
                break
            shared_materializer = spec.materializer
            shared_params = spec.materializer_params or None

        if not jointly_generate:

            def build_individually(model_spec):
                return model_spec.get_model_matrix(data, context=context)

            return self._map(build_individually, as_type=ModelMatrices)

        if shared_materializer is None:
            materializer_cls = FormulaMaterializer.for_data(data)
        else:
            materializer_cls = FormulaMaterializer.for_materializer(
                shared_materializer
            )
        materializer = materializer_cls(
            data, context=context, **(shared_params or {})
        )
        return materializer.get_model_matrix(self)

    def differentiate(
        self, *vars, use_sympy=False  # pylint: disable=redefined-builtin
    ) -> ModelSpecs:
        """
        Structured counterpart of the experimental
        `ModelSpec.differentiate(...)`; see that method for details.
        """

        def differentiate_one(model_spec):
            return model_spec.differentiate(*vars, use_sympy=use_sympy)

        return self._map(differentiate_one, as_type=ModelSpecs)
from __future__ import annotations
import copy
from typing import Any, Generic, Optional, TypeVar, TYPE_CHECKING
import wrapt
from formulaic.parser.types.structured import Structured
if TYPE_CHECKING: # pragma: no cover
from .model_spec import ModelSpec, ModelSpecs
MatrixType = TypeVar("MatrixType")
class ModelMatrix(Generic[MatrixType], wrapt.ObjectProxy):
    """
    A wrapper around arbitrary model matrix output representations.

    This wrapper allows for `isinstance(..., ModelMatrix)` checks, and allows
    one to access the `ModelSpec` instance associated with its creation using
    `<model_matrix>.model_spec`. All other instance attributes and methods of
    the wrapped object are directly accessible as if the object were unwrapped.
    """

    def __init__(self, matrix: Any, spec: Optional[ModelSpec] = None):
        """
        Args:
            matrix: The object to wrap.
            spec: The `ModelSpec` to associate with the wrapped object.
        """
        wrapt.ObjectProxy.__init__(self, matrix)
        # Stored under a `_self_` prefixed name so that the attribute lives on
        # the proxy rather than the wrapped object (wrapt's convention).
        self._self_model_spec = spec

    @property
    def model_spec(self) -> Optional[ModelSpec]:
        """
        The `ModelSpec` instance associated with the creation of this
        `ModelMatrix` instance.

        This `ModelSpec` instance can be used to create other `ModelMatrix`s
        that respect all the choices (including feature selection and encoding)
        that were made in the construction of this `ModelMatrix` instance.
        """
        return self._self_model_spec

    def __repr__(self):
        return self.__wrapped__.__repr__()  # pragma: no cover

    # Handle copying behaviour

    def __copy__(self):
        # Shallow copy: the wrapped object is copied, the spec is shared.
        return type(self)(copy.copy(self.__wrapped__), spec=self._self_model_spec)

    def __deepcopy__(self, memo=None):
        # One memo dictionary is shared by both component copies so that
        # objects referenced from both the matrix and its spec are copied once
        # and reference cycles are handled, per the deepcopy protocol.
        if memo is None:
            memo = {}
        return type(self)(
            copy.deepcopy(self.__wrapped__, memo),
            spec=copy.deepcopy(self._self_model_spec, memo),
        )
class ModelMatrices(Structured[ModelMatrix]):
    """
    A structured container of `ModelMatrix` instances. Mirroring
    `ModelMatrix.model_spec`, it exposes a `.model_spec` attribute that
    gathers the `ModelSpec` of every contained matrix into a `ModelSpecs`
    container of the same shape.
    """

    def _prepare_item(self, key: str, item: Any) -> Any:
        # Only `ModelMatrix` instances may be stored in this container.
        if isinstance(item, ModelMatrix):
            return item
        raise TypeError(
            "`ModelMatrices` instances expect all items to be instances "
            f"of `ModelMatrix`. [Got: {repr(item)} of type "
            f"{repr(type(item))} for key {repr(key)}."
        )

    @property
    def model_spec(self) -> ModelSpecs:
        """
        A `ModelSpecs` instance holding, at matching positions, the
        `ModelSpec` attached to each `ModelMatrix` stored in this container.
        """
        from .model_spec import ModelSpecs

        def spec_of(model_matrix):
            return model_matrix.model_spec

        return self._map(spec_of, as_type=ModelSpecs)
from __future__ import annotations
import warnings
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union
from typing_extensions import TypeAlias
from .errors import FormulaInvalidError
from .model_matrix import ModelMatrix
from .parser import DefaultFormulaParser
from .parser.types import FormulaParser, Structured, Term
from .utils.calculus import differentiate_term
# The kinds of object accepted as a formula specification. The alias is
# recursive: dictionaries and tuples may contain further specifications.
FormulaSpec: TypeAlias = Union[
    str,
    List[Union[str, Term]],
    Set[Union[str, Term]],
    Structured[Union[str, List[Term], Set[Term]]],
    "Formula",  # Direct formula specification
    Dict[str, "FormulaSpec"],
    Tuple["FormulaSpec", ...],  # Structured formulae
]
class Formula(Structured[List[Term]]):
    """
    A (potentially structured) list of terms.

    This class is a thin layer over `Structured[List[Term]]` which adds
    helpers for common tasks such as building model matrices. Instances can be
    created from a string (which is parsed), or from terms assembled by hand.

    Examples:
    ```
    >>> Formula("y ~ x")
    .lhs:
        y
    .rhs:
        1 + x
    >>> Formula("x + y", a=["x", "y:z"], b="y ~ z")
    root:
        1 + x + y
    .a:
        x + y:z
    .b:
        .lhs:
            y
        .rhs:
            z
    ```

    The parsing of strings into terms can be customised by supplying parsers
    via `_parser` and `_nested_parser`.
    ```
    >>> Formula("y ~ x", _parser=DefaultFormulaParser(include_intercept=False))
    .lhs:
        y
    .rhs:
        x
    ```

    Attributes:
        _parser: The `FormulaParser` used for complete formulae (as opposed to
            individual terms). Defaults to `DefaultFormulaParser()`.
        _nested_parser: The `FormulaParser` used for strings describing nested
            or individual terms (e.g. when `spec` is a list of string term
            identifiers). Falls back to `_parser` when that was given, and
            otherwise to `DefaultFormulaParser(include_intercept=False)`.
    """

    DEFAULT_PARSER = DefaultFormulaParser()
    DEFAULT_NESTED_PARSER = DefaultFormulaParser(include_intercept=False)

    __slots__ = ("_parser", "_nested_parser")

    @classmethod
    def from_spec(
        cls,
        spec: FormulaSpec,
        parser: Optional[FormulaParser] = None,
        nested_parser: Optional[FormulaParser] = None,
    ) -> Formula:
        """
        Build a `Formula` from a formula specification. A `spec` that is
        already a `Formula` is returned unchanged.

        Args:
            spec: The formula specification.
            parser: The `FormulaParser` used for complete formulae (as opposed
                to individual terms). Defaults to `DefaultFormulaParser()`.
            nested_parser: The `FormulaParser` used for strings describing
                nested or individual terms (e.g. when `spec` is a list of
                string term identifiers). Falls back to `parser` when that was
                given, and otherwise to
                `DefaultFormulaParser(include_intercept=False)`.
        """
        if not isinstance(spec, Formula):
            return Formula(spec, _parser=parser, _nested_parser=nested_parser)
        return spec

    def __init__(
        self,
        *args,
        _parser: Optional[FormulaParser] = None,
        _nested_parser: Optional[FormulaParser] = None,
        **kwargs,
    ):
        # The parsers must be in place before the base class initialiser runs
        # (presumably because it routes items through `_prepare_item`, which
        # reads them -- confirm against `Structured`).
        self._parser = _parser or self.DEFAULT_PARSER
        self._nested_parser = _nested_parser or _parser or self.DEFAULT_NESTED_PARSER
        super().__init__(*args, **kwargs)
        self._simplify(unwrap=False, inplace=True)

    def _prepare_item(self, key: str, item: FormulaSpec) -> Union[List[Term], Formula]:
        """
        Normalise an incoming formula item into either a list of `Term`
        instances or a nested `Formula`.

        Note: Strings stored under the "root" key are parsed with the main
        parser; everything else uses the nested parser.

        Args:
            key: The structural key under which the item will be stored.
            item: The specification to convert.

        Raises:
            FormulaInvalidError: If `item` is not a recognised specification.
        """
        if isinstance(item, str):
            parser = self._parser if key == "root" else self._nested_parser
            item = parser.get_terms(item, sort=True)._simplify()

        if isinstance(item, Structured):
            nested = Formula(_parser=self._nested_parser, **item._structure)
            return nested._simplify()

        if isinstance(item, (list, set)):
            terms = []
            for value in item:
                if isinstance(value, str):
                    terms.extend(self._nested_parser.get_terms(value))
                else:
                    terms.append(value)
            self.__validate_terms(terms)
            return sorted(terms)

        raise FormulaInvalidError(
            f"Unrecognized formula specification: {repr(item)}."
        )

    @classmethod
    def __validate_terms(cls, formula_or_terms: Any):
        """
        Check that `formula_or_terms` is a list whose members are all `Term`
        instances.

        Raises:
            FormulaInvalidError: If it is not a list, or if any member is not
                a `Term`.
        """
        if not isinstance(formula_or_terms, list):
            # Should be impossible to reach this; here as a sentinel
            raise FormulaInvalidError(
                f"All components of a formula should be lists of `Term` instances. Found: {repr(formula_or_terms)}."
            )
        for term in formula_or_terms:
            if isinstance(term, Term):
                continue
            raise FormulaInvalidError(
                f"All terms in formula should be instances of `formulaic.parser.types.Term`; received term {repr(term)} of type `{type(term)}`."
            )

    def get_model_matrix(
        self, data: Any, context: Optional[Mapping[str, Any]] = None, **spec_overrides
    ) -> Union[ModelMatrix, Structured[ModelMatrix]]:
        """
        Build the model matrix (or matrices) for this formula from the
        nominated `data`.

        Args:
            data: The data for which to build the model matrices.
            context: An additional mapping of names to make available when
                evaluating formula term factors.
            spec_overrides: Any `ModelSpec` attributes to set/override. See
                `ModelSpec` for more details.
        """
        from .model_spec import ModelSpec

        model_spec = ModelSpec.from_spec(self, **spec_overrides)
        return model_spec.get_model_matrix(data, context=context)

    def differentiate(  # pylint: disable=redefined-builtin
        self,
        *vars: Tuple[str, ...],
        use_sympy: bool = False,
    ) -> Formula:
        """
        EXPERIMENTAL: Take the gradient of this formula. When used in a linear
        regression, evaluating a trained model on model matrices generated by
        the resulting formula is equivalent to estimating the gradient of that
        fitted form with respect to `vars`.

        Args:
            vars: The variables with respect to which the gradient should be
                taken.
            use_sympy: Whether to use sympy to perform symbolic differentiation.

        Notes:
            This method is provisional and may be removed in any future major
            version.
        """

        def differentiate_terms(terms):
            return [
                differentiate_term(term, vars, use_sympy=use_sympy) for term in terms
            ]

        return self._map(differentiate_terms)

    @property
    def terms(self) -> Formula:
        """Deprecated alias which returns this `Formula` itself."""
        warnings.warn(
            "`Formula.terms` is deprecated. Please index/iterate over `Formula` directly instead.",
            DeprecationWarning,
        )
        return self

    def __getattr__(self, attr):
        # Sub-structures are re-wrapped as `Formula` so that the helper
        # methods stay available; the root is handed back as-is.
        subformula = super().__getattr__(attr)
        if attr == "root":
            return subformula
        return Formula.from_spec(subformula)

    def __getitem__(self, key):
        # Sub-structures are re-wrapped as `Formula` so that the helper
        # methods stay available; the root is handed back as-is.
        subformula = super().__getitem__(key)
        if key == "root":
            return subformula
        return Formula.from_spec(subformula)

    def __repr__(self, to_str: bool = False):
        def join_terms(terms):
            return " + ".join([str(t) for t in terms])

        if not self._has_structure and self._has_root:
            return join_terms(self)
        return str(self._map(join_terms))
from .formula import Formula, FormulaSpec
from .materializers import FactorValues
from .model_matrix import ModelMatrix, ModelMatrices
from .model_spec import ModelSpec, ModelSpecs
from .sugar import model_matrix
# Version metadata comes from the `_version` module when it is importable;
# otherwise placeholder values are used.
try:
    from ._version import __version__, __version_tuple__
except ImportError:  # pragma: no cover
    __version__ = "unknown"
    version = __version__
    __version_tuple__ = ("unknown",)
    version_tuple = __version_tuple__

__author__ = "Matthew Wardrop"
__author_email__ = "mpwardrop@gmail.com"

# Names exported by `from formulaic import *`.
__all__ = [
    "__author__",
    "__author_email__",
    "__version__",
    "__version_tuple__",
    "Formula",
    "FormulaSpec",
    "ModelMatrix",
    "ModelMatrices",
    "ModelSpec",
    "ModelSpecs",
    "model_matrix",
    "FactorValues",
]
from functools import singledispatch, wraps
from typing import Any
import numpy
import pandas
import scipy.sparse
from formulaic.materializers.types.factor_values import FactorValues
def propagate_metadata(func):
    """
    Decorate `func` so that formulaic metadata attached to its first argument
    is carried over to its result.

    When the incoming `data` is a `FactorValues` instance, the result of
    `func` is wrapped in a new `FactorValues` that reuses
    `data.__formulaic_metadata__`; any other input yields the bare result.
    """

    @wraps(func)
    def wrapper(data, *args, **kwargs):
        result = func(data, *args, **kwargs)
        if not isinstance(data, FactorValues):
            return result
        return FactorValues(result, metadata=data.__formulaic_metadata__)

    return wrapper
@singledispatch
@propagate_metadata
def as_columns(data: Any) -> Any:
    """
    Return the columns of `data`.

    This is the fallback implementation, used for types without a registered
    specialisation: `data` is taken to already be a single column or a
    dictionary of columns and is handed back unchanged.
    """
    return data
@as_columns.register
@propagate_metadata
def _(data: pandas.DataFrame):
    # A data frame becomes a dictionary keyed by column label.
    return {label: column for label, column in data.items()}
@as_columns.register
@propagate_metadata
def _(data: numpy.ndarray):
    # One-dimensional arrays already are a single column.
    n_dims = len(data.shape)
    if n_dims == 1:
        return data
    if n_dims > 2:
        raise ValueError(
            "Formulaic does not know how to convert numpy arrays with more than "
            "two dimensions into columns."
        )

    # Prefer column names recorded in attached formulaic metadata; otherwise
    # fall back to positional integer names.
    has_names = (
        hasattr(data, "__formulaic_metadata__")
        and data.__formulaic_metadata__.column_names
    )
    if has_names:
        names = data.__formulaic_metadata__.column_names
    else:
        names = list(range(data.shape[1]))
    return {names[idx]: data[:, idx] for idx in range(data.shape[1])}
@as_columns.register
@propagate_metadata
def _(data: scipy.sparse.csc_matrix):
    # Prefer column names recorded in attached formulaic metadata; otherwise
    # fall back to positional integer names.
    has_names = (
        hasattr(data, "__formulaic_metadata__")
        and data.__formulaic_metadata__.column_names
    )
    if has_names:
        names = data.__formulaic_metadata__.column_names
    else:
        names = list(range(data.shape[1]))
    return {names[idx]: data[:, idx] for idx in range(data.shape[1])}
import itertools
from collections.abc import MutableMapping
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
class LayeredMapping(MutableMapping):
    """
    A mutable mapping implementation that allows you to stack multiple mappings
    on top of one another, passing key lookups through the stack from top to
    bottom until the key is found or the stack is exhausted. Mutations are
    stored in an additional layer local only to the `LayeredMapping` instance,
    and the layers passed in are never mutated.
    """

    def __init__(self, *layers: Optional[Mapping]):
        """
        Prepare a `LayeredMapping` instance, populating it with the nominated
        layers.

        Args:
            layers: The mappings to stack, highest priority first. `None`
                entries are ignored.
        """
        self.mutations: Dict = {}
        self.layers: List[Mapping] = self.__filter_layers(layers)

    @staticmethod
    def __filter_layers(layers: Iterable[Optional[Mapping]]) -> List[Mapping]:
        """
        Filter incoming `layers` down to those which are not null.
        """
        return [layer for layer in layers if layer is not None]

    def __getitem__(self, key: Any) -> Any:
        # The local mutation layer always shadows the stacked layers.
        for layer in (self.mutations, *self.layers):
            if key in layer:
                return layer[key]
        raise KeyError(key)

    def __setitem__(self, key: Any, value: Any):
        self.mutations[key] = value

    def __delitem__(self, key: Any):
        # Only the local mutation layer can be modified; keys that live in
        # the stacked layers cannot be deleted (and remain visible).
        if key in self.mutations:
            del self.mutations[key]
        else:
            raise KeyError(f"Key '{key}' not found in mutable layer.")

    def __iter__(self):
        # Yield each key once, in order of first appearance from the top of
        # the stack down.
        keys = set()
        for layer in (self.mutations, *self.layers):
            for key in layer:
                if key not in keys:
                    keys.add(key)
                    yield key

    def __len__(self):
        return len(set(itertools.chain(self.mutations, *self.layers)))

    def with_layers(
        self,
        *layers: Optional[Mapping],
        prepend: bool = True,
        inplace: bool = False,
    ) -> "LayeredMapping":
        """
        Return a `LayeredMapping` instance with additional layers added.

        Args:
            layers: The layers to add. `None` entries are ignored.
            prepend: Whether to add the layers before (if `True`) or after (if
                `False`) the current layers.
            inplace: Whether to mutate the existing `LayeredMapping` instance
                instead of returning a new one.

        Returns:
            A reference to the `LayeredMapping` instance with the extra layers.
            This is the existing instance when `inplace` is set or when there
            are no non-null layers to add; otherwise it is a new instance
            which wraps the existing one as one of its layers.
        """
        layers = self.__filter_layers(layers)
        if not layers:
            return self

        if inplace:
            self.layers = (
                [*layers, *self.layers] if prepend else [*self.layers, *layers]
            )
            return self

        new_layers = [*layers, self] if prepend else [self, *layers]
        return LayeredMapping(*new_layers)
import ast
import functools
import inspect
import keyword
import re
from typing import Any, Callable, Mapping, MutableMapping, Optional, TYPE_CHECKING
import astor
import numpy
from .iterators import peekable_iter
from .layered_mapping import LayeredMapping
if TYPE_CHECKING:
from formulaic.model_spec import ModelSpec # pragma: no cover
def stateful_transform(func: Callable) -> Callable:
    """
    Transform a callable object into a stateful transform.

    This is done by adding special arguments to the callable's signature:
    - _state: The existing state or an empty dictionary.
    - _metadata: Any extra metadata passed about the factor being evaluated.
    - _spec: The `ModelSpec` instance being evaluated (or an empty `ModelSpec`).
    If the callable has any of these in its signature, these will be passed onto
    it; otherwise, they will be swallowed by the stateful transform wrapper.
    (`_state` is additionally passed to callables that accept arbitrary
    keyword arguments.)

    Stateful transforms are also transformed into single dispatches, allowing
    different implementations for incoming data types.

    When the incoming data is a dictionary, the transform is applied to each
    value separately, with state kept per key; entries whose key is a string
    starting with "__" are passed through untouched.

    Args:
        func: The function (or other callable) to be made into a stateful
            transform.

    Returns:
        The stateful transform callable.
    """
    func = functools.singledispatch(func)
    params = inspect.signature(func).parameters
    # `_state` is forwarded when declared explicitly, or when the callable
    # takes `**kwargs` (in which case it has always received `_state`).
    accepts_state = "_state" in params or any(
        param.kind is inspect.Parameter.VAR_KEYWORD for param in params.values()
    )

    @functools.wraps(func)
    def wrapper(data, *args, _metadata=None, _state=None, _spec=None, **kwargs):
        _state = {} if _state is None else _state
        extra_params = {}
        if "_metadata" in params:
            extra_params["_metadata"] = _metadata
        if "_spec" in params:
            if not _spec:
                # Imported lazily, and only when an empty spec is actually
                # needed, to avoid a circular import at module load time.
                from formulaic.model_spec import ModelSpec

                _spec = ModelSpec(formula=[])
            extra_params["_spec"] = _spec

        if isinstance(data, dict):
            results = {}
            for key, datum in data.items():
                if isinstance(key, str) and key.startswith("__"):
                    results[key] = datum
                else:
                    statum = _state.get(key, {})
                    results[key] = wrapper(
                        datum, *args, _state=statum, **extra_params, **kwargs
                    )
                    # Only record per-key state that was actually populated.
                    if statum:
                        _state[key] = statum
            return results

        if accepts_state:
            extra_params["_state"] = _state
        return func(data, *args, **extra_params, **kwargs)

    wrapper.__is_stateful_transform__ = True
    return wrapper
def stateful_eval(
    expr: str,
    env: Optional[Mapping],
    metadata: Optional[Mapping],
    state: Optional[Mapping],
    spec: Optional["ModelSpec"],
) -> Any:
    """
    Evaluate an expression in a nominated environment and with a nominated state.

    Under the hood this calls out to `eval`, and so if incoming expressions are
    not safe, you should make sure that your `env` is properly isolated from
    potentially unsafe methods and/or sys-calls.

    Args:
        expr: The expression to be evaluated.
        env: The environment in which the expression is to be evaluated. This
            environment is the only environment from which variables can be
            looked up during the evaluation.
        metadata: Additional metadata about the expression (passed through to
            stateful transforms).
        state: The current state of any stateful transforms (passed through to
            stateful transforms).
        spec: The current `ModelSpec` instance being evaluated (passed through
            to stateful transforms).

    Returns:
        The result of the evaluation.

    Notes:
        - The state mapping is likely to be mutated in-place when using stateful
          transforms. If you need to retain the original state, copy it
          *before* calling this method.
        - State and metadata are keyed by the source text of each stateful
          call, exactly as rendered by `astor`.
    """
    metadata = {} if metadata is None else metadata
    state = {} if state is None else state
    env = LayeredMapping(
        env
    )  # We sometimes mutate env, so we make sure we do so in a local mutable layer.

    # Ensure that variable names in code are valid for Python's interpreter
    # If not, create new variable in mutable env layer, and update code.
    expr = sanitize_variable_names(expr, env)

    # Parse Python code
    code = ast.parse(expr, mode="eval")

    # Extract the nodes of the graph that correspond to stateful transforms
    stateful_nodes = {}
    for node in ast.walk(code):
        if _is_stateful_transform(node, env):
            stateful_nodes[astor.to_source(node).strip().replace("\n ", "")] = node

    # Mutate stateful nodes to pass in state from a shared dictionary.
    for name, node in stateful_nodes.items():
        if name not in state:
            state[name] = {}
        # `repr` yields a string literal that always parses back to exactly
        # `name`, whatever quotes or backslashes it contains. Hand-escaping the
        # quotes previously produced snippets that failed to parse and keys
        # that did not match the ones stored in `state`.
        key_literal = repr(name)
        node.keywords.append(
            ast.keyword(
                "_metadata",
                ast.parse(
                    f"__FORMULAIC_METADATA__.get({key_literal})", mode="eval"
                ).body,
            )
        )
        node.keywords.append(
            ast.keyword(
                "_state",
                ast.parse(f"__FORMULAIC_STATE__[{key_literal}]", mode="eval").body,
            )
        )
        node.keywords.append(
            ast.keyword("_spec", ast.parse("__FORMULAIC_SPEC__", mode="eval").body)
        )

    # Compile mutated AST
    code = compile(ast.fix_missing_locations(code), "", "eval")

    # The reserved names injected below must not shadow user variables.
    assert "__FORMULAIC_METADATA__" not in env
    assert "__FORMULAIC_STATE__" not in env
    assert "__FORMULAIC_SPEC__" not in env

    # Evaluate and return
    return eval(
        code,
        {},
        LayeredMapping(
            {
                "__FORMULAIC_METADATA__": metadata,
                "__FORMULAIC_SPEC__": spec,
                "__FORMULAIC_STATE__": state,
            },
            env,
        ),
    )  # nosec
def _is_stateful_transform(node: ast.AST, env: Mapping) -> bool:
    """
    Decide whether `node` is a call to a stateful transform available in `env`.

    Args:
        node: The AST node in question.
        env: The current environment in which the node is evaluated. The
            callee expression is evaluated against it so that the resulting
            function handle can be inspected.

    Return:
        The callee's `__is_stateful_transform__` attribute (defaulting to
        `False`) when `node` is a call node whose callee resolves in `env`;
        `False` when `node` is not a call or the callee name is unknown.
    """
    if isinstance(node, ast.Call):
        try:
            callee_source = astor.to_source(node.func).strip()
            handle = eval(
                compile(callee_source, "", "eval"), {}, env
            )  # nosec; Get function handle (assuming it exists in env)
            return getattr(handle, "__is_stateful_transform__", False)
        except NameError:
            # The callee is not defined in `env`, so it cannot be stateful.
            pass
    return False
# Variable sanitization

# Alternatives, tried left to right: an escaped double quote, a complete
# double-quoted string, an escaped single quote, a complete single-quoted
# string, or a lone backtick. Because quoted strings are consumed whole, a
# backtick inside a string literal is never matched on its own. The pattern is
# a single capturing group, so `.split()` keeps every match in its output.
UNQUOTED_BACKTICK_MATCHER = re.compile(
    r"(\\\"|\"(?:\\\"|[^\"])*\"|\\'|'(?:\\'|[^'])*'|`)"
)
def sanitize_variable_names(expr: str, env: Mapping) -> str:
    """
    Rewrite backtick-quoted names in `expr` as valid Python identifiers.

    Backticks allow field names that are not valid Python names to be used in
    an expression, e.g. "func(`1a`)" -> "func( _1a )". Each quoted name is
    passed through `sanitize_variable_name`, which also aliases the value in
    `env` under the new name when the original name is present. Backticks
    inside string literals are left alone, as is an unterminated backtick.

    Args:
        expr: The expression to sanitize.
        env: The environment to keep updated with any name substitutions. This
            environment mapping will be mutated in place during this evaluation.

    Returns:
        The sanitized expression, stripped of surrounding whitespace.
    """
    tokens = peekable_iter(UNQUOTED_BACKTICK_MATCHER.split(expr))
    pieces = []

    for token in tokens:
        if token != "`":
            pieces.append(token)
            continue

        # Gather everything up to the closing backtick (or end of input).
        quoted = []
        while True:
            upcoming = tokens.peek(None)
            if upcoming is None or upcoming == "`":
                break
            quoted.append(next(tokens))
        variable_name = "".join(quoted)

        if upcoming is None:
            # No closing backtick: emit the text unchanged.
            pieces.append(f"`{variable_name}")
        else:
            next(tokens)  # discard the closing backtick
            new_name = sanitize_variable_name(variable_name, env)
            pieces.append(f" {new_name} ")

    return "".join(pieces).strip()
def sanitize_variable_name(name: str, env: MutableMapping) -> str:
    """
    Generate a valid Python variable name for variable identifier `name`.

    Names that are already identifiers (or keywords) are returned unchanged.
    Otherwise every non-word character is replaced by an underscore, and an
    underscore is prepended if the result is empty or starts with a digit. If
    the candidate clashes with a name already in `env`, a random ten-letter
    suffix is appended until it is unique.

    Args:
        name: The variable name to sanitize.
        env: The mapping of variable name to values in the evaluation
            environment. If `name` is present in this mapping, an alias is
            created for the same value for the new variable name.

    Returns:
        The sanitized variable name.
    """
    if name.isidentifier() or keyword.iskeyword(name):
        return name

    # Compute recognisable basename
    base_name = re.sub(r"\W", "_", name)
    # An empty name would otherwise fail on `base_name[0]`.
    if not base_name or base_name[0].isdigit():
        base_name = "_" + base_name

    # Verify new name is not in env already, and if not add a random suffix.
    new_name = base_name
    while new_name in env:
        new_name = (
            base_name
            + "_"
            + "".join(numpy.random.choice(list("abcefghiklmnopqrstuvwxyz"), 10))
        )

    # Reuse the value for `name` for `new_name` also.
    if name in env:
        env[new_name] = env[name]

    return new_name
from typing import Iterable, Set
from formulaic.parser.types import Factor, Term
def differentiate_term(
    term: Term,
    vars: Iterable[str],  # pylint: disable=redefined-builtin
    use_sympy: bool = False,
) -> Term:
    """
    Symbolically differentiate a `Term` with respect to each of `vars` in turn.

    Args:
        term: The `Term` instance to differentiate.
        vars: The variables by which to differentiate.
        use_sympy: Whether to interpret factor token strings using sympy. If
            `True`, symbolic factors like `log(x)` can be differentiated with
            respect to `x`. If `False`, factor token strings must match the
            variable exactly in order to be detected.

    Returns:
        A new `Term` instance representing the differentiated term. If any of
        `vars` does not appear in the term, the result is the literal "0"
        term; if differentiation removes every factor, it is the literal "1"
        term.

    Notes:
        - The factors that involve the variable are differentiated together
          as a product; the untouched factors are carried over as-is.
        - Care must be taken to make sure that the symbolic representation of
          the factors can be properly interpreted by `sympy`. For example,
          `I(x)` would not be understood.
    """
    remaining = set(term.factors)

    for var in vars:
        dependent = {
            factor
            for factor in remaining
            if var in _factor_symbols(factor, use_sympy=use_sympy)
        }
        if len(dependent) == 0:
            # The term is constant with respect to `var`.
            return Term({Factor("0", eval_method="literal")})
        derivative = _differentiate_factors(dependent, var, use_sympy=use_sympy)
        remaining = remaining.difference(dependent).union(derivative)

    if remaining:
        return Term(remaining)
    return Term({Factor("1", eval_method="literal")})
def _factor_symbols(factor: Factor, use_sympy: bool = False) -> Set[str]:
    """
    Return the set of symbols that appear in a factor.

    Args:
        factor: The `Factor` instance from which symbols should be extracted.
        use_sympy: Whether to interpret the string representation of the
            factor using `sympy`. When `False`, the whole factor expression
            is treated as a single symbol.

    Returns:
        The set of string symbols represented by the factor.

    Raises:
        ImportError: If `use_sympy` is set but `sympy` is not installed.
    """
    if not use_sympy:
        return {factor.expr}

    try:
        import sympy

        return set(map(str, sympy.S(factor.expr).free_symbols))
    except ImportError as e:  # pragma: no cover
        raise ImportError(
            "`sympy` is not available. Install it using `pip install formulaic[calculus]` or `pip install sympy`."
        ) from e
def _differentiate_factors(
    factors: Set[Factor], var: str, use_sympy: bool = False
) -> Set[Factor]:
    """
    Differentiate the product of `factors` with respect to `var`.

    Args:
        factors: The set of factors which should be differentiated (taking for
            granted that they are multiplied together).
        var: The variable by which to differentiate.
        use_sympy: Whether to perform the differentiation using sympy, allowing
            for symbolic differentiations like `log(x)` -> `1/x`. Without
            sympy exactly one factor is expected and its derivative is taken
            to be 1.

    Returns:
        A set containing the new factors to replace the incoming factors in a
        term. The set is empty when the derivative is 1.

    Raises:
        ImportError: If `use_sympy` is set but `sympy` is not installed.
    """
    if not use_sympy:
        assert len(factors) == 1
        expr = 1
        eval_method = next(iter(factors)).eval_method
    else:
        try:
            import sympy

            product = "(" + ") * (".join(factor.expr for factor in factors) + ")"
            expr = sympy.S(product).diff(var)
            eval_method = "python"
        except ImportError as e:  # pragma: no cover
            raise ImportError(
                "`sympy` is not available. Install it using `pip install formulaic[calculus]` or `pip install sympy`."
            ) from e

    # A derivative of 1 contributes nothing to a product of factors.
    return set() if expr == 1 else {Factor(f"({str(expr)})", eval_method=eval_method)}
import sys
from typing import Any, Optional, Mapping, Union
from .layered_mapping import LayeredMapping
def capture_context(
    context: Optional[Union[int, Mapping[str, Any]]] = 0
) -> Optional[Mapping[str, Any]]:
    """
    Explicitly capture the context to be used by subsequent formula
    materialisations.

    Note: This is mainly useful for libraries that wrap Formulaic, since it
    lets them separate the collection of the evaluation context from the
    materialization calls (which may be several frames removed from the
    user). Passing a dictionary context directly remains supported, so using
    this function is optional.

    Args:
        context: The context from which variables (and custom transforms/etc)
            should be inherited. When specified as an integer, it is interpreted
            as a frame offset from the caller's frame (i.e. 0, the default,
            means that all variables in the caller's scope should be made
            accessible when interpreting and evaluating formulae). Otherwise,
            the value is returned unchanged and is expected to be a mapping
            from variable name to value. When nesting in a library, account
            for the extra frames introduced by your wrappers.

    Returns:
        The context that should be later passed to the Formulaic materialization
        procedure like: `.get_model_matrix(..., context=<this object>)`. This
        is `None` if a frame offset was given but the interpreter does not
        provide `sys._getframe`.
    """
    if not isinstance(context, int):
        return context
    if not hasattr(sys, "_getframe"):
        return None  # pragma: no cover

    # +1 skips this function's own frame so that offset 0 means the caller.
    caller = sys._getframe(context + 1)
    return LayeredMapping(caller.f_locals, caller.f_globals)
class _MissingType:
    """
    Singleton type of the `MISSING` sentinel.

    Every construction returns the same instance, which is falsy, prints as
    "MISSING", and survives shallow and deep copies as the same object.
    """

    __instance__ = None

    def __new__(cls):
        existing = cls.__instance__
        if existing is not None:
            return existing
        cls.__instance__ = super().__new__(cls)
        return cls.__instance__

    def __bool__(self):
        return False

    def __repr__(self):
        return "MISSING"

    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self


# The shared sentinel instance.
MISSING = _MissingType()
from typing import Any, Iterable
from .sentinels import MISSING
class peekable_iter:
    """
    An iterator wrapper that can look at the next element without consuming it.
    """

    def __init__(self, it: Iterable):
        self._it = iter(it)
        # Holds the element fetched by `peek` but not yet returned by `next`.
        self._next = []

    def __iter__(self):
        return self

    def __next__(self):
        return self._next.pop(0) if self._next else next(self._it)

    def peek(self, default: Any = MISSING) -> Any:
        """
        Retrieve the object that will be next returned by the iterator.

        Args:
            default: The value to return if there are no more elements in the
                iterator (otherwise the `StopIteration` exception will be
                forwarded).
        """
        if self._next:
            return self._next[0]
        try:
            upcoming = next(self._it)
        except StopIteration:
            if default is MISSING:
                raise
            return default
        self._next.append(upcoming)
        return upcoming
from typing import Iterable, Optional, Tuple, List
import numpy
import pandas
import scipy.sparse as spsparse
def categorical_encode_series_to_sparse_csc_matrix(
    series: Iterable, levels: Optional[Iterable[str]] = None, drop_first: bool = False
) -> Tuple[List, spsparse.csc_matrix]:
    """
    Categorically encode (via dummy encoding) a `series` as a sparse matrix.

    Values of `series` that are null, or that are not among the levels, get
    an all-zero row.

    Args:
        series: The iterable which should be sparse encoded.
        levels: The levels for which to generate dummies (if not specified, a
            dummy variable is generated for every level in `series`). Any
            iterable is accepted, including arrays and one-shot iterators.
        drop_first: Whether to omit the first column in order to avoid
            structural collinearity. Has no effect when there are no levels.

    Returns:
        A tuple of form `(levels, sparse_matrix)`, where `levels` contains the
        levels that were used to generate dummies, and `sparse_matrix` is the
        sparse (column-major) matrix representation of the series dummy
        encoding.
    """
    # Materialise `levels` up front: it may be a one-shot iterator, and
    # array-like objects do not support the truthiness test used below.
    if levels is not None:
        levels = list(levels)

    series = pandas.Categorical(series, levels)
    if not levels:
        levels = list(series.categories)

    if drop_first and levels:
        series = series.remove_categories(levels[0])
        levels = levels[1:]

    # A code of -1 marks values that do not belong to any category.
    codes = series.codes
    observed = codes != -1
    row_indices = numpy.flatnonzero(observed)
    col_indices = codes[observed]

    sparse_matrix = spsparse.csc_matrix(
        (
            numpy.ones(col_indices.shape[0], dtype=float),  # data
            (row_indices, col_indices),  # row, column
        ),
        shape=(series.shape[0], len(levels)),
    )
    return levels, sparse_matrix
from __future__ import annotations
import ast
import functools
import itertools
from numbers import Number
from typing import Dict, Iterable, Optional, Sequence, Tuple, Union
import numpy
from formulaic.parser.algos.tokenize import tokenize
from formulaic.parser.algos.tokens_to_ast import tokens_to_ast
from formulaic.parser.types import (
ASTNode,
Factor,
OperatorResolver,
Operator,
Term,
Token,
)
from formulaic.parser.utils import exc_for_token
# The forms in which linear constraints can be specified: a constraint formula
# string, a dictionary mapping formula strings to numbers, a two-tuple of
# array-likes, or a single array-like.
LinearConstraintSpec = Union[
    str,
    Dict[str, Number],
    Tuple["numpy.typing.ArrayLike", "numpy.typing.ArrayLike"],
    "numpy.typing.ArrayLike",
]
class LinearConstraints:
    """
    Represents linear constraints of form $Ax = b$, where $A$ is a matrix of
    coefficients for the features in $x$, and $b$ is a vector of constant
    values.

    Instances of this class are typically constructed via
    `ModelSpec.get_linear_constraints(...)` but can also be constructed
    directly for use in other contexts.

    Attributes:
        constraint_matrix: The matrix of coefficients on the features ($A$ from
            above). Each row is one constraint.
        constraint_values: The vector of constant values ($b$ from above).
        variable_names: The ordered names of the variables represented by $x$;
            typically the column names of a `ModelMatrix` instance.
    """

    @classmethod
    def from_spec(
        cls, spec: LinearConstraintSpec, variable_names: Sequence[str] = None
    ) -> LinearConstraints:
        """
        Construct a `LinearConstraints` instance from a specification.

        Args:
            spec: The specification from which to derive the constraints. Can be
                a:
                    * LinearConstraints: In which case it is returned as-is.
                    * str: In which case it is interpreted as a constraints
                        formula (e.g. "x + 2 * y = 3, z + y - x / 10"). All
                        variables used must be present in `variable_names`.
                    * Dict[str, Number]: In which case each key is treated as
                        formula, and each value as the constraint (e.g. {"x":19}
                        , {"a + b": 0}).
                    * Tuple: a two-tuple describing the constraint matrix and
                        values respectively.
                    * numpy.ndarray: a constraint matrix (with all values
                        assumed to be zero).
            variable_names: The ordered names of the variables represented by
                $x$; typically the column names of a `ModelMatrix` instance.

        Returns:
            The `LinearConstraints` instance described by `spec`.
        """
        if isinstance(spec, LinearConstraints):
            return spec
        if isinstance(spec, str):
            matrix, values = LinearConstraintParser(
                variable_names=variable_names
            ).get_matrix(spec)
            return cls(matrix, values, variable_names)
        if isinstance(spec, dict):
            matrices, constants = [], []
            for key, constant in spec.items():
                matrix, values = LinearConstraintParser(
                    variable_names=variable_names
                ).get_matrix(key)
                matrices.append(matrix)
                # Combine the constants parsed from the key with its value.
                constants.append(values + numpy.array(constant))
            return cls(
                numpy.vstack(matrices),
                numpy.hstack(constants),
                variable_names=variable_names,
            )
        if isinstance(spec, tuple) and len(spec) == 2:
            return cls(*spec, variable_names=variable_names)
        return cls(spec, 0, variable_names=variable_names)

    def __init__(
        self, constraint_matrix, constraint_values, variable_names: Sequence[str] = None
    ):
        """
        Args:
            constraint_matrix: The matrix of coefficients on the features ($A$
                from above). Each row is one constraint. A 1D array is treated
                as a single constraint.
            constraint_values: The vector of constant values ($b$ from above).
                A scalar is broadcast to every constraint.
            variable_names: The ordered names of the variables represented by
                $x$; typically the column names of a `ModelMatrix` instance.
                If omitted (or empty), names `x0`, `x1`, ... are generated,
                one per column.

        Raises:
            ValueError: If the matrix is not 2D, the values are not 1D, or the
                number of rows, values and variable names are inconsistent.
        """
        constraint_matrix = numpy.array(constraint_matrix)
        constraint_values = numpy.array(constraint_values)

        # Promote a single constraint vector to a one-row matrix, then check
        # dimensionality before any shape entry is read.
        if constraint_matrix.ndim == 1:
            constraint_matrix = constraint_matrix.reshape(1, *constraint_matrix.shape)
        if constraint_matrix.ndim != 2:
            raise ValueError("`constraint_matrix` must be a 2D array.")

        # Broadcast a scalar value across every constraint.
        if constraint_values.ndim == 0:
            constraint_values = constraint_values * numpy.ones(
                constraint_matrix.shape[0]
            )
        if constraint_values.ndim != 1:
            raise ValueError("`constraint_values` must be a 1D array.")

        # `len()` rather than truthiness, so array-like names are accepted.
        if variable_names is None or len(variable_names) == 0:
            variable_names = [f"x{i}" for i in range(constraint_matrix.shape[1])]

        if constraint_values.shape[0] != constraint_matrix.shape[0]:
            raise ValueError(
                "Number of rows in constraint matrix does not equal the number of values in the values array."
            )
        if len(variable_names) != constraint_matrix.shape[1]:
            raise ValueError(
                "Number of column names does not match the number of columns in the linear constraint matrix."
            )

        self.constraint_matrix = constraint_matrix
        self.constraint_values = constraint_values
        self.variable_names = variable_names

    def __str__(self):
        # One line per constraint, listing only the non-zero coefficients.
        out = []
        for i in range(self.constraint_matrix.shape[0]):
            out_one = []
            for nonzero_col in numpy.where(self.constraint_matrix[i, :])[0]:
                out_one.append(
                    f"{self.constraint_matrix[i, nonzero_col]} * {self.variable_names[nonzero_col]}"
                )
            out.append(" + ".join(out_one) + f" = {self.constraint_values[i]}")
        return "\n".join(out)

    def show(self):
        """
        Pretty-print the constraints.
        """
        print(str(self))

    @property
    def n_constraints(self):
        """
        The number of constraints represented by this `LinearConstraints`
        instance.
        """
        return self.constraint_matrix.shape[0]

    def __repr__(self):
        return f"<LinearConstraints: {self.n_constraints} constraints>"
class LinearConstraintParser:
    """
    A parser for linear constraint formulae.

    The parser re-uses parts of the parser stack under `FormulaParser`, but
    interprets formulas using conventional algebra rather than as Wilkinson
    formulas.

    Attributes:
        variable_names: The ordered names of the variables for which constraints
            are being prepared. All variables used in the formula being parsed
            must be present in this sequence.
        operator_resolver: The operator resolver instance to use. If not
            provided, `ConstraintOperatorResolver` is used.
    """

    def __init__(
        self,
        variable_names: Sequence[str],
        operator_resolver: Optional[OperatorResolver] = None,
    ):
        self.variable_names = variable_names
        self.operator_resolver = operator_resolver or ConstraintOperatorResolver()

    def get_tokens(self, formula: str) -> Iterable[ConstraintToken]:
        """
        Tokenize a constraint formula into `ConstraintToken` instances.

        Args:
            formula: The constraint formula to tokenize.
        """
        return list(map(ConstraintToken.for_token, tokenize(formula)))

    def get_ast(self, formula: str) -> ASTNode:
        """
        Build the abstract syntax tree for the nominated `formula` string.

        Args:
            formula: The constraint formula for which an AST should be
                generated.
        """
        tokens = self.get_tokens(formula)
        return tokens_to_ast(tokens, operator_resolver=self.operator_resolver)

    def get_terms(self, formula: str) -> Union[Sequence[Term], Tuple[Sequence[Term]]]:
        """
        Build the terms for a constraint formula string.

        Args:
            formula: The constraint formula for which to build terms.

        Returns:
            The result of evaluating the formula's AST, or `None` when the AST
            is falsy.
        """
        tree = self.get_ast(formula)
        if not tree:
            return None
        return tree.to_terms()

    def get_matrix(
        self, formula: str
    ) -> Tuple["numpy.typing.ArrayLike", "numpy.typing.ArrayLike"]:
        """
        Build the constraint matrix and constraint values vector associated with
        the parsed string.

        Args:
            formula: The constraint formula for which to build the constraint
                matrix and values vector.

        Returns:
            A tuple of the constraint matrix and constraint values respectively.
        """
        n_variables = len(self.variable_names)

        constraints = self.get_terms(formula)
        if not constraints:
            return numpy.empty((0, n_variables)), numpy.array([])
        if not isinstance(constraints, tuple):
            constraints = (constraints,)

        # One unit vector per variable, used to place each coefficient.
        unit_vectors = dict(zip(self.variable_names, numpy.eye(n_variables)))

        rows, values = [], []
        for constraint in constraints:
            row = numpy.zeros(n_variables)
            offset = 0
            for term in constraint:
                if term.factor == 1:
                    offset += term.scale
                else:
                    row += term.scale * unit_vectors[term.factor.expr]
            rows.append(row)
            # Constants move to the other side of the equation.
            values.append(-offset)

        return numpy.array(rows), numpy.array(values)
class ConstraintToken(Token):
    """
    A `Token` subclass whose `.to_terms()` returns a set of `ScaledFactor`s
    rather than `Term`s.
    """

    @classmethod
    def for_token(cls, token: Token):
        # Rebuild the token as a `ConstraintToken` from its public attributes.
        copied = {}
        for attr in ("token", "kind", "source", "source_start", "source_end"):
            copied[attr] = getattr(token, attr)
        return cls(**copied)

    def to_terms(self):
        if self.kind is not Token.Kind.VALUE:
            return {ScaledFactor(self.to_factor())}

        # Literal values must be numbers; they become scaled constant factors.
        literal = ast.literal_eval(self.token)
        if not isinstance(literal, Number):
            raise exc_for_token(
                self,
                message="Only numeric literal values are permitted in constraint formulae.",
            )
        return {ScaledFactor(1, scale=literal)}
class ScaledFactor:
    """
    Pairs a `Factor` with a scalar coefficient.

    Equality and hashing consider only the wrapped factor, so two instances
    with the same factor but different scales compare equal. Addition and
    subtraction combine the scales and keep the left operand's factor.

    Attributes:
        factor: The wrapped `Factor` instance.
        scale: The scalar value to be used as the coefficient of this factor.
    """

    def __init__(self, factor: Factor, *, scale: Number = 1):
        self.factor = factor
        self.scale = scale

    def __add__(self, other):
        if not isinstance(other, ScaledFactor):
            return NotImplemented  # pragma: no cover
        return ScaledFactor(self.factor, scale=self.scale + other.scale)

    def __sub__(self, other):
        if not isinstance(other, ScaledFactor):
            return NotImplemented  # pragma: no cover
        return ScaledFactor(self.factor, scale=self.scale - other.scale)

    def __neg__(self):
        return ScaledFactor(self.factor, scale=-self.scale)

    def __hash__(self):
        return hash(self.factor)

    def __eq__(self, other):
        if not isinstance(other, ScaledFactor):
            return NotImplemented  # pragma: no cover
        return self.factor == other.factor

    def __repr__(self):
        return f"{self.scale}*{self.factor}"  # pragma: no cover
class ConstraintOperatorResolver(
    OperatorResolver
):  # pylint: disable=unnecessary-lambda
    """
    The default constraint `OperatorResolver` implementation.

    These operators describe a regular algebra rather than a Wilkinson formula
    one.
    """

    @property
    def operators(self):
        # The helpers below operate on collections of scaled terms: objects
        # exposing `.factor` and `.scale` that support `+`, `-` and unary
        # negation. A term whose factor equals 1 is treated as a constant.

        def join_tuples(lhs, rhs):
            # Concatenate both sides, wrapping any non-tuple side first.
            if not isinstance(lhs, tuple):
                lhs = (lhs,)
            if not isinstance(rhs, tuple):
                rhs = (rhs,)
            return lhs + rhs

        def add_terms(terms_left, terms_right):
            # Index each side by term so that the matching term on the other
            # side can be looked up and combined.
            terms_left = {term: term for term in terms_left}
            terms_right = {term: term for term in terms_right}
            added = set()
            for term in terms_left:
                if term in terms_right:
                    term = term + terms_right[term]
                added.add(term)
            # Terms that only appear on the right are carried over unchanged.
            added.update({term for term in terms_right if term not in added})
            return added

        def sub_terms(terms_left, terms_right):
            terms_left = {term: term for term in terms_left}
            terms_right = {term: term for term in terms_right}
            added = set()
            for term in terms_left:
                if term in terms_right:
                    term = term - terms_right[term]
                added.add(term)
            # Terms that only appear on the right are carried over negated.
            added.update(
                negate_terms({term for term in terms_right if term not in added})
            )
            return added

        def negate_terms(terms):
            return {-term for term in terms}

        def mul_terms(terms_left, terms_right):
            # Multiply every left term by every right term, accumulating the
            # products with `add_terms`.
            terms_left = {term: term for term in terms_left}
            terms_right = {term: term for term in terms_right}
            terms = set()
            for term_left, term_right in itertools.product(terms_left, terms_right):
                terms = add_terms(terms, {mul_term(term_left, term_right)})
            return terms

        def mul_term(term_left, term_right):
            # At least one operand must be a constant (factor equal to 1).
            if term_left.factor == 1:
                return ScaledFactor(
                    term_right.factor, scale=term_left.scale * term_right.scale
                )
            if term_right.factor == 1:
                return ScaledFactor(
                    term_left.factor, scale=term_left.scale * term_right.scale
                )
            raise RuntimeError(
                "Only one non-scalar factor can be involved in a linear constraint multiplication."
            )

        def div_terms(terms_left, terms_right):
            # Divide every left term by every right term, accumulating the
            # quotients with `add_terms`.
            terms_left = {term: term for term in terms_left}
            terms_right = {term: term for term in terms_right}
            terms = set()
            for term_left, term_right in itertools.product(terms_left, terms_right):
                terms = add_terms(terms, {div_term(term_left, term_right)})
            return terms

        def div_term(term_left, term_right):
            # Only division by a constant (factor equal to 1) is supported.
            if term_right.factor == 1:
                return ScaledFactor(
                    term_left.factor, scale=term_left.scale / term_right.scale
                )
            raise RuntimeError(
                "The right-hand operand must be a scalar in linear constraint division operations."
            )

        return [
            # "," joins the constraints on either side into a tuple.
            Operator(
                ",",
                arity=2,
                precedence=-200,
                associativity=None,
                to_terms=join_tuples,
                accepts_context=lambda context: all(c.symbol == "," for c in context),
                structural=True,
            ),
            # "=" adds the negated right-hand side to the left-hand side.
            Operator(
                "=",
                arity=2,
                precedence=-100,
                associativity=None,
                to_terms=lambda lhs, rhs: add_terms(lhs, negate_terms(rhs)),
            ),
            Operator(
                "+",
                arity=2,
                precedence=100,
                associativity="left",
                to_terms=lambda *args: functools.reduce(add_terms, args),
            ),
            Operator(
                "-",
                arity=2,
                precedence=100,
                associativity="left",
                to_terms=lambda left, right: sub_terms(left, right),
            ),
            # Unary plus leaves its operand unchanged.
            Operator(
                "+",
                arity=1,
                precedence=100,
                associativity="right",
                fixity="prefix",
                to_terms=lambda arg: arg,
            ),
            # Unary minus negates its operand.
            Operator(
                "-",
                arity=1,
                precedence=100,
                associativity="right",
                fixity="prefix",
                to_terms=lambda arg: negate_terms(arg),
            ),
            Operator(
                "*",
                arity=2,
                precedence=200,
                associativity="left",
                to_terms=lambda lhs, rhs: mul_terms(lhs, rhs),
            ),
            Operator(
                "/",
                arity=2,
                precedence=200,
                associativity="left",
                to_terms=lambda lhs, rhs: div_terms(lhs, rhs),
            ),
        ]
from dataclasses import dataclass
from typing import Iterable, List
from .ast_node import ASTNode
from .operator_resolver import OperatorResolver
from .structured import Structured
from .term import Term
from .token import Token
@dataclass
class FormulaParser:
    """
    The base formula parser API.

    Subclasses turn the string representation of a formula into a
    (structured) sequence of `Term` instances that materializers can evaluate
    and ultimately render into model matrices.

    Parsing is split into three phases, each in its own method so that it can
    be overridden independently:
        - get_tokens: Returns an iterable of `Token` instances; this
          implementation delegates to `tokenize()`.
        - get_ast: Converts the iterable of `Token`s into an abstract syntax
          tree using `tokens_to_ast()` and the nominated `OperatorResolver`
          instance.
        - get_terms: Evaluates the abstract syntax tree and returns the
          resulting `Term`s.

    Only the `get_terms()` method is essential from an API perspective.
    """

    operator_resolver: OperatorResolver

    def get_tokens(self, formula: str) -> Iterable[Token]:
        """
        Tokenize the nominated `formula` string.

        Args:
            formula: The formula string to be tokenized.
        """
        from ..algos.tokenize import tokenize

        return tokenize(formula)

    def get_ast(self, formula: str) -> ASTNode:
        """
        Build the abstract syntax tree for the nominated `formula` string.

        Args:
            formula: The formula for which an AST should be generated.
        """
        from ..algos.tokens_to_ast import tokens_to_ast

        tokens = self.get_tokens(formula)
        return tokens_to_ast(tokens, operator_resolver=self.operator_resolver)

    def get_terms(self, formula: str, *, sort: bool = True) -> Structured[List[Term]]:
        """
        Build the `Term` instances for a formula string. Depending on the
        operators involved, the result may hold a flat collection of `Term`
        instances or nested collections of them.

        Args:
            formula: The formula for which an AST should be generated.
            sort: Whether to sort the terms before returning them.
        """
        tree = self.get_ast(formula)
        if tree is None:
            return Structured([])

        terms = tree.to_terms()
        if not isinstance(terms, Structured):
            terms = Structured(terms)

        return terms._map(sorted) if sort else terms
from __future__ import annotations
import copy
import re
from enum import Enum
from typing import Any, Iterable, Optional, Tuple, Union
from .factor import Factor
from .term import Term
class Token:
"""
The atomic unit into which formula strings are parsed.
These tokens are intentionally very low-level, leaving interpretation and
validation to higher-levels. As such, adding new operators/etc does not
require any modification of this low-level code.
The four kinds of token are:
- context: a token used to scope terms into a given context
- operator: an operator to be applied to other surrounding tokens (will
always consist of non-word characters).
- name: a name of a feature/variable to be lifted from the model matrix
context.
- value: a literal value (string/number).
- python: a code string to be evaluated.
Attributes:
token: The portion of the formula string represented by this token.
kind: The kind of this token (see above).
source: The entire original source string.
source_start: The index of the character within the string that starts
this token.
source_end: The index of the character within the string that ends
this token.
Note: These attributes *should* all be present, but may not be fully
populated if generated outside of the default `tokenize()` implementation.
"""
class Kind(Enum):
CONTEXT = "context"
OPERATOR = "operator"
VALUE = "value"
NAME = "name"
PYTHON = "python"
__slots__ = ("token", "_kind", "source", "source_start", "source_end")
def __init__(
self,
token: str = "",
*,
kind: Optional[Union[str, Kind]] = None,
source: Optional[str] = None,
source_start: Optional[int] = None,
source_end: Optional[int] = None,
):
self.token = token
self.kind = kind
self.source = source
self.source_start = source_start
self.source_end = source_end or source_start
@property
def kind(self) -> Optional[Kind]:
return self._kind
@kind.setter
def kind(self, kind: Optional[Union[str, Kind]]):
self._kind = self.Kind(kind) if kind else kind
def update(
self, char: str, source_index: int, kind: Optional[Kind] = None
) -> "Token":
"""
Add a character to the token string, keeping track of the source
indices.
Args:
char: The character to add.
source_index: The index of the character within the source string.
kind: If present, the kind of the token is updated to reflect the
nominated kind.
Returns:
A reference to this token instance.
"""
self.token += char
if self.source_start is None:
self.source_start = source_index
self.source_end = source_index
if kind is not None:
self.kind = kind
return self
def __bool__(self):
return bool(self.token)
def __eq__(self, other):
if isinstance(other, str):
return self.token == other
if isinstance(other, Token):
return self.token == other.token and self.kind == other.kind
return NotImplemented
def __hash__(self):
return self.token.__hash__()
def __lt__(self, other):
if isinstance(other, Token):
return self.token < other.token
return NotImplemented
@property
def source_loc(self) -> Tuple[int, int]:
"""
The indices of the first and last character represented by this token in
the source string.
"""
return (self.source_start, self.source_end)
def to_factor(self) -> Factor:
"""
A `Factor` instance corresponding to this token. Note that operator
tokens cannot be converted to tokens.
"""
kind_to_eval_method = {
Token.Kind.NAME: "lookup",
Token.Kind.PYTHON: "python",
Token.Kind.VALUE: "literal",
}
return Factor(
expr=self.token,
eval_method=kind_to_eval_method[self.kind],
token=self,
)
def to_terms(self) -> Iterable[Term]:
"""
An iterable (set) of `Term` instances for this token. This will just be
an iterable with one `Term` having one `Factor` (that generated by
`.to_factor()`). Operator tokens cannot be converted to an iterable of
`Term`s.
"""
return {Term([self.to_factor()])}
def flatten(self, str_args=False) -> Any:
"""
Return this token (or if `str_args` is `True`, a string representation
of this token).
Args:
str_args: Whether to convert this token to a string during
flattening.
"""
return str(self) if str_args else self
def get_source_context(self, colorize: bool = False) -> str:
    """
    Render the source string with the span covered by this token wrapped in
    `⧛` and `⧚` markers.

    Args:
        colorize: Whether to additionally wrap the marked span in ANSI
            escape codes for a bold red font.

    Returns:
        The annotated source string, or `None` when the source is missing or
        empty, or when either source index is unset.
    """
    if not self.source or self.source_start is None or self.source_end is None:
        return None

    start = self.source_start
    stop = self.source_end + 1  # `source_end` is inclusive
    leading = self.source[:start]
    marked = self.source[start:stop]
    trailing = self.source[stop:]

    if colorize:
        RED_BOLD = "\x1b[1;31m"
        RESET = "\x1b[0m"
        marked = f"{RED_BOLD}{marked}{RESET}"
    return f"{leading}⧛{marked}⧚{trailing}"
def __repr__(self):
    """
    Represent this token by its raw token string.
    """
    token_string = self.token
    return token_string
# Additional methods for later mutation
def copy_with_attrs(self, **attrs) -> Token:
    """
    Create a shallow copy of this `Token` and override attributes on it.

    Args:
        attrs: Attribute names and the values to assign to them on the
            copy. The original instance is left untouched.

    Returns:
        The shallow copy with the nominated attributes set.
    """
    duplicate = copy.copy(self)
    for name in attrs:
        setattr(duplicate, name, attrs[name])
    return duplicate
def split(
    self, pattern: Union[str, re.Pattern], after=False, before=False
) -> Iterable[Token]:
    """
    Lazily split this instance into multiple tokens around all
    non-overlapping matches of `pattern` in the token string.

    Each piece is a copy of this token (via `.copy_with_attrs()`) with only
    the `token` attribute replaced by the relevant slice of the token
    string.

    Args:
        pattern: The pattern (a regex string or compiled pattern) by which
            to split this `Token` instance.
        after: Whether to split after each match of the pattern.
        before: Whether to split before each match of the pattern.

    Yields:
        This token itself when neither `after` nor `before` is set;
        otherwise the successive pieces, ending with whatever remains after
        the last split point (if anything).
    """
    if not (after or before):
        yield self
        return

    regex = pattern if isinstance(pattern, re.Pattern) else re.compile(pattern)

    cursor = 0  # index in the token string at which the next piece starts
    for match in regex.finditer(self.token):
        match_start, match_end = match.span()
        cut_points = []
        if before:
            cut_points.append(match_start)
        if after:
            cut_points.append(match_end)
        for cut in cut_points:
            piece = self.copy_with_attrs(token=self.token[cursor:cut])
            cursor = cut
            yield piece

    remaining = len(self.token)
    if cursor < remaining:
        yield self.copy_with_attrs(token=self.token[cursor:remaining])
from typing import Iterable, TYPE_CHECKING
if TYPE_CHECKING:
from .factor import Factor # pragma: no cover
class Term:
    """
    Represents a "term" of a formula.

    A "term" is a product of "factors" (represented by `Factor` instances),
    and a formula is made up of a sum of terms.

    Attributes:
        factors: The sorted tuple of unique factors to be multiplied to form
            the term.
    """

    def __init__(self, factors: Iterable["Factor"]):
        # Deduplicate and sort so that equivalent terms are stored in the
        # same canonical form.
        unique_factors = set(factors)
        self.factors = tuple(sorted(unique_factors))
        self._factor_exprs = tuple(factor.expr for factor in self.factors)
        # The hash is derived from the string form, so that a term hashes
        # the same as the string it compares equal to.
        self._hash = hash(repr(self))

    # Transforms and comparisons

    def __mul__(self, other):
        if not isinstance(other, Term):
            return NotImplemented
        return Term(self.factors + other.factors)

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        if isinstance(other, Term):
            return self._factor_exprs == other._factor_exprs
        if isinstance(other, str):
            return repr(self) == other
        return NotImplemented

    def __lt__(self, other):
        if not isinstance(other, Term):
            return NotImplemented
        own_size = len(self.factors)
        other_size = len(other.factors)
        if own_size != other_size:
            # Terms with fewer factors sort first.
            return own_size < other_size
        return sorted(self.factors) < sorted(other.factors)

    def __repr__(self):
        return ":".join(self._factor_exprs)
import abc
from collections import defaultdict
from typing import List, Union
from ..utils import exc_for_token
from .operator import Operator
from .token import Token
# Cached property was introduced in Python 3.8 (we currently support 3.7)
try:
from functools import cached_property
except ImportError: # pragma: no cover
from cached_property import cached_property
class OperatorResolver(metaclass=abc.ABCMeta):
    """
    Resolves which `Operator` instance should be used for a given operator
    `Token`.

    This class should be subclassed and have `.operators` and/or `.resolve()`
    overridden in order to achieve the desired formula algebra.

    Note: most users will probably be interested in extending/subclassing
    `DefaultOperatorResolver`, which implements the default formula operator
    logic. You should subclass this class directly only if you want to start
    from scratch.

    Attributes:
        operator_table: A cache of the mapping from operator symbol to
            `Operator` instances implementing it.
    """

    @property
    @abc.abstractmethod
    def operators(self) -> "List[Operator]":
        """
        The `Operator` instance pool which can be matched to tokens by
        `.resolve()`.
        """

    @cached_property
    def operator_table(self):
        """
        A mapping from operator symbol to the list of `Operator` instances
        sharing that symbol, each list sorted from highest to lowest
        precedence. Computed from `.operators` on first access and then
        cached on the instance.
        """
        operator_table = defaultdict(list)
        for operator in self.operators:
            operator_table[operator.symbol].append(operator)
        for symbol in operator_table:
            operator_table[symbol] = sorted(
                operator_table[symbol], key=lambda op: op.precedence, reverse=True
            )
        return operator_table

    def resolve(
        self,
        token: "Token",
        max_prefix_arity: int,
        context: "List[Union[Token, Operator]]",
    ) -> "List[Operator]":
        """
        Return a list of operators to apply for a given token in the AST
        generation.

        Args:
            token: The operator `Token` instance for which `Operator`(s) should
                be resolved.
            max_prefix_arity: The number of unclaimed tokens preceding the
                operator in the formula string.
            context: The current list of operators into which the operator to be
                resolved will be placed. This will be a list of `Operator`
                instances or tokens (tokens are returned for grouping
                operators).
        """
        return [self._resolve(token, token.token, max_prefix_arity, context)]

    def _resolve(
        self,
        token: "Token",
        symbol: str,
        max_prefix_arity: int,
        context: "List[Union[Token, Operator]]",
    ) -> "Operator":
        """
        The default operator resolving logic.

        Looks up the operators registered for `symbol` and keeps those whose
        fixity is compatible with `max_prefix_arity` (prefix operators when
        nothing precedes the operator, non-prefix operators otherwise) and
        which accept the current `context`.

        Raises:
            An exception built by `exc_for_token` if the symbol is unknown,
            if no candidate operator remains, or if more than one remains.
        """
        if symbol not in self.operator_table:
            raise exc_for_token(token, f"Unknown operator '{symbol}'.")
        candidates = [
            candidate
            for candidate in self.operator_table[symbol]
            if (
                (max_prefix_arity == 0 and candidate.fixity is Operator.Fixity.PREFIX)
                or (
                    max_prefix_arity > 0
                    and candidate.fixity is not Operator.Fixity.PREFIX
                )
            )
            and candidate.accepts_context(context)
        ]
        if not candidates:
            raise exc_for_token(token, f"Operator `{symbol}` is incorrectly used.")
        if len(candidates) > 1:
            raise exc_for_token(
                token,
                f"Ambiguous operator `{symbol}`. This is not usually a user error. Please report this!",
            )
        return candidates[0]

    # The operator table cache may not be pickleable, so let's drop it (and
    # only it: any other instance state is retained so that it survives
    # pickling/copying; the table is rebuilt lazily on next access).
    def __getstate__(self):
        return {
            key: value
            for key, value in self.__dict__.items()
            if key != "operator_table"
        }
from __future__ import annotations
from enum import Enum
from numbers import Number
from typing import Callable, List, Iterable, Union
from .term import Term
from .token import Token
class Operator:
    """
    Describes the behaviour of a single operator within a formula string.

    Attributes:
        symbol: The operator symbol to which this configuration applies.
        arity: How many arguments the operator consumes.
        precedence: How tightly the operator binds its arguments; larger
            numbers bind more tightly, and operators with higher precedence
            are evaluated first.
        associativity: One of 'left', 'right', or 'none'; determines how
            operators of equal precedence are grouped when no explicit
            parentheses are given. Left associative operators group from the
            left [e.g. a % b % c -> ((a % b) % c)]; and similarly for right.
        fixity: One of 'prefix', 'infix', or 'postfix'; determines where the
            operator sits relative to its arguments: before them ('prefix'),
            between them ('infix', in which case there must be exactly two),
            or after them ('postfix').
        to_terms: A callable mapping the arguments passed to the operator to
            an iterable of `Term` instances.
        accepts_context: A callable that receives a list of Operator and
            Token instances describing the context in which the operator
            would be applied, and returns `True` if the operator can be used
            there.
        structural: Whether this operator adds structure to the term sets, in
            which case `Structured._merge` will not be used in
            `ASTNode.to_terms()`, and the term sets will be passed directly
            to `Operator.to_terms()`.
    """

    class Associativity(Enum):
        LEFT = "left"
        RIGHT = "right"
        NONE = "none"

    class Fixity(Enum):
        PREFIX = "prefix"
        INFIX = "infix"
        POSTFIX = "postfix"

    def __init__(
        self,
        symbol: str,
        *,
        arity: int,
        precedence: Number,
        associativity: Union[str, Associativity] = "none",
        fixity: Union[str, Fixity] = "infix",
        to_terms: Callable[..., Iterable[Term]] = None,
        accepts_context: Callable[[List[Union[Token, Operator]]], bool] = None,
        structural: bool = False,
    ):
        self.symbol = symbol
        self.arity = arity
        self.precedence = precedence
        # These two assignments go through the property setters below, which
        # coerce the given values into the corresponding enums.
        self.associativity = associativity
        self.fixity = fixity
        self._to_terms = to_terms
        self._accepts_context = accepts_context
        self.structural = structural

    @property
    def associativity(self):
        return self._associativity

    @associativity.setter
    def associativity(self, associativity):
        # Falsy values (such as `None`) are treated as 'none'.
        if not associativity:
            associativity = "none"
        self._associativity = Operator.Associativity(associativity)

    @property
    def fixity(self):
        return self._fixity

    @fixity.setter
    def fixity(self, fixity):
        self._fixity = Operator.Fixity(fixity)

    def to_terms(self, *args):
        """
        Delegate to the `to_terms` callable supplied at construction.

        Raises:
            RuntimeError: If no `to_terms` callable was provided.
        """
        if self._to_terms is not None:
            return self._to_terms(*args)
        raise RuntimeError(f"`to_terms` is not implemented for '{self.symbol}'.")

    def accepts_context(self, context: List[Union[Token, Operator]]):
        """
        Check whether this operator can be used in the given context. Without
        an `accepts_context` callable, every context is accepted.
        """
        if not self._accepts_context:
            return True
        # Only tokens and operators with precedence less than or equal to our
        # own need to be passed on, since all other operators will be
        # evaluated before us.
        relevant_context = [
            item
            for item in context
            if isinstance(item, Token) or item.precedence <= self.precedence
        ]
        return self._accepts_context(relevant_context)

    def __repr__(self):
        return self.symbol
from .ast_node import ASTNode
from .factor import Factor
from .formula_parser import FormulaParser
from .operator import Operator
from .operator_resolver import OperatorResolver
from .structured import Structured
from .term import Term
from .token import Token
__all__ = [
"ASTNode",
"Factor",
"FormulaParser",
"Operator",
"OperatorResolver",
"Structured",
"Term",
"Token",
]
from __future__ import annotations
import itertools
from collections import defaultdict
from typing import (
Any,
Callable,
Dict,
Generator,
Generic,
Iterable,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
ItemType = TypeVar("ItemType")

# Sentinel used to distinguish "no root supplied" from an explicit root value
# (including `None`).
_MISSING = object()


class Structured(Generic[ItemType]):
    """
    Layers structure onto an arbitrary type.

    Structure can be added in two ways: by keys and by tuples, and can be
    arbitrarily nested. If present, the object assigned to the "root" key is
    treated specially, in that enumeration over the structured instance is
    equivalent to enumeration over the root node if there is no other structure.
    Otherwise, enumeration and key look up is done over the top-level values in
    the container in the order in which they were assigned (except that the root
    node is always first).

    The structure is mutable (new keys can be added, or existing attributes
    overridden) by direct assignment in the usual way; or via the `_update`
    method. To avoid collision with potential keys, all methods and attributes
    are preceded with an underscore. Contrary to Python convention, these are
    still considered public methods.

    Attributes:
        _structure: A dictionary of the keys stored in the `Structured`
            instance.
        _metadata: A dictionary of metadata which can be used to store arbitrary
            information about the `Structured` instance.

    Examples:
        ```
        >>> s = Structured((1, 2), b=3, c=(4,5)); s
        root:
         [0]:
         1
         [1]:
         2
        .b:
         3
        .c:
         [0]:
         4
         [1]:
         5
        >>> list(s)
        [(1, 2), 3, (4, 5)]
        >>> s.root
        (1, 2)
        >>> s.b
        3
        >>> s._map(lambda x: x+1)
        root:
         [0]:
         2
         [1]:
         3
        .b:
         4
        .c:
         [0]:
         5
         [1]:
         6
        ```
    """

    __slots__ = ("_structure", "_metadata")

    def __init__(
        self,
        root: Any = _MISSING,
        *,
        _metadata: Optional[Dict[str, Any]] = None,
        **structure,
    ):
        """
        Args:
            root: The (optional) object to store at the root node.
            _metadata: Optional metadata to associate with this instance.
            structure: The keyed substructure; keys must not start with an
                underscore.

        Raises:
            ValueError: If any substructure key starts with an underscore.
        """
        invalid_keys = {key for key in structure if key.startswith("_")}
        if invalid_keys:
            raise ValueError(
                "Substructure keys cannot start with an underscore. "
                f"The invalid keys are: {invalid_keys}."
            )
        if root is not _MISSING:
            # Stored unprepared here: every item (the root included) is
            # prepared exactly once when `_structure` is built below.
            structure["root"] = root
        self._metadata = _metadata

        self._structure = {
            key: self.__prepare_item(key, item) for key, item in structure.items()
        }

    def __prepare_item(self, key: str, item: Any) -> ItemType:
        # Apply `_prepare_item` to every leaf, descending through nested
        # `Structured` instances and tuples.
        if isinstance(item, Structured):
            return item._map(
                lambda x: self._prepare_item(key, x), as_type=self.__class__
            )
        if isinstance(item, tuple):
            return tuple(self.__prepare_item(key, v) for v in item)
        return self._prepare_item(key, item)

    def _prepare_item(self, key: str, item: Any) -> ItemType:
        """
        Hook applied to each leaf item before it is stored under `key`. The
        base implementation returns the item unchanged; subclasses may
        override it to transform or validate items.
        """
        return item

    @property
    def _has_root(self) -> bool:
        """
        Whether this instance of `Structured` has a root node.
        """
        return "root" in self._structure

    @property
    def _has_keys(self) -> bool:
        """
        Whether this instance of `Structured` has any non-root named
        substructures.
        """
        return set(self._structure) != {"root"}

    @property
    def _has_structure(self) -> bool:
        """
        Whether this instance of `Structured` has any non-trivial structure,
        including named or unnamed substructures.
        """
        return self._has_keys or (self._has_root and isinstance(self.root, tuple))

    def _map(
        self,
        func: Callable[[ItemType], Any],
        recurse: bool = True,
        as_type: Optional[Type[Structured]] = None,
    ) -> Structured[Any]:
        """
        Map a callable object onto all the structured objects, returning a
        `Structured` instance with identical structure, where the original
        objects are replaced with the output of `func`.

        Args:
            func: The callable to apply to all objects contained in the
                `Structured` instance.
            recurse: Whether to recursively map, or only map one level deep (the
                objects directly referenced by this `Structured` instance).
                When `True`, if objects within this structure are `Structured`
                instances also, then the map will be applied only on the leaf
                nodes (otherwise `func` will receive `Structured` instances).
                (default: True).
            as_type: An optional subclass of `Structured` to use for the mapped
                values. If not provided, the base `Structured` type is used.

        Returns:
            A `Structured` instance with the same structure as this instance,
            but with all objects transformed under `func`.
        """

        def apply_func(obj):
            if recurse and isinstance(obj, Structured):
                return obj._map(func, recurse=True, as_type=as_type)
            if isinstance(obj, tuple):
                return tuple(apply_func(o) for o in obj)
            return func(obj)

        return (as_type or Structured)(
            **{key: apply_func(obj) for key, obj in self._structure.items()}
        )

    def _flatten(self) -> Generator[ItemType]:
        """
        Flatten any nested structure into a sequence of all values stored in
        this `Structured` instance. The order is currently that yielded by a
        depth-first iteration, however this is not guaranteed and should not
        be relied upon.
        """
        for value in self._structure.values():
            if isinstance(value, Structured):
                yield from value._flatten()
            elif isinstance(value, tuple):
                for v in value:
                    if isinstance(v, Structured):
                        yield from v._flatten()
                    else:
                        yield v
            else:
                yield value

    def _to_dict(self, recurse: bool = True) -> Dict[Optional[str], Any]:
        """
        Generate a dictionary representation of this structure.

        Args:
            recurse: Whether to recursively convert any nested `Structured`
                instances into dictionaries also. If `False`, any nested
                `Structured` instances will be surfaced in the generated
                dictionary.

        Returns:
            The dictionary representation of this `Structured` instance.
        """

        def do_recursion(obj):
            if recurse and isinstance(obj, Structured):
                return obj._to_dict()
            if isinstance(obj, tuple):
                return tuple(do_recursion(o) for o in obj)
            return obj

        return {key: do_recursion(value) for key, value in self._structure.items()}

    def _simplify(
        self, *, recurse: bool = True, unwrap: bool = True, inplace: bool = False
    ) -> Union[Any, Structured[ItemType]]:
        """
        Simplify this `Structured` instance by:
            - returning the object stored at the root node if there is no other
                structure (removing as many `Structured` wrappers as satisfy
                this requirement).
            - if `recurse` is `True`, recursively applying the logic above to
                any nested `Structured` instances.

        Args:
            unwrap: Whether to unwrap the root node (returning the raw
                unstructured root value) if there is no other structure.
            recurse: Whether to recurse the simplification into the objects
                associated with the keys of this (and nested) `Structured`
                instances.
            inplace: Whether to simplify the current structure (`True`), or
                return a new object with the simplifications (`False`). Note
                that if `True`, `unwrap` *must* be `False`.

        Raises:
            RuntimeError: If both `inplace` and `unwrap` are `True`.
        """
        if inplace and unwrap:
            raise RuntimeError(
                f"Cannot simplify `{self.__class__.__name__}` instances "
                "in-place if `unwrap` is `True`."
            )
        # Peel off wrappers that hold nothing but a root node.
        structured = self
        while (
            isinstance(structured, Structured)
            and structured._has_root
            and not structured._has_structure
            and (unwrap or isinstance(structured.root, Structured))
        ):
            structured = structured.root

        if not isinstance(structured, Structured):
            return structured

        structure = structured._structure

        if recurse:

            def simplify_obj(obj):
                if isinstance(obj, Structured):
                    return obj._simplify(recurse=True)
                if isinstance(obj, tuple):
                    return tuple(simplify_obj(o) for o in obj)
                return obj

            structure = {
                key: simplify_obj(value) for key, value in structured._structure.items()
            }

        if inplace:
            self._structure = structure
            return self
        return self.__class__(
            _metadata=self._metadata,
            **structure,
        )

    def _update(self, root=_MISSING, **structure) -> Structured[ItemType]:
        """
        Return a new `Structured` instance that is identical to this one but
        the root and/or keys replaced with the nominated values.

        Args:
            root: The (optional) replacement of the root node.
            structure: Any additional key/values to update in the structure.
        """
        if root is not _MISSING:
            structure["root"] = root
        # New items are prepared by *this* instance before being handed to
        # the constructor of the new instance.
        return self.__class__(
            **{
                "_metadata": self._metadata,
                **self._structure,
                **{
                    key: self.__prepare_item(key, item)
                    for key, item in structure.items()
                },
            }
        )

    @classmethod
    def _merge(
        cls,
        *objects: Any,
        merger: Callable[..., ItemType] = None,
        _context: Tuple[str, ...] = (),
    ) -> Union[ItemType, Structured[ItemType]]:
        """
        Merge arbitrarily many objects into a single `Structured` instance.

        If any of `objects` are `Structured` or `tuple` instances, then all
        `objects` will be treated as `Structured` instances (being upcast as
        necessary) and then merged recursively; otherwise the objects will be
        merged directly by `merger`.

        Note: An empty set of objects will result in an empty `Structured`
        instance being returned.

        Args:
            objects: A tuple of Structured instances (will be upcast to a
                trivial `Structured` instance as necessary).
            merger: A callable which takes as arguments two or more items which
                are to be merged. If not provided, a basic fallback is provided
                that knows how to merge lists, dictionaries and sets.
            _context: A tuple of keys representing the context of the merge.
                Intended for internal use.

        Raises:
            ValueError: If some, but not all, of the objects are tuples.
        """
        if merger is None:
            merger = cls.__merger_default

        # If objects are not specified, return an empty `Structured` instance.
        if not objects:
            return cls()

        # Check for sequential (tuple) structures, and if so merge them and
        # return them wrapped in a `Structured` instance.
        all_tuples = all(isinstance(obj, tuple) for obj in objects)
        any_tuples = any(isinstance(obj, tuple) for obj in objects)
        if any_tuples and not all_tuples:
            raise ValueError(
                f"Substructures for `.{'.'.join(_context)}` are not aligned and cannot be merged."
            )
        if all_tuples:
            merged = tuple(itertools.chain(*objects))
            if _context:
                # We are merging substructure of `Structured` instances (and don't need the class wrapper)
                return merged
            return cls(merged)

        # Check whether all objects are not Structured instances (or tuples,
        # already excluded by above). If so, just call `merger` on them
        # directly.
        if all(not isinstance(obj, Structured) for obj in objects):
            return merger(*objects)

        # Otherwise, iterate over objects, upcasting to `Structured` as necessary
        # and recursively merge them by merging their structure dictionaries.
        values_to_merge = defaultdict(list)
        for obj in objects:
            if isinstance(obj, Structured):
                for key, value in obj._structure.items():
                    values_to_merge[key].append(value)
            else:
                values_to_merge["root"].append(obj)

        return cls(
            **{
                key: (
                    cls._merge(*values, merger=merger, _context=_context + (key,))
                    if len(values) > 1
                    else values[0]
                )
                for key, values in values_to_merge.items()
            }
        )

    @staticmethod
    def __merger_default(*items):
        # Fallback merger: concatenates lists, unions sets, and combines
        # dictionaries (later items win on duplicate keys).
        if all(isinstance(item, list) for item in items):
            return list(itertools.chain(*items))
        if all(isinstance(item, set) for item in items):
            return set.union(*items)
        if all(isinstance(item, dict) for item in items):
            return dict(itertools.chain(*(d.items() for d in items)))
        raise NotImplementedError(
            "The fallback `merger` for `Structured._merge` does not know how to "
            f"merge objects of types {repr(tuple(type(item) for item in items))}. "
            "Please specify `merger` explicitly."
        )

    def __dir__(self):
        return super().__dir__() + list(self._structure)

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails. Underscored names
        # are never structure keys, so fail fast for them.
        if attr.startswith("_"):
            raise AttributeError(attr)
        if attr in self._structure:
            return self._structure[attr]
        raise AttributeError(
            f"This `{self.__class__.__name__}` instance does not have structure @ `{repr(attr)}`."
        )

    def __setattr__(self, attr, value):
        # Underscored names are real attributes; everything else is stored
        # as (prepared) structure.
        if attr.startswith("_"):
            super().__setattr__(attr, value)
            return
        self._structure[attr] = self.__prepare_item(attr, value)

    def __getitem__(self, key):
        # With only a root node, indexing passes straight through to it.
        if self._has_root and not self._has_keys:
            return self.root[key]
        if key in (None, "root") and self._has_root:
            return self.root
        if isinstance(key, str) and not key.startswith("_") and key in self._structure:
            return self._structure[key]
        raise KeyError(
            f"This `{self.__class__.__name__}` instance does not have structure @ `{repr(key)}`."
        )

    def __setitem__(self, key, value):
        if not isinstance(key, str) or not key.isidentifier():
            raise KeyError(key)
        if key.startswith("_"):
            # Report the key that was actually rejected.
            invalid_keys = {key}
            raise KeyError(
                "Substructure keys cannot start with an underscore. "
                f"The invalid keys are: {invalid_keys}."
            )
        self._structure[key] = self.__prepare_item(key, value)

    def __iter__(self) -> Generator[Union[ItemType, Structured[ItemType]]]:
        if self._has_root and not self._has_keys and isinstance(self.root, Iterable):
            yield from self.root
        else:
            if self._has_root:  # Always yield root first.
                yield self.root
            for key, value in self._structure.items():
                if key != "root":
                    yield value

    def __eq__(self, other):
        if isinstance(other, Structured):
            return self._structure == other._structure
        return False

    def __contains__(self, key):
        return key in self._structure

    def __len__(self) -> int:
        return sum(1 for _ in self)

    def __str__(self):
        return self.__repr__(to_str=str)

    def __repr__(self, to_str=repr):
        import textwrap

        d = self._to_dict(recurse=False)
        keys = [key for key in d if key != "root"]
        if self._has_root:
            keys.insert(0, "root")

        # NOTE(review): the indentation strings below are reproduced exactly
        # as found in this copy of the file; confirm the intended widths.
        out = []
        for key in keys:
            if key == "root":
                out.append("root:")
            else:
                out.append(f".{key}:")
            value = d[key]
            if isinstance(value, tuple):
                for i, obj in enumerate(value):
                    out.append(f" [{i}]:")
                    out.append(textwrap.indent(to_str(obj), " "))
            else:
                out.append(textwrap.indent(to_str(value), " "))
        return "\n".join(out)
from __future__ import annotations
from enum import Enum
from typing import Dict, Iterable, Optional, Union, TYPE_CHECKING
from .term import Term
if TYPE_CHECKING:
from .token import Token # pragma: no cover
class Factor:
    """
    Factors are the indivisible atomic unit that make up formulas.

    Each instance of `Factor` is a specification that is evaluable by a
    materializer to generate concrete vector(s). `Factors` are multiplied
    together into `Term`s, which in turn represent the output columns of model
    matrices. Note that `Factor` instances are entirely abstract of data.

    Attributes:
        expr: The (string) expression to be evaluated by the materializer.
        eval_method: An `EvalMethod` enum instance indicating the mechanism to
            be used to evaluate the expression (one of: literal, lookup or
            python). Defaults to lookup when not specified.
        kind: The kind of data represented (one of: unknown, constant,
            numerical, categorical). Defaults to unknown when not specified.
        metadata: An additional (optional) dictionary of metadata (currently
            unused).
        token: The `Token` instance from which the `Factor` object was
            created.
    """

    class EvalMethod(Enum):
        LITERAL = "literal"
        LOOKUP = "lookup"
        PYTHON = "python"

    class Kind(Enum):
        UNKNOWN = "unknown"
        CONSTANT = "constant"
        NUMERICAL = "numerical"
        CATEGORICAL = "categorical"

    __slots__ = ("expr", "_eval_method", "_kind", "metadata", "token")

    def __init__(
        self,
        expr: str = "",
        *,
        eval_method: Optional[Union[str, EvalMethod]] = None,
        kind: Optional[Union[str, Kind]] = None,
        metadata: Optional[Dict] = None,
        token: Optional[Token] = None,
    ):
        self.expr = expr
        # `eval_method` and `kind` are coerced to enums by their setters.
        self.eval_method = eval_method
        self.kind = kind
        self.metadata = metadata or {}
        self.token = token

    @property
    def eval_method(self) -> EvalMethod:
        return self._eval_method

    @eval_method.setter
    def eval_method(self, eval_method):
        # Falsy values (such as `None`) fall back to 'lookup'.
        if not eval_method:
            eval_method = "lookup"
        self._eval_method = Factor.EvalMethod(eval_method)

    @property
    def kind(self) -> Kind:
        return self._kind

    @kind.setter
    def kind(self, kind):
        # Falsy values (such as `None`) fall back to 'unknown'.
        if not kind:
            kind = "unknown"
        self._kind = Factor.Kind(kind)

    def __eq__(self, other):
        # Factors are identified solely by their expression.
        if isinstance(other, str):
            return self.expr == other
        if isinstance(other, Factor):
            return self.expr == other.expr
        return NotImplemented

    def __hash__(self):
        return hash(self.expr)

    def __lt__(self, other):
        if not isinstance(other, Factor):
            return NotImplemented
        return self.expr < other.expr

    def to_terms(self) -> Iterable[Term]:
        """
        Wrap this `Factor` instance in a single-factor `Term`, and return it
        as the only element of a set.
        """
        term = Term([self])
        return {term}

    def __repr__(self):
        return self.expr
from __future__ import annotations
import graphlib
from typing import Any, Dict, Iterable, List
from .operator import Operator
from .structured import Structured
from .term import Term
class ASTNode:
    """
    Represents a node in an Abstract Syntax Tree (AST).

    An `ASTNode` instance pairs an `Operator` instance with the arguments to
    be passed into that operator. The arguments may be nested `ASTNode`s or
    other objects. Evaluating a node yields a set of `Term` instances.

    Attributes:
        operator: The `Operator` instance associated with this node.
        args: The arguments associated with this node.
    """

    def __init__(self, operator: Operator, args: Iterable[Any]):
        self.operator = operator
        self.args = args

    def to_terms(self) -> Iterable[Term]:
        """
        Evaluate this AST node and return the resulting set of `Term` instances.

        Note: Nodes are evaluated in topological order (children before
        parents) rather than recursively, to avoid recursion issues for long
        formulae (exceeding ~700 terms, though this depends on the recursion
        limit set in the interpreter).
        """
        sorter = graphlib.TopologicalSorter(self.__generate_evaluation_graph())
        sorter.prepare()

        evaluated = {}
        while sorter.is_active():
            for node in sorter.get_ready():
                # Child nodes have already been evaluated; any other argument
                # is asked for its terms directly.
                resolved_args = [
                    evaluated[arg] if isinstance(arg, ASTNode) else arg.to_terms()
                    for arg in node.args
                ]
                if node.operator.structural:
                    evaluated[node] = node.operator.to_terms(*resolved_args)
                else:
                    evaluated[node] = Structured._merge(
                        *resolved_args,
                        merger=node.operator.to_terms,
                    )
                sorter.done(node)

        return evaluated[self]

    def __repr__(self):
        try:
            return f"<ASTNode {self.operator}: {self.args}>"
        except RecursionError:
            # Very deep trees cannot be rendered in full.
            return f"<ASTNode {self.operator}: ...>"

    def flatten(self, str_args: bool = False) -> List[Any]:
        """
        Flatten this `ASTNode` instance into a list of form: [<operator>, *<args>].

        This is primarily useful during debugging and unit testing, since it
        provides a human readable summary of the entire AST.

        Args:
            str_args: Whether to cast every element of the flattened object to
                a string.
        """
        flattened = [str(self.operator) if str_args else self.operator]
        for arg in self.args:
            if isinstance(arg, ASTNode):
                flattened.append(arg.flatten(str_args=str_args))
            elif str_args:
                flattened.append(str(arg))
            else:
                flattened.append(arg)
        return flattened

    # Helpers

    def __generate_evaluation_graph(self) -> Dict[ASTNode, List[ASTNode]]:
        # Map every node reachable from this one to the child nodes that must
        # be evaluated before it.
        graph = {}
        pending = [self]
        while pending:
            current = pending.pop()
            child_nodes = [arg for arg in current.args if isinstance(arg, ASTNode)]
            pending.extend(child_nodes)
            graph[current] = child_nodes
        return graph
from .parser import DefaultFormulaParser, DefaultOperatorResolver
__all__ = [
"DefaultFormulaParser",
"DefaultOperatorResolver",
]
from .tokenize import tokenize
from .tokens_to_ast import tokens_to_ast
__all__ = [
"tokenize",
"tokens_to_ast",
]
from collections import namedtuple
from typing import Iterable, Optional
from ..types import ASTNode, Operator, OperatorResolver, Token
from ..utils import exc_for_token, exc_for_missing_operator
# An entry of the operator stack: the operator itself, the token it was
# resolved from, and an index into the output queue.
OrderedOperator = namedtuple("OrderedOperator", ["operator", "token", "index"])

# Maps each closing context marker to the opening marker it must match.
CONTEXT_CLOSERS = {")": "(", "]": "["}

# The markers that open a new context.
CONTEXT_OPENERS = set(CONTEXT_CLOSERS.values())
def tokens_to_ast(
    tokens: Iterable[Token], operator_resolver: OperatorResolver
) -> Optional[ASTNode]:
    """
    Convert an iterable of `Token` instances into an abstract syntax tree.

    This implementation is intentionally as simple and abstract as possible, and
    makes few assumptions about the form of the operators that will be present
    in the token sequence. Instead, it relies on the `OperatorResolver` instance
    to decide, based on the context, which operator should be invoked to handle
    surrounding tokens based on their arity/etc. This means that changes to the
    formula syntax (such as the addition of new operators) should not require
    any changes to this abstract syntax tree generator.

    The algorithm employed here is a slightly enriched [Shunting Yard
    Algorithm](https://en.wikipedia.org/wiki/Shunting-yard_algorithm), where we
    have added additional support for operator arities, fixities,
    associativities, etc.

    Args:
        tokens: The tokens for which an abstract syntax tree should be
            generated.
        operator_resolver: The `OperatorResolver` instance to be used to lookup
            operators (only the `.resolve()` method is used).

    Returns:
        The generated abstract syntax tree as a nested `ASTNode` instance, or
        `None` if the tokens produced no output.
    """
    output_queue = []
    operator_stack = []

    def stack_operator(operator, token):
        # Record where in the output queue the operator was encountered, so
        # that its arguments can be located when it is applied.
        entry = OrderedOperator(operator, token, len(output_queue))
        operator_stack.append(entry)

    def operate(ordered_operator, output_queue):
        # Replace the operator's arguments in the queue with a single
        # `ASTNode` wrapping them.
        operator, token, index = ordered_operator

        if operator.fixity is Operator.Fixity.INFIX:
            assert operator.arity == 2
            min_index, max_index = index - 1, index + 1
        elif operator.fixity is Operator.Fixity.PREFIX:
            min_index, max_index = index, index + operator.arity
        else:  # Operator.Fixity.POSTFIX
            min_index, max_index = index - operator.arity, index

        if min_index < 0 or max_index > len(output_queue):
            raise exc_for_token(
                token,
                f"Operator `{token.token}` has insuffient arguments and/or is misplaced.",
            )

        node = ASTNode(operator, output_queue[min_index:max_index])
        return output_queue[:min_index] + [node] + output_queue[max_index:]

    for token in tokens:
        if token.kind is token.Kind.CONTEXT:
            if token.token in CONTEXT_OPENERS:
                stack_operator(token, token)
            elif token.token in CONTEXT_CLOSERS:
                starting_token = CONTEXT_CLOSERS[token.token]
                # Apply everything stacked since the matching opener.
                while operator_stack and operator_stack[-1].token != starting_token:
                    output_queue = operate(operator_stack.pop(), output_queue)
                if not (operator_stack and operator_stack[-1].token == starting_token):
                    raise exc_for_token(
                        token, "Could not find matching context marker."
                    )
                operator_stack.pop()
            else:  # pragma: no cover
                raise exc_for_token(
                    token,
                    f"Context token `{token.token}` is unrecognized.",
                )
        elif token.kind is token.Kind.OPERATOR:
            # The number of output items added since the last stacked
            # operator (or in total, if none is stacked).
            if operator_stack:
                max_prefix_arity = len(output_queue) - operator_stack[-1].index
            else:
                max_prefix_arity = len(output_queue)

            operators = operator_resolver.resolve(
                token,
                max_prefix_arity=max_prefix_arity,
                context=[stacked.operator for stacked in operator_stack],
            )

            for operator in operators:
                # Apply stacked operators that bind more tightly (or equally
                # tightly, when the new operator is left associative) before
                # stacking the new operator; never reach past a context
                # marker.
                while (
                    operator_stack
                    and operator_stack[-1].token.kind is not Token.Kind.CONTEXT
                    and (
                        operator_stack[-1].operator.precedence > operator.precedence
                        or (
                            operator_stack[-1].operator.precedence
                            == operator.precedence
                            and operator.associativity is Operator.Associativity.LEFT
                        )
                    )
                ):
                    output_queue = operate(operator_stack.pop(), output_queue)
                stack_operator(operator, token)
        else:
            output_queue.append(token)

    # Apply whatever operators remain; a leftover context marker means it was
    # never closed.
    while operator_stack:
        top = operator_stack[-1]
        if top.token.kind is Token.Kind.CONTEXT:
            raise exc_for_token(top.token, "Could not find matching context marker.")
        output_queue = operate(operator_stack.pop(), output_queue)

    if not output_queue:
        return None
    if len(output_queue) > 1:
        raise exc_for_missing_operator(output_queue[0], output_queue[1])
    return output_queue[0]
import re
from typing import Iterable, Pattern
from ..types import Token
from ..utils import exc_for_token
def tokenize(
    formula: str,
    word_chars: Pattern = re.compile(r"[\.\_\w]"),
    numeric_chars: Pattern = re.compile(r"[0-9\.]"),
    whitespace_chars: Pattern = re.compile(r"\s"),
) -> Iterable[Token]:
    """
    Convert a formula string into a generator of tokens.

    This tokenizer is intentionally very simple, and it makes no attempt to
    validate incoming tokens beyond ensuring that they are complete. The
    rationale for this is that changes like adding support for a new operator
    do not require changes to this tokenizer, and can instead be done entirely
    within the higher-level parser. This simplicity also lends itself to a
    direct functional implementation (rather than a class with methods), and
    so that is the approach taken here.

    Tokens emitted will have one of the following kinds:
        - operator: an operator to be applied to other surrounding tokens
            (will always consist of non-word characters).
        - name: a name of a feature/variable to be lifted from the model
            matrix context.
        - value: a literal value (string/number).
        - python: a code string to be evaluated.
        - context: a single grouping bracket, i.e. one of (, ), [ or ].

    The basic logic of this tokenizer is to loop over each character in the
    formula string and:
        - ensure that portions quoted by one of: ', ", {}, and ` are correctly
            grouped into a token of the appropriate kind.
        - ignore unquoted whitespace.
        - distinguish uses of (, ), [, and ] as grouping brackets from their
            use directly after a name (e.g. Python function calls).
        - output each contiguous portion of the formula string that belongs
            to the same token type as a token (e.g. sequential operators like
            '+-' will be output as a single operator token).

    Args:
        formula: The formula string to tokenize.
        word_chars: The regex pattern used to recognize "word" characters
            (basically non-operator characters).
        numeric_chars: The regex pattern used to recognize numeric characters.
        whitespace_chars: The regex pattern used to recognize (ignored)
            whitespace characters.

    Returns:
        A generator over the tokens found in the formula string.

    Raises:
        The exception built by `exc_for_token` when a quote character directly
        follows a non-operator token, or when the formula ends while a quote
        context is still open.
    """
    # Stack of the closing characters expected for the currently open
    # quotes/brackets; the innermost context is the last element.
    quote_context = []
    # Number of upcoming characters to append verbatim to the current token.
    take = 0

    token = Token(source=formula)

    for i, char in enumerate(formula):
        # Characters following an escape are consumed without interpretation.
        if take > 0:
            token.update(char, i)
            take -= 1
            continue
        # A backslash inside any open context escapes the next character; both
        # the backslash and the escaped character are kept in the token.
        if quote_context and char == "\\":
            token.update(char, i)
            take = 1
            continue
        # Closing `}` or backtick: the delimiter is only kept in the token
        # when an outer context is still open; otherwise the token is emitted
        # without it (the opening delimiter was not added to the token either).
        if quote_context and quote_context[-1] in "}`" and char == quote_context[-1]:
            quote_context.pop(-1)
            if token:
                if quote_context:
                    token.update(char, i)
                else:
                    yield token
                    token = Token(source=formula)
            continue
        # Any other closing delimiter is kept as part of the token.
        if quote_context and char == quote_context[-1]:
            token.update(char, i)
            quote_context.pop(-1)
            # A python token that has just closed its outermost bracket is
            # complete, and so is emitted immediately.
            if (
                token
                and not quote_context
                and token.kind is Token.Kind.PYTHON
                and char in ("]", ")")
            ):
                yield token
                token = Token(source=formula)
            continue
        # Inside a quoting context every character is appended as-is.
        # NOTE(review): "]" (pushed below for `name[`) is not in this tuple, so
        # characters inside such a bracket fall through to the generic
        # handling further down -- confirm that this is intended.
        if quote_context and quote_context[-1] in ('"', "'", "`", ")", "}"):
            # Track nested parentheses/backticks within `(...)` and `{...}` so
            # that the matching closer is the one that ends the context.
            if char in "(`" and quote_context[-1] in "})":
                quote_context.append(char.replace("(", ")"))
            token.update(char, i)
            continue

        # `{` starts a python token; the brace itself is not added to it.
        if char == "{":
            if token:
                yield token
            token = Token(source=formula, kind="python", source_start=i)
            quote_context.append("}")
            continue

        # A backtick starts a quoted name; the backtick is not added to it.
        if char == "`":
            if token:
                yield token
            token = Token(source=formula, kind="name", source_start=i)
            quote_context.append("`")
            continue

        if char in "([":
            if token.kind in (Token.Kind.NAME, Token.Kind.PYTHON):
                # A bracket directly after a name (or python token) extends
                # that token, which becomes/stays a python token.
                token.update(char, i, kind=Token.Kind.PYTHON)
                quote_context.append(")" if char == "(" else "]")
            else:
                # Otherwise the bracket is emitted on its own as a context
                # token, after flushing any pending token.
                if token:
                    yield token
                    token = Token(source=formula)
                yield Token(source=formula).update(char, i, kind="context")
            continue

        # Closing brackets reaching this point do not match an open context,
        # and are emitted as context tokens.
        if char in ")]":
            if token:
                yield token
                token = Token(source=formula)
            yield Token(source=formula).update(char, i, kind="context")
            continue

        # Whitespace terminates every pending token except operator tokens,
        # which are left open so that later operator characters extend them.
        if whitespace_chars.match(char):
            if token and token.kind is not Token.Kind.OPERATOR:
                yield token
                token = Token(source=formula)
            continue

        # Quote characters start a string value, but only at a token boundary.
        if char in ('"', "'"):
            if token and token.kind is Token.Kind.OPERATOR:
                yield token
                token = Token(source=formula)
            if not token:
                token.update(char, i, kind="value")
                quote_context.append(char)
            else:
                raise exc_for_token(
                    Token(source=formula, source_start=i, source_end=i),
                    f"Unexpected character {repr(char)} following token `{token.token}`.",
                )
            continue  # pragma: no cover; workaround bug in coverage

        if word_chars.match(char):
            assert token.kind in (
                None,
                Token.Kind.OPERATOR,
                Token.Kind.VALUE,
                Token.Kind.NAME,
            ), f"Unexpected token kind {token.kind}."
            if token and token.kind is Token.Kind.OPERATOR:
                yield token
                token = Token(source=formula)
            # Numeric characters start or extend a value token; any other word
            # character makes the token a name.
            if numeric_chars.match(char) and token.kind in (None, Token.Kind.VALUE):
                kind = "value"
            else:
                kind = "name"
            token.update(char, i, kind=kind)
            continue
        # Everything else is treated as (part of) an operator token.
        if token and token.kind is not Token.Kind.OPERATOR:
            yield token
            token = Token(source=formula)
        token.update(char, i, kind="operator")

    # Running out of characters while a context is still open is an error.
    if quote_context:
        raise exc_for_token(
            token,
            message=f"Formula ended before quote context was closed. Expected: {quote_context[-1]}",
        )
    # Flush the final pending token, if any.
    if token:
        yield token
import ast
import itertools
import functools
import re
from dataclasses import dataclass, field
from typing import List, Iterable, Set, Tuple, Union
from .algos.tokenize import tokenize
from .types import (
FormulaParser,
Operator,
OperatorResolver,
Structured,
Term,
Token,
)
from .utils import (
exc_for_token,
insert_tokens_after,
merge_operator_tokens,
replace_tokens,
)
@dataclass
class DefaultFormulaParser(FormulaParser):
    """
    The parser used for `Formula`s when no other parser is nominated.

    On top of `FormulaParser`, this class defaults the operator resolver to
    `DefaultOperatorResolver` and can automatically add an intercept term to
    the formulas it tokenizes.

    Attributes:
        operator_resolver: The operator resolver used while parsing the formula
            string and generating the abstract syntax tree. Defaults to a new
            `DefaultOperatorResolver` instance.
        include_intercept: Whether an intercept is included by default
            (formulas can still omit it in the usual manner, i.e. by adding a
            '-1' or '+0' term).
    """

    ZERO_PATTERN = re.compile(r"(?:^|(?<=\W))0(?=\W|$)")

    # Attributes
    operator_resolver: OperatorResolver = field(
        default_factory=lambda: DefaultOperatorResolver()  # pylint: disable=unnecessary-lambda
    )
    include_intercept: bool = True

    def get_tokens(self, formula: str) -> Iterable[Token]:
        """
        Tokenize `formula`, rewriting zeros and (optionally) adding intercepts.

        The rewrites are performed on the token stream rather than on the
        formula string, since quoting and escaping are best left to the
        tokenizer, and since doing so avoids the ambiguity in the AST between
        intentionally and incidentally unary operations (e.g. "+0" vs.
        "x + (+0)").

        Args:
            formula: The formula string to be tokenized.

        Returns:
            An iterable of `Token` instances.
        """
        one = Token("1", kind=Token.Kind.VALUE)
        plus = Token("+", kind=Token.Kind.OPERATOR)
        minus = Token("-", kind=Token.Kind.OPERATOR)

        # Every literal "0" value becomes the token pair "-", "1".
        stream = replace_tokens(
            tokenize(formula), "0", [minus, one], kind=Token.Kind.VALUE
        )

        if self.include_intercept:
            # Add "1" (joined by "+" where needed) after each "~".
            expanded = list(
                insert_tokens_after(
                    stream,
                    "~",
                    [one],
                    kind=Token.Kind.OPERATOR,
                    join_operator="+",
                )
            )

            # The right-hand side begins just after the last token that ends
            # in "~" (or at the very start when there is no such token).
            tilde_positions = [
                position
                for position, candidate in enumerate(expanded)
                if candidate.token.endswith("~")
            ]
            rhs_index = (tilde_positions[-1] if tilde_positions else -1) + 1

            if rhs_index > 0:
                leading = expanded[:rhs_index]
            elif expanded:
                leading = [one, plus]
            else:
                leading = [one]

            # Also add "1" after each "|" found on the right-hand side.
            stream = [
                *leading,
                *insert_tokens_after(
                    expanded[rhs_index:],
                    r"\|",
                    [one],
                    kind=Token.Kind.OPERATOR,
                    join_operator="+",
                ),
            ]

        # Merge adjacent "+"/"-" operator tokens to prevent unary issues.
        return merge_operator_tokens(stream, symbols={"+", "-"})
class DefaultOperatorResolver(OperatorResolver):
    """
    The default operator resolver implementation.

    This class implements the standard operators in a form consistent with
    other implementations of Wilkinson formulas. It can be extended via
    subclassing to support other kinds of operators, in which case `.operators`
    and/or `.resolve` can be overridden. For more details about which operators
    are implemented, review the code or the documentation website.
    """

    @property
    def operators(self):
        """
        The list of `Operator` instances understood by this resolver.
        """

        def formula_part_expansion(
            lhs: Set[Term], rhs: Set[Term]
        ) -> Tuple[Set[Term], Set[Term]]:
            # Flatten the operands of `|` into a single tuple of term sets, so
            # that chained uses (e.g. `a | b | c`) do not nest.
            terms = (lhs, rhs)

            out = []
            for termset in terms:
                if isinstance(termset, tuple):
                    out.extend(termset)
                else:
                    out.append(termset)
            return tuple(out)

        def nested_product_expansion(
            parents: Set[Term], nested: Set[Term]
        ) -> Set[Term]:
            # `a / b`: keep the parent terms, and add each nested term
            # multiplied by the product of all of the parent terms.
            common = functools.reduce(lambda x, y: x * y, parents)
            return parents.union({common * term for term in nested})

        def power(arg: Set[Term], power: Set[Term]) -> Set[Term]:
            # `arg ** n`: the products of every n-tuple of terms drawn from
            # `arg`. The exponent must be a single integer literal.
            power_term = next(iter(power))
            if (
                not len(power_term.factors) == 1
                or power_term.factors[0].token.kind is not Token.Kind.VALUE
                or not isinstance(ast.literal_eval(power_term.factors[0].expr), int)
            ):
                raise exc_for_token(
                    power_term.factors[0].token,
                    "The right-hand argument of `**` must be a positive integer.",
                )
            return {
                functools.reduce(lambda x, y: x * y, term)
                for term in itertools.product(*[arg] * int(power_term.factors[0].expr))
            }

        return [
            # Structural operators: `~` (binary and prefix forms) is only
            # accepted in an empty context, and `|` only when every enclosing
            # context entry is an operator whose symbol is `~` or `|`.
            Operator(
                "~",
                arity=2,
                precedence=-100,
                associativity=None,
                to_terms=lambda lhs, rhs: Structured(lhs=lhs, rhs=rhs),
                accepts_context=lambda context: len(context) == 0,
                structural=True,
            ),
            Operator(
                "~",
                arity=1,
                precedence=-100,
                associativity=None,
                fixity="prefix",
                to_terms=lambda terms: terms,
                accepts_context=lambda context: len(context) == 0,
                structural=True,
            ),
            Operator(
                "|",
                arity=2,
                precedence=-50,
                associativity=None,
                to_terms=formula_part_expansion,
                accepts_context=lambda context: all(
                    isinstance(c, Operator) and c.symbol in "~|" for c in context
                ),
                structural=True,
            ),
            # Set union/difference of terms.
            Operator(
                "+",
                arity=2,
                precedence=100,
                associativity="left",
                to_terms=lambda lhs, rhs: lhs.union(rhs),
            ),
            Operator(
                "-",
                arity=2,
                precedence=100,
                associativity="left",
                to_terms=lambda left, right: left.difference(right),
            ),
            # Prefix forms: unary `+` is the identity, while unary `-` yields
            # an empty set of terms.
            Operator(
                "+",
                arity=1,
                precedence=100,
                associativity="right",
                fixity="prefix",
                to_terms=lambda terms: terms,
            ),
            Operator(
                "-",
                arity=1,
                precedence=100,
                associativity="right",
                fixity="prefix",
                to_terms=lambda terms: set(),
            ),
            # `a * b`: all cross products together with the original terms.
            Operator(
                "*",
                arity=2,
                precedence=200,
                associativity="left",
                to_terms=lambda *term_sets: (
                    {
                        functools.reduce(lambda x, y: x * y, term)
                        for term in itertools.product(*term_sets)
                    }.union(itertools.chain(*term_sets))
                ),
            ),
            Operator(
                "/",
                arity=2,
                precedence=200,
                associativity="left",
                to_terms=nested_product_expansion,
            ),
            # `a : b`: the cross products only.
            Operator(
                ":",
                arity=2,
                precedence=300,
                associativity="left",
                to_terms=lambda *term_sets: {
                    functools.reduce(lambda x, y: x * y, term)
                    for term in itertools.product(*term_sets)
                },
            ),
            Operator(
                "**", arity=2, precedence=500, associativity="right", to_terms=power
            ),
        ]

    def resolve(
        self, token: Token, max_prefix_arity: int, context: List[Union[Token, Operator]]
    ) -> Iterable[Operator]:
        """
        Resolve an operator token into the operator(s) that it represents.

        Tokens whose symbol is directly present in the operator table are
        handled by the parent implementation. Otherwise, runs of adjacent "+"
        and "-" characters are first collapsed into a single sign; if the
        resulting symbol is in the operator table it is resolved as a single
        operator, and failing that each character of the symbol is resolved as
        its own operator.

        Args:
            token: The operator token to resolve.
            max_prefix_arity: The prefix arity limit forwarded to `_resolve`;
                when the symbol is split into characters, only the first
                character receives this value (the others receive 0).
            context: The enclosing operator/token context, forwarded as is.

        Returns:
            The operators represented by `token`.
        """
        if token.token in self.operator_table:
            return super().resolve(token, max_prefix_arity, context)

        symbol = token.token

        # Collapse each run of two or more "+"/"-" characters into a single
        # sign: "-" if the run has an odd number of "-" characters, else "+".
        while True:
            m = re.search(r"[+\-]{2,}", symbol)
            if not m:
                break
            # The sign choice must be parenthesised: a bare conditional
            # expression binds more loosely than `+`, which would discard
            # either the suffix or the prefix of the symbol.
            sign = "-" if m.group(0).count("-") % 2 else "+"
            symbol = symbol[: m.start(0)] + sign + symbol[m.end(0) :]

        if symbol in self.operator_table:
            return [self._resolve(token, symbol, max_prefix_arity, context)]

        # Fall back to treating every character as a separate operator.
        return [
            self._resolve(token, sym, max_prefix_arity if i == 0 else 0, context)
            for i, sym in enumerate(symbol)
        ]
import re
from typing import Iterable, Optional, Sequence, Set, Tuple, Type, Union
from formulaic.errors import FormulaSyntaxError
from .types.ast_node import ASTNode
from .types.token import Token
# Exception handling
def exc_for_token(
    token: Union[Token, ASTNode],
    message: str,
    errcls: Type[Exception] = FormulaSyntaxError,
) -> Exception:
    """
    Build (but do not raise) an exception annotated with token/source context.

    Args:
        token: The `Token` or `ASTNode` instance that the exception is about.
        message: The message to be included in the exception.
        errcls: The type of the exception to be returned.

    Returns:
        An `errcls` instance whose message is `message`, followed by the
        (colorized) source context of the token whenever one is available.
    """
    token_context = __get_token_for_ast(token).get_source_context(colorize=True)
    if not token_context:
        return errcls(message)
    return errcls(f"{message}\n\n{token_context}")
def exc_for_missing_operator(
    lhs: Union[Token, ASTNode],
    rhs: Union[Token, ASTNode],
    errcls: Type[Exception] = FormulaSyntaxError,
) -> Exception:
    """
    Build (but do not raise) an exception reporting that an operator is missing
    between the `lhs` and `rhs` tokens/ast-nodes.

    Args:
        lhs: The `Token` or `ASTNode` instance to the left of where an operator
            should be placed.
        rhs: The `Token` or `ASTNode` instance to the right of where an
            operator should be placed.
        errcls: The type of the exception to be returned.

    Returns:
        The exception generated by `exc_for_token` for the span of source
        between the two operands.
    """
    lhs_token, rhs_token, error_token = __get_tokens_for_gap(lhs, rhs)
    message = f"Missing operator between `{lhs_token.token}` and `{rhs_token.token}`."
    return exc_for_token(error_token, message, errcls=errcls)
def __get_token_for_ast(ast: Union[Token, ASTNode]) -> Token:  # pragma: no cover
    """
    Return `ast` unchanged when it is already a `Token`; otherwise synthesise a
    token spanning the source from the left-most to the right-most token of the
    node. The synthesised token is only suitable for error reporting.
    """
    if isinstance(ast, Token):
        return ast

    # Descend along the first/last arguments to find the outermost tokens.
    first = last = ast
    while isinstance(first, ASTNode):
        first = first.args[0]
    while isinstance(last, ASTNode):
        last = last.args[-1]

    if first.source:
        text = first.source[first.source_start : last.source_end + 1]
    else:
        text = ""

    return Token(
        token=text,
        source=first.source,
        source_start=first.source_start,
        source_end=last.source_end,
    )
def __get_tokens_for_gap(
    lhs: Union[Token, ASTNode], rhs: Union[Token, ASTNode]
) -> Tuple[Token, Token, Token]:
    """
    Reduce `lhs` and `rhs` to the pair of tokens that sit either side of the
    gap between them, and synthesise a third token covering the source from
    the start of the left token through to the end of the right one (this
    "middle" token may therefore be non-empty). The synthesised token is only
    suitable for error reporting.

    Returns:
        A tuple of: the left-hand token, the right-hand token, and the token
        spanning both.
    """
    # The right-most token of the left operand...
    left = lhs
    while isinstance(left, ASTNode):
        left = left.args[-1]

    # ... and the left-most token of the right operand (falling back to `lhs`
    # when `rhs` is falsy).
    right = rhs or lhs
    while isinstance(right, ASTNode):
        right = right.args[0]

    if left.source:
        text = left.source[left.source_start : right.source_end + 1]
    else:
        text = ""

    spanning = Token(
        text,
        source=left.source,
        source_start=left.source_start,
        source_end=right.source_end,
    )
    return (left, right, spanning)
# Token sequence mutations
def replace_tokens(
    tokens: Iterable[Token],
    token_to_replace: str,
    replacement: Union[Token, Sequence[Token]],
    *,
    kind: Optional[Token.Kind] = None,
) -> Iterable[Token]:
    """
    Lazily substitute matching tokens in `tokens` with one or more replacement
    tokens.

    Args:
        tokens: The sequence of tokens within which tokens should be replaced.
        token_to_replace: The string representation of the token to replace.
        replacement: The replacement token(s) to insert into the `tokens`
            sequence.
        kind: The type of tokens to be replaced. If not specified, all tokens
            which match the provided `token_to_replace` string will be
            replaced.

    Returns:
        A generator over the resulting tokens.
    """
    for candidate in tokens:
        kind_differs = kind and candidate.kind is not kind
        if kind_differs or candidate.token != token_to_replace:
            # Not a match: pass the token through untouched.
            yield candidate
        elif isinstance(replacement, Token):
            yield replacement
        else:
            yield from replacement
def insert_tokens_after(
    tokens: Iterable[Token],
    pattern: Union[str, re.Pattern],
    tokens_to_add: Sequence[Token],
    *,
    kind: Optional[Token.Kind] = None,
    join_operator: Optional[str] = None,
) -> Iterable[Token]:
    """
    Insert additional tokens into a sequence of tokens after each (within
    token) match of `pattern`.

    Note: this insertion can happen in the *middle* of existing tokens, which
    is especially useful when inserting tokens around multiple operators (which
    are often merged together into a single token). If you want to avoid this,
    make sure your regex `pattern` includes start and end matchers; e.g.
    `^<pattern>$`.

    Args:
        tokens: The sequence of tokens into which tokens should be inserted.
        pattern: A (potentially compiled) regex expression indicating where
            tokens should be inserted.
        tokens_to_add: A sequence of tokens to be inserted wherever `pattern`
            matches.
        kind: The type of tokens to be considered for insertion. If not
            specified, any matching token (part) will result in insertions.
        join_operator: If the insertion of tokens would result in the added
            tokens being directly followed by a non-operator token, the value
            set here is used to create a joining operator token. If not
            provided, no additional operators are added.

    Returns:
        A generator over the resulting tokens.
    """
    compiled = pattern if isinstance(pattern, re.Pattern) else re.compile(pattern)

    # Looking ahead at the next token requires random access.
    if join_operator:
        tokens = list(tokens)

    for index, token in enumerate(tokens):
        eligible = kind is None or token.kind is kind
        if not (eligible and compiled.search(token.token)):
            yield token
            continue

        pieces = list(token.split(compiled, after=True))
        final_piece = len(pieces) - 1

        for piece_index, piece in enumerate(pieces):
            yield piece

            # Only insert after pieces that *end* with a pattern match.
            match = compiled.search(piece.token)
            if not match or match.end() != len(piece.token):
                continue
            yield from tokens_to_add

            if not join_operator:
                continue

            # Identify whatever follows the inserted tokens: the next piece of
            # this token, else the next token in the sequence (if any).
            if piece_index < final_piece:
                following = pieces[piece_index + 1]
            elif index < len(tokens) - 1:
                following = tokens[index + 1]
            else:
                following = None

            if following is not None and following.kind is not Token.Kind.OPERATOR:
                yield Token(join_operator, kind=Token.Kind.OPERATOR)
def merge_operator_tokens(
    tokens: Iterable[Token], symbols: Optional[Set[str]] = None
) -> Iterable[Token]:
    """
    Merge adjacent operator tokens within a sequence of tokens.

    This is useful if you have added operator tokens after tokenization, in
    order to allow operator resolution of (e.g.) adjacent `+` and `-`
    operators.

    Args:
        tokens: The sequence of tokens within which operators should be merged.
        symbols: If specified, only adjacent operator symbols appearing within
            this set will be merged.

    Returns:
        A generator over the resulting tokens.
    """
    pending = None  # Operator token that may still absorb its successor.

    for token in tokens:
        mergeable = token.kind is Token.Kind.OPERATOR and not (
            symbols and token.token[0] not in symbols
        )

        if not mergeable:
            # Flush whatever was pooled, then pass this token through.
            if pending:
                yield pending
                pending = None
            yield token
        elif not pending:
            # First operator of a potential run.
            pending = token
        else:
            # `token` is an operator that can be collapsed on the left.
            pending = token.copy_with_attrs(token=pending.token + token.token)
            if symbols and pending.token[-1] not in symbols:
                yield pending
                pending = None

    if pending:
        yield pending
from typing import Any, Mapping, Union
from .formula import FormulaSpec
from .model_matrix import ModelMatrices, ModelMatrix
from .model_spec import ModelSpec, ModelSpecs
from .utils.context import capture_context
def model_matrix(
    spec: Union[FormulaSpec, ModelMatrix, ModelMatrices, ModelSpec, ModelSpecs],
    data: Any,
    *,
    context: Union[int, Mapping[str, Any]] = 0,
    **spec_overrides,
) -> Union[ModelMatrix, ModelMatrices]:
    """
    Generate a model matrix directly from a formula or model spec.

    This method is syntactic sugar for:
    ```
    Formula(spec).get_model_matrix(data, context=LayeredMapping(locals(), globals()), **kwargs)
    ```
    or
    ```
    model_spec.get_model_matrix(data, context=LayeredMapping(locals(), globals()), **kwargs)
    ```

    Args:
        spec: The spec that describes the structure of the model matrix to be
            generated. This can be either a `ModelMatrix` or `ModelSpec`
            instance (in which case the structure and state associated with
            the `ModelSpec` instance is re-used), or a formula specification
            or instance (in which case the structure is built from scratch).
        data: The raw data to be transformed into a model matrix. This can be
            any of the supported data types, but is typically a
            `pandas.DataFrame` instance.
        context: The context from which variables (and custom transforms/etc)
            should be inherited. When specified as an integer, it is
            interpreted as a frame offset from the caller's frame (i.e. 0, the
            default, means that all variables in the caller's scope should be
            made accessible when interpreting and evaluating formulae).
            Otherwise, a mapping from variable name to value is expected.
        spec_overrides: Any `ModelSpec` attributes to set/override. See
            `ModelSpec` for more details.

    Returns:
        The data transformed into the model matrix with the requested
        nominated structure.
    """
    # Integer contexts are frame offsets relative to the caller, hence one
    # extra level to step over this function's own frame. The capture must be
    # performed directly in this frame.
    resolved_context = (
        capture_context(context + 1) if isinstance(context, int) else context
    )
    model_spec = ModelSpec.from_spec(spec, **spec_overrides)
    return model_spec.get_model_matrix(data, context=resolved_context)
from __future__ import annotations
from abc import abstractmethod
import inspect
import warnings
from numbers import Number
from typing import Any, Union, Dict, Iterable, List, Optional, TYPE_CHECKING
import numpy
import pandas
import scipy.sparse as spsparse
import scipy.sparse.linalg
from interface_meta import InterfaceMeta
from formulaic.errors import DataMismatchWarning
from formulaic.materializers.types import FactorValues
from formulaic.utils.sparse import categorical_encode_series_to_sparse_csc_matrix
from formulaic.utils.stateful_transforms import stateful_transform
from .poly import poly
if TYPE_CHECKING:
from formulaic.model_spec import ModelSpec # pragma: no cover
def C(
    data: Any,
    contrasts: Optional[
        Union[Contrasts, Dict[str, Iterable[Number]], numpy.ndarray]
    ] = None,
    *,
    levels: Optional[Iterable[str]] = None,
):
    """
    Flag `data` as categorical, optionally nominating the contrasts that are to
    be used when it is encoded.

    Args:
        data: The data to be marked as categorical.
        contrasts: The specification of the contrasts that are to be computed.
            Should be a `Contrasts` instance, a dictionary mapping a key for
            the contrast with a vector of weights for the categories, or a
            numpy array with columns representing the contrasts, and rows
            representing the weights over the categories in the data. If not
            specified, a `Treatment` encoding is assumed.
        levels: The categorical levels associated with `data`. If not present,
            levels are inferred from `data`. Note that extra levels in `data`
            will be treated as null data.

    Returns:
        A categorical `FactorValues` wrapper around `data` that carries an
        encoder deferring to `encode_contrasts`.
    """

    def encoder(
        values: Any,
        reduced_rank: bool,
        drop_rows: List[int],
        encoder_state: Dict[str, Any],
        model_spec: ModelSpec,
    ):
        # Remove the rows nominated for dropping before encoding.
        series = pandas.Series(values)
        retained = series.drop(index=series.index[drop_rows])
        return encode_contrasts(
            retained,
            contrasts=contrasts,
            levels=levels,
            reduced_rank=reduced_rank,
            _state=encoder_state,
            _spec=model_spec,
        )

    return FactorValues(
        data,
        kind="categorical",
        spans_intercept=True,
        encoder=encoder,
    )
@stateful_transform
def encode_contrasts(
    data,
    contrasts: Union[
        Contrasts, Dict[str, Iterable[Number]], numpy.ndarray, None
    ] = None,
    *,
    levels: Optional[Iterable[str]] = None,
    reduced_rank: bool = False,
    output: Optional[str] = None,
    _state=None,
    _spec=None,
) -> FactorValues[Union[pandas.DataFrame, spsparse.spmatrix]]:
    """
    Encode a categorical dataset into one or more "contrasts".

    Args:
        data: The categorical data array/series to be encoded.
        contrasts: The specification of the contrasts that are to be computed.
            Should be a `Contrasts` instance, a dictionary mapping a key for
            the contrast with a vector of weights for the categories, or a
            numpy array with columns representing the contrasts, and rows
            representing the weights over the categories in the data. If not
            specified, a `Treatment` encoding is assumed.
        levels: The complete set of levels (categories) posited to be present
            in the data. This can also be used to reorder the levels as
            needed. When not provided, the categories stored in `_state` (if
            any) are used instead.
        reduced_rank: Whether to reduce the rank of output encoded columns in
            order to avoid spanning the intercept.
        output: The type of data to output. Must be one of "pandas", "numpy",
            or "sparse". When not provided, `_spec.output` is used if set, and
            "pandas" otherwise.
        _state: A mutable mapping used to persist the encountered categories
            (under the "categories" key) between invocations. When `None`, a
            fresh throw-away mapping is used.
        _spec: An object whose `output` attribute nominates the default output
            type. May be `None`.

    Returns:
        The result of applying the contrasts to the dummy-encoded data.

    Raises:
        ValueError: If the resolved output type is not recognized.
    """
    # Prepare arguments. Both `_state` and `_spec` default to `None`, and so
    # must not be dereferenced unconditionally.
    if _state is None:
        _state = {}
    output = output or (_spec.output if _spec is not None else None) or "pandas"
    levels = levels or _state.get(
        "categories"
    )  # TODO: Is this too early to provide useful feedback to users?

    # Normalize `contrasts` into a `Contrasts` instance.
    if contrasts is None:
        contrasts = TreatmentContrasts()
    elif inspect.isclass(contrasts) and issubclass(contrasts, Contrasts):
        contrasts = contrasts()
    if not isinstance(contrasts, Contrasts):
        contrasts = CustomContrasts(contrasts)

    if levels is not None:
        # Values outside of the nominated levels become null once cast to the
        # fixed set of categories below, so warn the user about them.
        extra_categories = set(pandas.unique(data)).difference(levels)
        if extra_categories:
            warnings.warn(
                "Data has categories outside of the nominated levels (or that were "
                f"not seen in original dataset): {extra_categories}. They are being "
                " cast to nan, which will likely skew the results of your analyses.",
                DataMismatchWarning,
            )
        data = pandas.Series(pandas.Categorical(data, categories=levels))
    else:
        data = pandas.Series(data).astype("category")

    # Perform dummy encoding
    if output in ("pandas", "numpy"):
        categories = list(data.cat.categories)
        encoded = pandas.get_dummies(data)
    elif output == "sparse":
        categories, encoded = categorical_encode_series_to_sparse_csc_matrix(
            data,
        )
    else:
        raise ValueError(f"Unknown output type `{repr(output)}`.")

    # Update state
    _state["categories"] = categories

    # Apply and return contrasts
    return contrasts.apply(
        encoded, levels=categories, reduced_rank=reduced_rank, output=output
    )
class Contrasts(metaclass=InterfaceMeta):
    """
    The interface that all contrast implementations build upon.
    """

    INTERFACE_RAISE_ON_VIOLATION = True

    FACTOR_FORMAT = "{name}[{field}]"

    def apply(
        self,
        dummies,
        levels,
        reduced_rank=True,
        output: Optional[str] = None,
    ):
        """
        Apply the contrasts defined by this `Contrasts` instance to `dummies`
        (the dummy encoding of the values of interest).

        Args:
            dummies: Dummy encoded representation of the values.
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether to output a reduced rank matrix. When this is
                `False`, the dummy encoding is usually passed through
                unmodified.
            output: The type of datastructure to output. Should be one of:
                "pandas", "numpy", "sparse", or `None`. If `None` is provided,
                the output type will be inferred from the input data type.

        Returns:
            A categorical `FactorValues` instance wrapping the encoded data.
        """
        if output is None:
            # Infer the output type from the type of the incoming dummies.
            for container_type, inferred in (
                (pandas.DataFrame, "pandas"),
                (numpy.ndarray, "numpy"),
                (spsparse.spmatrix, "sparse"),
            ):
                if isinstance(dummies, container_type):
                    output = inferred
                    break
            else:  # pragma: no cover
                raise ValueError(
                    f"Cannot impute output type for dummies of type `{type(dummies)}`."
                )
        elif output not in ("pandas", "numpy", "sparse"):  # pragma: no cover
            raise ValueError(
                "Output type for contrasts must be one of: 'pandas', 'numpy' or 'sparse'."
            )

        encoded = self._apply(
            dummies,
            levels=levels,
            reduced_rank=reduced_rank,
            sparse=(output == "sparse"),
        )
        coding_column_names = self.get_coding_column_names(
            levels, reduced_rank=reduced_rank
        )

        # Coerce the encoded values into the requested container (sparse
        # results are passed through as is).
        if output == "pandas":
            encoded = pandas.DataFrame(encoded, columns=coding_column_names)
        elif output == "numpy":
            encoded = numpy.array(encoded)

        return FactorValues(
            encoded,
            kind="categorical",
            column_names=coding_column_names,
            spans_intercept=self.get_spans_intercept(levels, reduced_rank=reduced_rank),
            drop_field=self.get_drop_field(levels, reduced_rank=reduced_rank),
            format=self.get_factor_format(levels, reduced_rank=reduced_rank),
            encoded=True,
        )

    def _apply(self, dummies, levels, reduced_rank=True, sparse=False):
        # Encoding is the matrix product of the dummies and the coding matrix.
        coding_matrix = self.get_coding_matrix(levels, reduced_rank, sparse=sparse)
        if sparse:
            return dummies @ coding_matrix
        return dummies.values @ coding_matrix

    # Coding matrix methods

    def get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        """
        Generate the coding matrix; i.e. the matrix with column vectors
        representing the encoding to use for the corresponding level.

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether to output a reduced rank matrix. When this is
                `False`, the dummy encoding is usually passed through
                unmodified.
            sparse: Whether to output sparse results.

        Returns:
            The raw coding matrix when `sparse` is set, or else a
            `pandas.DataFrame` indexed by level with one column per coding
            column name.
        """
        coding_matrix = self._get_coding_matrix(
            levels, reduced_rank=reduced_rank, sparse=sparse
        )
        if not sparse:
            coding_matrix = pandas.DataFrame(
                coding_matrix,
                columns=self.get_coding_column_names(levels, reduced_rank=reduced_rank),
                index=levels,
            )
        return coding_matrix

    @abstractmethod
    def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        """
        Subclasses must override this method to implement the generation of the
        coding matrix.

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether to output the reduced rank coding matrix.
            sparse: Whether to output sparse results.
        """

    @abstractmethod
    def get_coding_column_names(self, levels, reduced_rank=True):
        """
        Generate the names for the columns of the coding matrix (the encoded
        features to be added to the model matrix).

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether to output the coefficients for reduced rank
                encodings.
        """

    # Coefficient matrix methods

    def get_coefficient_matrix(self, levels, reduced_rank=True, sparse=False):
        """
        Generate the coefficient matrix; i.e. the matrix with rows representing
        the contrasts effectively computed during a regression, with columns
        indicating the weights given to the origin categories. This is
        primarily used for debugging/introspection.

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether to output the coefficients for reduced rank
                encodings.
            sparse: Whether to output sparse results.

        Returns:
            The raw coefficient matrix when `sparse` is set, or else a
            `pandas.DataFrame` with one column per level and one row per
            coefficient row name.
        """
        coefficient_matrix = self._get_coefficient_matrix(
            levels, reduced_rank=reduced_rank, sparse=sparse
        )
        if not sparse:
            coefficient_matrix = pandas.DataFrame(
                coefficient_matrix,
                columns=levels,
                index=self.get_coefficient_row_names(levels, reduced_rank=reduced_rank),
            )
        return coefficient_matrix

    def _get_coefficient_matrix(self, levels, reduced_rank=True, sparse=False):
        coding_matrix = self.get_coding_matrix(
            levels, reduced_rank=reduced_rank, sparse=sparse
        )
        if reduced_rank:
            # Prepend a column of ones to the reduced rank coding matrix
            # before it is inverted.
            stack = spsparse.hstack if sparse else numpy.hstack
            coding_matrix = stack(
                [
                    numpy.ones((len(levels), 1)),
                    coding_matrix,
                ]
            )

        if sparse:
            return scipy.sparse.linalg.inv(coding_matrix.tocsc())
        return numpy.linalg.inv(coding_matrix)

    @abstractmethod
    def get_coefficient_row_names(self, levels, reduced_rank=True):
        """
        Generate the names for the rows of the coefficient matrix (the
        interpretation of the contrasts generated by the coding matrix).

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether to output the coefficients for reduced rank
                encodings.
        """

    # Additional metadata

    def get_spans_intercept(self, levels, reduced_rank=True) -> bool:
        """
        Determine whether the encoded contrasts span the intercept.

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether the contrast encoding used had reduced rank.
        """
        # By default, only full rank encodings span the intercept.
        return not reduced_rank

    def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]:
        """
        Determine which column to drop to be full rank after this encoding.
        If this contrast encoding is already reduced in rank, then this method
        should return `None`.

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether the contrast encoding used had reduced rank.
        """
        if not reduced_rank:
            column_names = self.get_coding_column_names(
                levels, reduced_rank=reduced_rank
            )
            return column_names[0]
        return None

    def get_factor_format(self, levels, reduced_rank=True):
        """
        The format to use when assigning feature names to each encoded feature.
        Formats can use two named substitutions: `name` and `field`; for
        example: "{name}[{field}]".

        Args:
            levels: The names of the levels/categories in the data.
            reduced_rank: Whether the contrast encoding used had reduced rank.
        """
        return self.FACTOR_FORMAT
class TreatmentContrasts(Contrasts):
    """
    Treatment (aka. dummy) coding.

    This contrast leads to comparisons of the mean of the dependent variable
    for each level with some reference level. If not specified, the reference
    level is taken to be the first level.
    """

    FACTOR_FORMAT = "{name}[T.{field}]"

    # Sentinel indicating that no reference level was nominated.
    MISSING = object()

    def __init__(self, base=MISSING):
        self.base = base

    @Contrasts.override
    def _apply(self, dummies, levels, reduced_rank=True, sparse=False):
        if not reduced_rank:
            return dummies

        # Select every column other than that of the reference level.
        drop_index = self._find_base_index(levels)
        keep = numpy.ones(len(levels), dtype=bool)
        keep[drop_index] = False

        if sparse or isinstance(dummies, numpy.ndarray):
            return dummies[:, keep]
        return dummies.iloc[:, keep]

    def _find_base_index(self, levels):
        # Position of the reference level within `levels` (the first level
        # when no reference level was nominated).
        if self.base is self.MISSING:
            return 0
        try:
            return levels.index(self.base)
        except ValueError as err:
            raise ValueError(
                f"Value `{repr(self.base)}` for `TreatmentContrasts.base` is not among the provided levels."
            ) from err

    @Contrasts.override
    def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        n = len(levels)
        matrix = spsparse.eye(n).tocsc() if sparse else numpy.eye(n)
        if reduced_rank:
            # Remove the identity column belonging to the reference level.
            drop_level = self._find_base_index(levels)
            retained = [
                column for column in range(matrix.shape[1]) if column != drop_level
            ]
            matrix = matrix[:, retained]
        return matrix

    @Contrasts.override
    def get_coding_column_names(self, levels, reduced_rank=True):
        base_index = self._find_base_index(levels)
        if not reduced_rank:
            return levels
        return [level for i, level in enumerate(levels) if i != base_index]

    @Contrasts.override
    def get_coefficient_row_names(self, levels, reduced_rank=True):
        base = levels[self._find_base_index(levels)]
        if not reduced_rank:
            return levels
        differences = [f"{level}-{base}" for level in levels if level != base]
        return [base, *differences]

    @Contrasts.override
    def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]:
        if reduced_rank:
            return None
        if self.base is self.MISSING:
            return levels[0]
        return self.base
class SASContrasts(TreatmentContrasts):
    """
    SAS (treatment) contrast coding.

    The contrasts generated by this class are the same as those of
    `TreatmentContrasts`, except that the reference level defaults to the
    last level (the default in SAS) rather than the first.
    """

    @TreatmentContrasts.override
    def _find_base_index(self, levels):
        # Without an explicit base, fall back to the final level.
        if self.base is self.MISSING:
            return len(levels) - 1
        try:
            position = levels.index(self.base)
        except ValueError as e:
            raise ValueError(
                f"Value `{repr(self.base)}` for `SASContrasts.base` is not among the provided levels."
            ) from e
        return position

    @TreatmentContrasts.override
    def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]:
        # Nothing further needs dropping once the rank is already reduced.
        if reduced_rank:
            return None
        if self.base is self.MISSING:
            return levels[-1]
        return self.base
class SumContrasts(Contrasts):
    """
    Sum (or Deviation) coding.

    These contrasts compare the mean of the dependent variable for each level
    (except the last, which is redundant) to the global average of all levels.
    """

    FACTOR_FORMAT = "{name}[S.{field}]"

    @Contrasts.override
    def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        n = len(levels)

        if sparse:
            if not reduced_rank:
                return spsparse.eye(n).tocsc()
            # Build in LIL form so the last row can be assigned, then hand
            # back CSC.
            contr = spsparse.eye(n, n - 1).tolil()
            contr[-1, :] = -1
            return contr.tocsc()

        if not reduced_rank:
            return numpy.eye(n)
        contr = numpy.eye(n, n - 1)
        # The final level is coded as -1 in every column.
        contr[-1, :] = -1
        return contr

    @Contrasts.override
    def get_coding_column_names(self, levels, reduced_rank=True):
        # The last level has no column of its own in the reduced encoding.
        return levels[:-1] if reduced_rank else levels

    @Contrasts.override
    def get_coefficient_row_names(self, levels, reduced_rank=True):
        if not reduced_rank:
            return levels
        deviations = (f"{level} - avg" for level in levels[:-1])
        return ["avg", *deviations]
class HelmertContrasts(Contrasts):
    """
    Helmert coding.

    These contrasts compare the mean of the dependent variable for each
    successive level to the average of all previous levels. The default
    attribute values are chosen to match the R implementation, which
    corresponds to a reversed and unscaled Helmert coding.

    Attributes:
        reverse: Whether to iterate over successive levels in reverse order.
        scale: Whether to scale the encoding to simplify interpretation of
            coefficients (results in a floating point model matrix instead of
            an integer one).
    """

    FACTOR_FORMAT = "{name}[H.{field}]"

    def __init__(self, *, reverse: bool = True, scale: bool = False):
        self.reverse = reverse
        self.scale = scale

    @Contrasts.override
    def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        """
        Build the Helmert coding matrix for `levels`.

        Returns:
            An identity matrix of size `len(levels)` when `reduced_rank` is
            false, otherwise a `(n, n - 1)` matrix. Sparse results are always
            returned in CSC format, matching the other contrast classes in
            this module.
        """
        n = len(levels)
        if not reduced_rank:
            return spsparse.eye(n).tocsc() if sparse else numpy.eye(n)

        # The sparse matrix is assembled in LIL format, which supports the
        # element-wise assignments below, and converted to CSC on return.
        contr = spsparse.lil_matrix((n, n - 1)) if sparse else numpy.zeros((n, n - 1))
        for i in range(n - 1):
            if self.reverse:
                contr[i + 1, i] = i + 1
            else:
                contr[i, i] = n - i - 1
        contr[
            numpy.triu_indices(n - 1) if self.reverse else numpy.tril_indices(n, k=-1)
        ] = -1

        if self.scale:
            for i in range(n - 1):
                contr[:, i] /= i + 2 if self.reverse else n - i

        # Previously the LIL matrix leaked out of the sparse path; every other
        # contrast here returns CSC, so do the same.
        return contr.tocsc() if sparse else contr

    @Contrasts.override
    def get_coding_column_names(self, levels, reduced_rank=True):
        if reduced_rank:
            return levels[1:] if self.reverse else levels[:-1]
        return levels

    @Contrasts.override
    def get_coefficient_row_names(self, levels, reduced_rank=True):
        if reduced_rank:
            compared = levels[1:] if self.reverse else levels[:-1]
            return ["avg", *(f"{level} - rolling_avg" for level in compared)]
        return levels
class DiffContrasts(Contrasts):
    """
    Difference coding.

    These contrasts compare the mean of the dependent variable for each level
    with that of the previous level. The default attribute values are chosen
    to match the R implementation, and correspond to a reverse (or backward)
    difference coding.

    Attributes:
        backward: Whether to reverse the sign of the difference (e.g. Level 2 -
            Level 1 cf. Level 1 - Level 2).
    """

    FACTOR_FORMAT = "{name}[D.{field}]"

    def __init__(self, backward: bool = True):
        self.backward = backward

    @Contrasts.override
    def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        n = len(levels)
        if not reduced_rank:
            return spsparse.eye(n).tocsc() if sparse else numpy.eye(n)

        # Every row starts as [1/n, 2/n, ..., (n-1)/n]; entries on or above
        # the diagonal are then shifted down by one.
        contr = numpy.tile(numpy.arange(1, n), (n, 1)) / n
        contr[numpy.triu_indices(n, m=n - 1)] -= 1
        if not self.backward:
            contr *= -1

        return spsparse.csc_matrix(contr) if sparse else contr

    @Contrasts.override
    def get_coding_column_names(self, levels, reduced_rank=True):
        if not reduced_rank:
            return levels
        return levels[1:] if self.backward else levels[:-1]

    @Contrasts.override
    def get_coefficient_row_names(self, levels, reduced_rank=True):
        if not reduced_rank:
            return levels
        # Pair each level with the neighbour it is differenced against.
        if self.backward:
            pairs = zip(levels[1:], levels)
        else:
            pairs = zip(levels, levels[1:])
        return ["avg", *(f"{level} - {ref}" for level, ref in pairs)]
class PolyContrasts(Contrasts):
    """
    (Orthogonal) Polynomial coding.

    These "contrasts" represent a categorical variable that is assumed to have
    equal (or known) spacing/scores, and allow us to model non-linear
    polynomial behaviour of the dependent variable with respect to the ordered
    levels.

    Attributes:
        scores: The "scores" of the categorical variable. If provided, it must
            have the same cardinality as the categories being coded. Any
            sized sequence is accepted, including numpy arrays.
    """

    FACTOR_FORMAT = "{name}{field}"

    # Column-name suffixes for the first three polynomial degrees; higher
    # degrees are named "^<degree>".
    NAME_ALIASES = {
        1: ".L",
        2: ".Q",
        3: ".C",
    }

    def __init__(self, scores=None):
        self.scores = scores

    @Contrasts.override
    def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        """
        Build the polynomial coding matrix for `levels`.

        Raises:
            ValueError: If `scores` was provided (and is non-empty) but its
                length differs from the number of levels.
        """
        n = len(levels)
        if not reduced_rank:
            return spsparse.eye(n).tocsc() if sparse else numpy.eye(n)

        # Test for "scores supplied" explicitly: relying on the truthiness of
        # `self.scores` raises for numpy arrays with more than one element.
        # As before, missing or empty scores fall back to equal spacing.
        scores = self.scores
        has_scores = scores is not None and len(scores) > 0
        if has_scores and len(scores) != n:
            raise ValueError(
                "`PolyContrasts.scores` must have the same cardinality as the categories."
            )
        if not has_scores:
            scores = numpy.arange(n)

        coding_matrix = poly(scores, degree=n - 1)
        if sparse:
            return spsparse.csc_matrix(coding_matrix)
        return coding_matrix

    @Contrasts.override
    def get_coding_column_names(self, levels, reduced_rank=True):
        if reduced_rank:
            return [
                self.NAME_ALIASES.get(d, f"^{d}") for d in range(1, len(levels))
            ]
        return levels

    @Contrasts.override
    def get_coefficient_row_names(self, levels, reduced_rank=True):
        if reduced_rank:
            return ["avg", *self.get_coding_column_names(levels, reduced_rank=True)]
        return levels
class CustomContrasts(Contrasts):
    """
    Contrasts backed by a hand-coded contrast matrix supplied by the user.

    Attributes:
        contrasts: The contrast matrix as a numpy array.
        contrast_names: Optional names for the columns of `contrasts`.
    """

    def __init__(self, contrasts, names=None):
        """
        Args:
            contrasts: Either an array-like contrast matrix, or a dictionary
                whose values become the columns of the matrix (the keys are
                used as column names unless `names` is given).
            names: Optional column names; must match the number of columns.

        Raises:
            ValueError: If `names` does not have one entry per column.
        """
        if isinstance(contrasts, dict):
            if names is None:
                names = list(contrasts)
            # Each dictionary value is one column, hence the transpose.
            matrix = numpy.array(list(contrasts.values())).T
        else:
            matrix = numpy.array(contrasts)

        if names is not None and len(names) != matrix.shape[1]:
            raise ValueError(
                "Names must be aligned with the columns of the contrast array."
            )

        self.contrasts = matrix
        self.contrast_names = names

    @Contrasts.override
    def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False):
        # The user-provided matrix is used as-is, whatever the rank requested.
        return spsparse.csc_matrix(self.contrasts) if sparse else self.contrasts

    @Contrasts.override
    def get_coding_column_names(self, levels, reduced_rank=True):
        if self.contrast_names:
            return self.contrast_names
        # Unnamed columns are numbered from one.
        column_count = self.contrasts.shape[1]
        return list(range(1, column_count + 1))

    @Contrasts.override
    def get_coefficient_row_names(self, levels, reduced_rank=True):
        # One numbered row per level when rank is reduced, one fewer otherwise.
        stop = len(levels) + (1 if reduced_rank else 0)
        return list(range(1, stop))

    @Contrasts.override
    def get_spans_intercept(self, levels, reduced_rank=True) -> bool:
        return False

    @Contrasts.override
    def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]:
        return None
class ContrastsRegistry(type):
    """
    Namespace of the available contrast codings, exposed in formulae as
    "contr" (so that, for example, `contr.treatment` refers to
    `TreatmentContrasts`).
    """

    # Contrasts named as they are in R.
    helmert = HelmertContrasts
    poly = PolyContrasts
    sum = SumContrasts
    treatment = TreatmentContrasts
    SAS = SASContrasts

    # Additional contrasts.
    diff = DiffContrasts
    custom = CustomContrasts
from collections import defaultdict
from enum import Enum
from typing import Iterable, Optional, Union
import numpy
import pandas
from formulaic.materializers.types import FactorValues
from formulaic.utils.stateful_transforms import stateful_transform
class SplineExtrapolation(Enum):
    """
    The strategies available for handling values that fall outside of the
    spline domain during spline computations.
    """

    RAISE = "raise"  # fail on out-of-bounds values
    CLIP = "clip"  # move out-of-bounds values onto the nearest bound
    NA = "na"  # replace out-of-bounds values with nan
    ZERO = "zero"  # out-of-bounds values evaluate to zero
    EXTEND = "extend"  # extend the boundary polynomials past the bounds
@stateful_transform
def basis_spline(
    x: Union[pandas.Series, numpy.ndarray],
    df: Optional[int] = None,
    knots: Optional[Iterable[float]] = None,
    degree: int = 3,
    include_intercept: bool = False,
    lower_bound: Optional[float] = None,
    upper_bound: Optional[float] = None,
    extrapolation: Union[str, SplineExtrapolation] = "raise",
    _state: dict = None,
) -> FactorValues[dict]:
    """
    Evaluate the B-Spline basis vectors for the inputs `x`.

    This is especially useful for allowing non-linear fits to data in linear
    regression. Apart from the additional `extrapolation` parameter, the API
    is shared with `patsy.splines.bs`, and the behaviour is intended to match
    both `patsy.splines.bs` and R's `splines::bs` where functionality
    overlaps.

    Args:
        x: The vector for which the B-Spline basis should be computed.
        df: The number of degrees of freedom to use for this spline. If
            specified, `knots` are generated automatically as `df` - `degree`
            (minus one if `include_intercept` is True) equally spaced
            quantiles. You cannot specify both `df` and `knots`.
        knots: The internal breakpoints of the B-Spline. If not specified
            they default to the empty list (unless `df` is specified), in
            which case the ordinary polynomial (Bezier) basis is generated.
        degree: The degree of the B-Spline (the highest degree of terms in
            the resulting polynomial). Must be a non-negative integer.
        include_intercept: Whether to return a complete (full-rank) basis.
            Note that if `ensure_full_rank=True` is passed to the
            materializer, then the intercept will (depending on context)
            nevertheless be omitted.
        lower_bound: The lower bound of the B-Spline domain. If not specified
            this is determined from `x`.
        upper_bound: The upper bound of the B-Spline domain. If not specified
            this is determined from `x`.
        extrapolation: How values of `x` beyond the bounds are handled:
            - 'raise': Raises a `ValueError` if there are any values in `x`
              outside the B-Spline domain.
            - 'clip': Any values above/below the domain are set to the
              upper/lower bounds.
            - 'na': Any values outside of bounds are set to `numpy.nan`.
            - 'zero': Any values outside of bounds are set to `0`.
            - 'extend': Any values outside of bounds are computed by
              extending the polynomials of the B-Spline (this is the same as
              the default in R).

    Returns:
        A dictionary of encoded vectors keyed by basis index, wrapped in a
        `FactorValues` instance carrying the relevant metadata.

    Raises:
        ValueError: If both `df` and `knots` are given, if `df` is too small
            for the requested `degree`, or if `extrapolation` is 'raise' and
            `x` has values outside of the bounds.

    Notes:
        The implementation is a slightly generalised version of the
        ["Cox-de Boor" algorithm](https://en.wikipedia.org/wiki/B-spline#Definition),
        extended to allow for extrapolation. The `splev` methods of `scipy`
        are not used since, per the original author's benchmarks, this
        implementation outperforms them for the intended use-cases.

        A good primer on B-Splines, by Jeffrey Racine, is available at:
        https://cran.r-project.org/web/packages/crs/vignettes/spline_primer.pdf

        As a stateful transform only `knots`, `lower_bound` and `upper_bound`
        are remembered in `_state`; all other information must be explicitly
        specified on each call.
    """
    # Validate arguments
    if df is not None and knots is not None:
        raise ValueError("You cannot specify both `df` and `knots`.")

    # Bounds already recorded in the state take precedence over both the
    # arguments and the data.
    if "lower_bound" not in _state:
        _state["lower_bound"] = numpy.min(x) if lower_bound is None else lower_bound
    lower_bound = _state["lower_bound"]

    if "upper_bound" not in _state:
        _state["upper_bound"] = numpy.max(x) if upper_bound is None else upper_bound
    upper_bound = _state["upper_bound"]

    extrapolation = SplineExtrapolation(extrapolation)

    # Apply the extrapolation policies that act on the data itself
    if extrapolation is SplineExtrapolation.RAISE:
        if numpy.any((x < lower_bound) | (x > upper_bound)):
            raise ValueError(
                "Some field values extend beyond upper and/or lower bounds, which can result in ill-conditioned bases. "
                "Pass a value for `extrapolation` to control how extrapolation should be performed."
            )
    elif extrapolation is SplineExtrapolation.CLIP:
        x = numpy.clip(x, lower_bound, upper_bound)
    elif extrapolation is SplineExtrapolation.NA:
        x = numpy.where((x >= lower_bound) & (x <= upper_bound), x, numpy.nan)

    # Build the knot sequence once, and reuse it from the state thereafter
    if "knots" not in _state:
        interior = [] if knots is None else list(knots)
        if df:
            nknots = df - degree - (1 if include_intercept else 0)
            if nknots < 0:
                raise ValueError(
                    f"Invalid value for `df`. `df` must be greater than {degree + (1 if include_intercept else 0)} [`degree` (+ 1 if `include_intercept` is `True`)]."
                )
            quantiles = numpy.quantile(x, numpy.linspace(0, 1, nknots + 2))
            interior = list(quantiles[1:-1].ravel())
        # Surround the interior knots with the bounds, and then repeat each
        # bound a further `degree` times.
        with_bounds = [lower_bound, *interior, upper_bound]
        _state["knots"] = list(numpy.pad(with_bounds, degree, mode="edge"))
    knots = _state["knots"]

    # Compute basis splines
    # The following code is equivalent to
    # [B(i, j=degree) for i in range(len(knots)-d-1)], with B(i, j) defined as:
    # B = lambda i, j: ((x >= knots[i]) & (x < knots[i+1])).astype(float) if j == 0 else alpha(i, j, x) * B(i, j-1, x) + (1 - alpha(i+1, j, x)) * B(i+1, j-1, x)
    # The recurrence is unrolled degree by degree so that each B(i, j) is
    # computed only once.
    def alpha(i, j):
        # Repeated knots would otherwise divide by zero.
        if knots[i + j] != knots[i]:
            return (x - knots[i]) / (knots[i + j] - knots[i])
        return 0

    # Index of the right-hand edge of the last interval inside the domain.
    final_edge = len(knots) - degree - 1
    extend = extrapolation is SplineExtrapolation.EXTEND

    # Degree zero: indicator functions of the knot intervals.
    basis = {}
    for i in range(len(knots) - 1):
        if extend:
            # The outermost intervals are stretched out to infinity.
            left = knots[i] if i != degree else -numpy.inf
            right = knots[i + 1] if i + 1 != final_edge else numpy.inf
            inside = (x >= left) & (x < right)
        elif i + 1 != final_edge:
            inside = (x >= knots[i]) & (x < knots[i + 1])
        else:
            # Properly handle boundary: the last interval is closed.
            inside = (x >= knots[i]) & (x <= knots[i + 1])
        basis[i] = inside.astype(float)

    # Raise the degree one step at a time; only the previous degree is needed.
    for d in range(1, degree + 1):
        basis = {
            i: alpha(i, d) * basis[i] + (1 - alpha(i + 1, d)) * basis[i + 1]
            for i in range(len(knots) - d - 1)
        }

    return FactorValues(
        {i: basis[i] for i in sorted(basis) if i > 0 or include_intercept},
        kind="numerical",
        spans_intercept=include_intercept,
        drop_field=0,
        format="{name}[{field}]",
        encoded=False,
    )
import numpy
from .basis_spline import basis_spline
from .identity import identity
from .contrasts import C, encode_contrasts, ContrastsRegistry
from .poly import poly
from .scale import center, scale
# Names exported by `from <this module> import *`: the transforms imported
# above, plus the `TRANSFORMS` mapping defined below.
__all__ = [
"basis_spline",
"identity",
"C",
"encode_contrasts",
"ContrastsRegistry",
"poly",
"center",
"scale",
"TRANSFORMS",
]
# Mapping from a short name to the object implementing it (for example, the
# contrasts registry is registered under "contr").
TRANSFORMS = {
    # Common transforms
    "np": numpy,
    "log": numpy.log,
    "log10": numpy.log10,
    "log2": numpy.log2,
    "exp": numpy.exp,
    # 10 ** x, i.e. the inverse of "log10" (this previously computed x ** 10).
    # A float base keeps the result floating point, as "exp" and "exp2" do,
    # and permits negative integer exponents.
    "exp10": lambda x: numpy.power(10.0, x),
    "exp2": numpy.exp2,
    # Bespoke transforms
    "bs": basis_spline,
    "center": center,
    "poly": poly,
    "scale": scale,
    "C": C,
    "contr": ContrastsRegistry,
    "I": identity,
}
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy
from formulaic.materializers.types import FactorValues
from formulaic.utils.stateful_transforms import stateful_transform
try:
import numpy.typing
except ImportError as e: # pragma: no cover
if TYPE_CHECKING:
raise RuntimeError("Numpy >=1.20 is required for type-checking.") from e
@stateful_transform
def poly(
    x: numpy.typing.ArrayLike, degree: int = 1, raw: bool = False, _state=None
) -> numpy.ndarray:
    """
    Generate a basis for a polynomial vector-space representation of `x`.

    The basis vectors returned by this transform can be used, for example, to
    capture non-linear dependence on `x` in a linear regression.

    Args:
        x: The vector for which a polynomial vector space should be generated.
        degree: The degree of the polynomial vector space.
        raw: Whether to return "raw" basis vectors (e.g. `[x, x**2, x**3]`).
            If `False`, an orthonormal set of basis vectors is returned
            instead (see notes below for more information).

    Returns:
        A two-dimensional array with `len(x)` rows and `degree` columns, the
        columns being the basis vectors of the polynomial vector-space. In
        the orthonormal case the array is wrapped in `FactorValues` with the
        columns named "1" through to `str(degree)`.

    Notes:
        This transform implements the "three-term recurrence relation" for
        monic orthogonal polynomials. There are many good introductions to
        these recurrence relations, including:
        https://dec41.user.srcf.net/h/IB_L/numerical_analysis/2_3

        Another common approach is QR factorisation, where the columns of Q
        are the orthogonal basis vectors. Per the original author, this
        implementation outperforms numpy's QR decomposition and avoids
        needlessly computing the R matrix. Orthogonal polynomial bases are
        unique up to the choice of inner-product and scaling, and so all
        methods result in the same set of polynomials.

        When used as a stateful transform, the coefficients that uniquely
        define the polynomials are retained, and so new data is evaluated
        against the same polynomial bases as the original dataset. However,
        the basis will almost certainly *not* be orthogonal for the new data,
        because changing the incoming dataset is equivalent to changing the
        choice of inner product.

        Using orthogonal basis vectors (as compared to the "raw" vectors)
        allows the degree of the polynomial vector space to be increased
        without affecting the coefficients of lower-order components in a
        linear regression. This stability is often attractive during
        exploratory data analysis, but does not otherwise change the results
        of a linear regression.

        NOTE(review): the original documentation states that `nan` values in
        `x` are ignored and propagated through to the generated polynomials;
        the sums below are plain `numpy.sum` calls, so confirm how `nan`
        inputs actually behave.

        The signature of this transform is intentionally chosen to be
        compatible with R.
    """
    if raw:
        return numpy.stack([numpy.power(x, k) for k in range(1, degree + 1)], axis=1)

    x = numpy.array(x)

    # Without stored coefficients we are in "training" mode, and compute the
    # coefficients from `x`; otherwise the stored ones are reused.
    alpha = _state.get("alpha")
    norms2 = _state.get("norms2")
    training = alpha is None
    if training:
        alpha = {}
        norms2 = {}

    # Column k holds the monic polynomial of degree k evaluated at `x`.
    basis = numpy.empty((x.shape[0], degree + 1))
    basis[:, 0] = 1

    def get_alpha(k):
        if training and k not in alpha:
            alpha[k] = numpy.sum(x * basis[:, k] ** 2) / numpy.sum(basis[:, k] ** 2)
        return alpha[k]

    def get_norm(k):
        if training and k not in norms2:
            norms2[k] = numpy.sum(basis[:, k] ** 2)
        return norms2[k]

    # Build the polynomials iteratively using the monic three-term recurrence
    # relation (the coefficients are fixed when not in "training" mode).
    for i in range(1, degree + 1):
        basis[:, i] = (x - get_alpha(i - 1)) * basis[:, i - 1]
        if i >= 2:
            beta = get_norm(i - 1) / get_norm(i - 2)
            basis[:, i] -= beta * basis[:, i - 2]

    # Renormalize so that we provide an orthonormal basis.
    basis /= numpy.array([numpy.sqrt(get_norm(k)) for k in range(0, degree + 1)])

    if training:
        _state["alpha"] = alpha
        _state["norms2"] = norms2

    # Return the basis without its first (constant) column.
    names = tuple(str(i) for i in range(1, degree + 1))
    return FactorValues(basis[:, 1:], column_names=names)
def identity(data):
    """
    Return `data` exactly as it was passed in.
    """
    return data
import numpy
import scipy.sparse as spsparse
from formulaic.utils.stateful_transforms import stateful_transform
@stateful_transform
def scale(data, center=True, scale=True, ddof=1, _state=None):
    """
    Center and/or rescale `data`, remembering the parameters used.

    The `ddof`, centering and scaling values are stored in `_state` the first
    time they are determined, and whatever is already stored there takes
    precedence over the arguments and the data.

    Args:
        data: The values to transform (converted to a numpy array).
        center: `True` to subtract the mean over axis 0, `False` to skip
            centering, or any non-boolean value to subtract explicitly.
        scale: `True` to divide by the square root of the sum of squares of
            the (possibly centered) data over `len(data) - ddof`, `False` to
            skip scaling, or any non-boolean value to divide by explicitly.
        ddof: The delta degrees of freedom used when computing the scale.

    Returns:
        The transformed numpy array.
    """
    data = numpy.array(data)

    ddof = _state.setdefault("ddof", ddof)

    # Handle centering
    if "center" not in _state:
        if not isinstance(center, bool):
            _state["center"] = numpy.array(center)
        elif center:
            _state["center"] = numpy.mean(data, axis=0)
        else:
            _state["center"] = None
    offset = _state["center"]
    if offset is not None:
        data = data - offset

    # Handle scaling (computed from the data after any centering)
    if "scale" not in _state:
        if not isinstance(scale, bool):
            _state["scale"] = numpy.array(scale)
        elif scale:
            _state["scale"] = numpy.sqrt(
                numpy.sum(data**2, axis=0) / (data.shape[0] - ddof)
            )
        else:
            _state["scale"] = None
    divisor = _state["scale"]
    if divisor is not None:
        data = data / divisor

    return data
@scale.register(spsparse.spmatrix)
def _(data, *args, **kwargs):
    """
    Variant of `scale` registered for scipy sparse matrices: the matrix must
    have exactly one column, which is densified and scaled as a 1-d array.
    """
    assert data.shape[1] == 1
    column = data.toarray()[:, 0]
    return scale(column, *args, **kwargs)
@stateful_transform
def center(data, _state=None):
    """
    Center `data` by delegating to `scale` with scaling disabled, sharing
    this transform's `_state` with it.
    """
    centered = scale(data, scale=False, _state=_state)
    return centered
# Top-level error and warning classes


class FormulaicError(Exception):
    """
    Base class of every error defined in this module.
    """


class FormulaicWarning(Warning):
    """
    Base class of every warning defined in this module.
    """


# Formula parsing errors


class FormulaInvalidError(FormulaicError):
    """
    Provided formula specification is not a valid format.
    """


class FormulaParsingError(FormulaicError):
    """
    An error occurred during the parsing of a formula specification.
    """


class FormulaSyntaxError(FormulaParsingError):
    """
    Could not tokenize the nominated formula specification.
    """


# Formula materializer meta-errors


class FormulaMaterializerInvalidError(FormulaicError):
    """
    Materializer meta-error; presumably signals an invalid materializer
    (raising sites are outside of this module).
    """


class FormulaMaterializerNotFoundError(FormulaicError):
    """
    Materializer meta-error; presumably signals that no suitable materializer
    could be found (raising sites are outside of this module).
    """


# Data materialization errors and warnings


class FormulaMaterializationError(FormulaicError):
    """
    Base class of the errors relating to data materialization.
    """


class FactorEncodingError(FormulaMaterializationError):
    """
    Materialization error relating to the encoding of a factor.
    """


class FactorEvaluationError(FormulaMaterializationError):
    """
    Materialization error relating to the evaluation of a factor.
    """


class DataMismatchWarning(FormulaicWarning):
    """
    Materialization warning; presumably signals data that does not match
    expectations (emitting sites are outside of this module).
    """
from enum import Enum
class NAAction(Enum):
    """
    The actions that can be taken when null values are encountered.
    """

    DROP = "drop"  # drop the offending rows
    RAISE = "raise"  # raise an error
    IGNORE = "ignore"  # perform no null checking
from .scoped_factor import ScopedFactor
class ScopedTerm:
    """
    A sorted, immutable collection of factors together with an optional scale.

    Equality and hashing consider the factors only; the scale is ignored.
    """

    __slots__ = ("factors", "scale")

    def __init__(self, factors, scale=None):
        # Factors are kept in sorted order so that equal terms compare equal
        # no matter the order in which their factors were supplied.
        self.factors = tuple(sorted(factors))
        self.scale = scale

    def __hash__(self):
        return hash(self.factors)

    def __eq__(self, other):
        if not isinstance(other, ScopedTerm):
            return NotImplemented
        return self.factors == other.factors

    def __repr__(self):
        # A term without factors is rendered as "1".
        if self.factors:
            factor_repr = ":".join(f.__repr__() for f in sorted(self.factors))
        else:
            factor_repr = "1"
        # The scale is shown only when it is set and differs from one.
        show_scale = self.scale is not None and self.scale != 1
        return f"{self.scale}*{factor_repr}" if show_scale else factor_repr

    def copy(self, *, without_values=False):
        """
        Return a new `ScopedTerm` with the same factors and scale.

        Args:
            without_values: If true, each factor is rebuilt as a
                `ScopedFactor` around a copy of its underlying factor that
                has `values=None`.
        """
        if not without_values:
            return ScopedTerm(self.factors, scale=self.scale)
        stripped = [
            ScopedFactor(
                factor=scoped.factor.replace(values=None),
                reduced=scoped.reduced,
            )
            for scoped in self.factors
        ]
        return ScopedTerm(stripped, scale=self.scale)
from __future__ import annotations
from dataclasses import dataclass, replace
from typing import Any, Optional
from formulaic.parser.types import Factor
from .factor_values import FactorValues, FactorValuesMetadata
@dataclass
class EvaluatedFactor:
    """
    A container for the evaluated state of a `Factor` object in a given
    context.

    This class acts as the glue between an abstract `Factor` specification
    and the realisation of that factor in a specific data context. Instances
    are compared and ordered by their `factor` alone.

    Attributes:
        factor: The `Factor` instance for which values have been computed.
        values: The evaluated values for the factor.
    """

    factor: Optional[Factor] = None
    values: Optional[FactorValues[Any]] = None

    @property
    def expr(self) -> str:
        """
        The expression of the evaluated factor.
        """
        return self.factor.expr

    @property
    def metadata(self) -> FactorValuesMetadata:
        """
        The metadata associated with the evaluated values.
        """
        return self.values.__formulaic_metadata__

    def __repr__(self) -> str:
        return repr(self.factor)

    def __eq__(self, other) -> bool:
        if not isinstance(other, EvaluatedFactor):
            return NotImplemented
        return self.factor == other.factor

    def __lt__(self, other) -> bool:
        if not isinstance(other, EvaluatedFactor):
            return NotImplemented
        return self.factor < other.factor

    def replace(self, **changes) -> EvaluatedFactor:
        """
        Return a copy of this `EvaluatedFactor` instance with the nominated
        attributes replaced.
        """
        # Inside the method body `replace` is the module-level
        # `dataclasses.replace`, not this method.
        return replace(self, **changes)
class ScopedFactor:
    """
    A factor paired with a flag recording whether it is "reduced".

    Attributes:
        factor: The wrapped factor.
        reduced: Whether this factor is reduced.
    """

    def __init__(self, factor, reduced=False):
        self.factor = factor
        self.reduced = reduced

    def __repr__(self):
        # Reduced factors are marked with a trailing "-".
        suffix = "-" if self.reduced else ""
        return repr(self.factor) + suffix

    def __hash__(self):
        return hash(repr(self))

    def __eq__(self, other):
        if not isinstance(other, ScopedFactor):
            return NotImplemented
        return self.factor == other.factor and self.reduced == other.reduced

    def __lt__(self, other):
        if not isinstance(other, ScopedFactor):
            return NotImplemented
        # For the same underlying factor, the reduced variant sorts first.
        if self.factor == other.factor:
            return self.reduced > other.reduced
        return self.factor < other.factor
from .enums import NAAction
from .evaluated_factor import EvaluatedFactor
from .factor_values import FactorValues
from .scoped_factor import ScopedFactor
from .scoped_term import ScopedTerm
# Names exported by `from <this module> import *`; each one is imported from
# a sibling module above.
__all__ = [
"EvaluatedFactor",
"FactorValues",
"NAAction",
"ScopedFactor",
"ScopedTerm",
]
from __future__ import annotations
import copy
from dataclasses import dataclass, replace
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar, Union
import wrapt
from formulaic.parser.types import Factor
from formulaic.utils.sentinels import MISSING
T = TypeVar("T")
@dataclass
class FactorValuesMetadata:
    """
    Metadata about evaluated factor values, used to inform materializers
    about how to treat those values.

    Attributes:
        kind: The kind of the evaluated values.
        column_names: An optional tuple of names for the columns of the
            values.
        spans_intercept: Whether the values span the intercept or not.
        drop_field: If the values do span the intercept, and we want to
            reduce the rank, which field should be dropped.
        format: The format to use when exploding factors into multiple
            columns (e.g. when encoding categories via dummy-encoding).
        encoded: Whether the values should be treated as pre-encoded.
        encoder: An optional callable with signature
            `(values: Any, reduced_rank: bool, drop_rows: List[int], encoder_state: Dict[str, Any], spec: ModelSpec)`
            that outputs properly encoded values suitable for the current
            materializer. Note that this should only be used in cases where
            direct evaluation would yield different results in reduced vs.
            non-reduced rank scenarios.
    """

    kind: Factor.Kind = Factor.Kind.UNKNOWN
    column_names: Optional[Tuple[str]] = None
    spans_intercept: bool = False
    drop_field: Optional[str] = None
    format: str = "{name}[{field}]"
    encoded: bool = False
    encoder: Optional[Callable[[Any, bool, List[int], Dict[str, Any]], Any]] = None

    def replace(self, **kwargs) -> FactorValuesMetadata:
        """
        Return a copy of this `FactorValuesMetadata` instance with the
        nominated attributes replaced; without any replacements the instance
        itself is returned rather than a copy.
        """
        # Inside the method body `replace` is the module-level
        # `dataclasses.replace`, not this method.
        return replace(self, **kwargs) if kwargs else self
class FactorValues(Generic[T], wrapt.ObjectProxy):
    """
    A convenience wrapper that surfaces a `FactorValuesMetadata` instance at
    `<object>.__formulaic_metadata__`. This wrapper can otherwise wrap any
    object and behaves just like that object.
    """

    def __init__(
        self,
        values: Any,
        *,
        metadata: FactorValuesMetadata = MISSING,
        kind: Union[str, Factor.Kind] = MISSING,
        column_names: Tuple[str] = MISSING,
        spans_intercept: bool = MISSING,
        drop_field: Optional[str] = MISSING,
        format: str = MISSING,  # pylint: disable=redefined-builtin
        encoded: bool = MISSING,
        encoder: Optional[
            Callable[[Any, bool, List[int], Dict[str, Any]], Any]
        ] = MISSING,
    ):
        """
        Wrap `values` and attach metadata to them.

        The metadata starts from `metadata` if that is provided (truthy),
        else from the metadata already attached to `values` (if any), else
        from the `FactorValuesMetadata` defaults. Every other keyword
        argument that is not `MISSING` then overrides the matching field.
        """
        candidates = dict(
            kind=Factor.Kind(kind) if kind is not MISSING else kind,
            column_names=column_names,
            spans_intercept=spans_intercept,
            drop_field=drop_field,
            format=format,
            encoded=encoded,
            encoder=encoder,
        )
        # Only explicitly provided fields are forwarded.
        overrides = {
            key: value for key, value in candidates.items() if value is not MISSING
        }

        build_metadata = FactorValuesMetadata
        if hasattr(values, "__formulaic_metadata__"):
            build_metadata = values.__formulaic_metadata__.replace
            # Avoid wrapping a wrapper.
            if isinstance(values, FactorValues):
                values = values.__wrapped__
        if metadata:
            build_metadata = metadata.replace

        wrapt.ObjectProxy.__init__(self, values)
        self._self_metadata = build_metadata(**overrides)

    @property
    def __formulaic_metadata__(self) -> FactorValuesMetadata:
        return self._self_metadata

    def __repr__(self) -> str:
        return self.__wrapped__.__repr__()  # pragma: no cover

    # Handle copying behaviour

    def __copy__(self):
        # The wrapped values are shallow-copied; the metadata is shared.
        duplicate = copy.copy(self.__wrapped__)
        return type(self)(duplicate, metadata=self._self_metadata)

    def __deepcopy__(self, memo=None):
        duplicate = copy.deepcopy(self.__wrapped__, memo)
        return type(self)(duplicate, metadata=copy.deepcopy(self._self_metadata))
import functools
import itertools
from collections import OrderedDict
import numpy
import pandas
import scipy.sparse as spsparse
from interface_meta import override
from formulaic.utils.cast import as_columns
from .base import FormulaMaterializer
from .types import NAAction
class PandasMaterializer(FormulaMaterializer):
REGISTER_NAME = "pandas"
REGISTER_INPUTS = ("pandas.core.frame.DataFrame",)
REGISTER_OUTPUTS = ("pandas", "numpy", "sparse")
@override
def _is_categorical(self, values):
if isinstance(values, (pandas.Series, pandas.Categorical)):
return values.dtype == object or isinstance(
values.dtype, pandas.CategoricalDtype
)
return super()._is_categorical(values)
@override
def _check_for_nulls(self, name, values, na_action, drop_rows):
if na_action is NAAction.IGNORE:
return
if isinstance(
values, dict
): # pragma: no cover; no formulaic transforms return dictionaries any more
for key, vs in values.items():
self._check_for_nulls(f"{name}[{key}]", vs, na_action, drop_rows)
elif na_action is NAAction.RAISE:
if isinstance(values, pandas.Series) and values.isnull().values.any():
raise ValueError(f"`{name}` contains null values after evaluation.")
elif na_action is NAAction.DROP:
if isinstance(values, pandas.Series):
drop_rows.update(numpy.flatnonzero(values.isnull().values))
else:
raise ValueError(
f"Do not know how to interpret `na_action` = {repr(na_action)}."
) # pragma: no cover; this is currently impossible to reach
@override
def _encode_constant(self, value, metadata, encoder_state, spec, drop_rows):
if spec.output == "sparse":
return spsparse.csc_matrix(
numpy.array([value] * self.nrows).reshape(
(self.nrows - len(drop_rows), 1)
)
)
series = value * numpy.ones(self.nrows - len(drop_rows))
return series
@override
def _encode_numerical(self, values, metadata, encoder_state, spec, drop_rows):
if drop_rows:
values = values.drop(index=values.index[drop_rows])
if spec.output == "sparse":
return spsparse.csc_matrix(numpy.array(values).reshape((self.nrows, 1)))
return values
@override
def _encode_categorical(
self, values, metadata, encoder_state, spec, drop_rows, reduced_rank=False
):
# Even though we could reduce rank here, we do not, so that the same
# encoding can be cached for both reduced and unreduced rank. The
# rank will be reduced in the _encode_evaled_factor method.
from formulaic.transforms import encode_contrasts
if drop_rows:
values = values.drop(index=values.index[drop_rows])
return as_columns(
encode_contrasts(
values,
reduced_rank=False,
_metadata=metadata,
_state=encoder_state,
_spec=spec,
)
)
@override
def _get_columns_for_term(self, factors, spec, scale=1):
out = OrderedDict()
# Pre-multiply factors with only one set of values (improves performance)
solo_factors = {}
indices = []
for i, factor in enumerate(factors):
if len(factor) == 1:
solo_factors.update(factor)
indices.append(i)
if solo_factors:
for index in reversed(indices):
factors.pop(index)
if spec.output == "sparse":
factors.append(
{
":".join(solo_factors): functools.reduce(
spsparse.csc_matrix.multiply, solo_factors.values()
)
}
)
else:
factors.append(
{
":".join(solo_factors): functools.reduce(
numpy.multiply,
(numpy.asanyarray(p) for p in solo_factors.values()),
)
}
)
for product in itertools.product(*(factor.items() for factor in factors)):
if spec.output == "sparse":
out[":".join(p[0] for p in product)] = scale * functools.reduce(
spsparse.csc_matrix.multiply, (p[1] for p in product)
)
else:
out[":".join(p[0] for p in product)] = scale * functools.reduce(
numpy.multiply,
(numpy.array(p[1]) for p in product),
)
return out
@override
def _combine_columns(self, cols, spec, drop_rows):
    """
    Collate encoded columns into a single model matrix.

    Args:
        cols: Sequence of `(name, values)` pairs, in output column order.
        spec: The model spec; only `spec.output` is read here.
        drop_rows: Positional indices of the rows excluded from the output.

    Returns:
        For "sparse" output, the result of `scipy.sparse.hstack` (or an empty
        `csc_matrix` when there are no columns); for "numpy" output, a 2-D
        array with one column per entry of `cols`; otherwise a
        `pandas.DataFrame` indexed by the surviving rows of the data.
    """
    # If we are outputting a pandas DataFrame, explicitly override index
    # in case transforms/etc have lost track of it.
    if spec.output == "pandas":
        pandas_index = self.data_context.index
        if drop_rows:
            pandas_index = pandas_index.drop(self.data_context.index[drop_rows])

    # Special case no columns to empty csc_matrix, array, or DataFrame
    if not cols:
        # Dropped rows are excluded here as well, so that the empty numpy and
        # sparse matrices have the same number of rows as the pandas output
        # (and as any sibling matrices built with the same `drop_rows`).
        n_rows = self.data.shape[0] - (len(drop_rows) if drop_rows else 0)
        values = numpy.empty((n_rows, 0))
        if spec.output == "sparse":
            return spsparse.csc_matrix(values)
        if spec.output == "numpy":
            return values
        return pandas.DataFrame(index=pandas_index)

    # Otherwise, concatenate columns into model matrix
    if spec.output == "sparse":
        return spsparse.hstack([col[1] for col in cols])
    if spec.output == "numpy":
        return numpy.stack([col[1] for col in cols], axis=1)
    return pandas.DataFrame(
        {col[0]: col[1] for col in cols},
        index=pandas_index,
        copy=False,
    )
from interface_meta import override
import pandas
from .pandas import PandasMaterializer
class ArrowMaterializer(PandasMaterializer):
    """
    Pandas-based materializer registered under the name "arrow" for inputs
    of type `pyarrow.lib.Table`.

    The table is wrapped in a `LazyArrowTableProxy`, which is exposed as the
    data context in place of the raw table.
    """

    REGISTER_NAME = "arrow"
    REGISTER_INPUTS = ("pyarrow.lib.Table",)

    @override
    def _init(self):
        # The double-underscore attribute is name-mangled, so it is private
        # to this class and cannot collide with attributes of the base class.
        self.__data_context = LazyArrowTableProxy(self.data)

    @override
    @property
    def data_context(self):
        # Return the proxy created in `_init` rather than `self.data`.
        return self.__data_context
class LazyArrowTableProxy:
    """
    Mapping-like view over a table that converts columns to pandas on demand.

    A column is converted (via `table.column(name).to_pandas()`) the first
    time it is requested, and the converted result is reused afterwards.
    """

    def __init__(self, table):
        self.table = table
        self._cache = {}
        self.column_names = set(self.table.column_names)
        # Positional index spanning every row of the table.
        self.index = pandas.RangeIndex(len(table))

    def __contains__(self, value):
        return value in self.column_names

    def __getitem__(self, key):
        if key not in self.column_names:
            raise KeyError(key)
        try:
            return self._cache[key]
        except KeyError:
            converted = self.table.column(key).to_pandas()
            self._cache[key] = converted
            return converted
from .arrow import ArrowMaterializer
from .base import FormulaMaterializer
from .pandas import PandasMaterializer
from .types import FactorValues, NAAction
# Names exported by `from formulaic.materializers import *`; every entry is
# imported at the top of this module.
__all__ = [
    # Materializer implementations and their common base class
    "ArrowMaterializer",
    "FormulaMaterializer",
    "PandasMaterializer",
    # Useful types
    "NAAction",
    "FactorValues",
]
from __future__ import annotations
import functools
import inspect
import itertools
import operator
from abc import abstractmethod
from collections import defaultdict, OrderedDict, namedtuple
from typing import (
Any,
Dict,
Generator,
List,
Iterable,
Set,
Tuple,
Union,
TYPE_CHECKING,
)
from interface_meta import InterfaceMeta, inherit_docs
from formulaic.errors import (
FactorEncodingError,
FactorEvaluationError,
FormulaMaterializationError,
FormulaMaterializerInvalidError,
FormulaMaterializerNotFoundError,
)
from formulaic.materializers.types.factor_values import FactorValuesMetadata
from formulaic.model_matrix import ModelMatrices, ModelMatrix
from formulaic.parser.types import Factor, Term
from formulaic.transforms import TRANSFORMS
from formulaic.utils.cast import as_columns
from formulaic.utils.layered_mapping import LayeredMapping
from formulaic.utils.stateful_transforms import stateful_eval
from .types import EvaluatedFactor, FactorValues, ScopedFactor, ScopedTerm
if TYPE_CHECKING: # pragma: no cover
from formulaic import FormulaSpec, ModelSpec, ModelSpecs
# Record describing how one term was encoded: the term itself, its scoped
# terms, and the names of the columns generated for it.
EncodedTermStructure = namedtuple(
    "EncodedTermStructure",
    ["term", "scoped_terms", "columns"],
)
class FormulaMaterializerMeta(InterfaceMeta):
    """
    Metaclass that keeps a registry of materializer classes.

    Classes are indexed by their `REGISTER_NAME` and by each of the input
    type names listed in their `REGISTER_INPUTS`, so that a materializer can
    be looked up by name (`for_materializer`) or by the type of the data to
    be materialized (`for_data`).
    """

    INTERFACE_RAISE_ON_VIOLATION = True

    # Materializer classes keyed by `REGISTER_NAME`.
    REGISTERED_NAMES = {}
    # Materializer classes keyed by input type name ("module.QualName"), each
    # list ordered by descending `REGISTER_PRECEDENCE`.
    REGISTERED_INPUTS = defaultdict(list)

    def __register_implementation__(cls):
        """
        Add `cls` to the registries.

        Only attributes declared directly on `cls` (present in its own
        `__dict__`) are considered, so a subclass is not re-registered under
        values it merely inherits.

        NOTE(review): presumably invoked by `InterfaceMeta` for each new
        implementation class; confirm against the interface_meta docs.
        """
        if "REGISTER_NAME" in cls.__dict__ and cls.REGISTER_NAME:
            cls.REGISTERED_NAMES[cls.REGISTER_NAME] = cls

        if "REGISTER_INPUTS" in cls.__dict__:
            for input_type in cls.REGISTER_INPUTS:
                cls.REGISTERED_INPUTS[input_type] = sorted(
                    cls.REGISTERED_INPUTS[input_type] + [cls],
                    key=lambda x: x.REGISTER_PRECEDENCE,
                    reverse=True,
                )

    def for_materializer(cls, materializer):
        """
        Resolve `materializer` to a materializer class.

        Args:
            materializer: A registered name, a `FormulaMaterializer`
                instance, or a `FormulaMaterializer` subclass.

        Returns:
            The corresponding materializer class.

        Raises:
            FormulaMaterializerNotFoundError: If a name is given that has not
                been registered.
            FormulaMaterializerInvalidError: If the argument is none of the
                accepted kinds.
        """
        if isinstance(materializer, str):
            if materializer not in cls.REGISTERED_NAMES:
                raise FormulaMaterializerNotFoundError(materializer)
            return cls.REGISTERED_NAMES[materializer]
        if isinstance(materializer, FormulaMaterializer):
            return type(materializer)
        if not inspect.isclass(materializer) or not issubclass(
            materializer, FormulaMaterializer
        ):
            raise FormulaMaterializerInvalidError(
                "Materializers must be subclasses of `formulaic.materializers.FormulaMaterializer`."
            )
        return materializer

    def for_data(cls, data, output=None):
        """
        Find the materializer class registered for the type of `data`.

        Args:
            data: The data to be materialized; its class is looked up as
                "<module>.<qualname>".
            output: Optional output type that the materializer must list in
                its `REGISTER_OUTPUTS`.

        Returns:
            The highest-precedence materializer registered for the input
            type (that supports `output`, when given).

        Raises:
            FormulaMaterializerNotFoundError: If no registered materializer
                matches.
        """
        datacls = data.__class__
        input_type = f"{datacls.__module__}.{datacls.__qualname__}"

        if input_type not in cls.REGISTERED_INPUTS:
            # Report the registry keys. `REGISTER_INPUTS` is the per-class
            # declaration (empty on the base class), not the registry.
            raise FormulaMaterializerNotFoundError(
                f"No materializer has been registered for input type {repr(input_type)}. Available input types are: {set(cls.REGISTERED_INPUTS)}."
            )

        if output is None:
            return cls.REGISTERED_INPUTS[input_type][0]

        for materializer in cls.REGISTERED_INPUTS[input_type]:
            if output in materializer.REGISTER_OUTPUTS:
                return materializer

        # Union of the outputs of every candidate. Unpacking the chain into
        # `set(...)` raises a TypeError as soon as there are two or more
        # candidates, because `set` accepts a single iterable.
        output_types = set(
            itertools.chain.from_iterable(
                materializer.REGISTER_OUTPUTS
                for materializer in cls.REGISTERED_INPUTS[input_type]
            )
        )
        raise FormulaMaterializerNotFoundError(
            f"No materializer has been registered for input type {repr(input_type)} that supports output type {repr(output)}. Available output types for {repr(input_type)} are: {output_types}."
        )
class FormulaMaterializer(metaclass=FormulaMaterializerMeta):
REGISTER_NAME = None
REGISTER_INPUTS = set()
REGISTER_OUTPUTS = set()
REGISTER_PRECEDENCE = 100
# Public API
@inherit_docs(method="_init")
def __init__(self, data, context=None, **params):
    """
    Store the inputs and set up the evaluation context and caches.

    Args:
        data: The data from which model matrices will be built.
        context: Optional mapping of additional names available during
            factor evaluation; defaults to an empty dict.
        **params: Extra parameters, stored unchanged on `self.params`.
    """
    self.data = data
    self.context = context or {}
    self.params = params
    # Run the subclass hook before `data_context` is read below, so that
    # subclasses can prepare whatever their `data_context` depends on.
    self._init()

    # Combined namespace built from the data, the user context and the
    # registered transforms (in that argument order).
    self.layered_context = LayeredMapping(
        self.data_context, self.context, TRANSFORMS
    )
    # Evaluated factors, keyed by factor expression.
    self.factor_cache = {}
    # Encoded factors, keyed by expression or (expression, reduced_rank).
    self.encoded_cache = {}
def _init(self):
    """
    Hook for subclass-specific initialisation.

    Called by `__init__` after `data`, `context` and `params` have been
    stored and before `data_context` is first read. The base implementation
    does nothing.
    """
    pass  # pragma: no cover
@property
def data_context(self):
    """
    The object used as the data layer of `layered_context`.

    The base implementation returns `self.data` unchanged.
    """
    return self.data
@property
def nrows(self):
    """The number of rows in the input data, i.e. `len(self.data)`."""
    data = self.data
    return len(data)
def get_model_matrix(
    self,
    spec: Union[FormulaSpec, ModelMatrix, ModelMatrices, ModelSpec, ModelSpecs],
    **spec_overrides,
):
    """
    Build the model matrix (or matrices) described by `spec` from the data
    held by this materializer.

    Args:
        spec: Anything accepted by `ModelSpec.from_spec`.
        **spec_overrides: Forwarded to `ModelSpec.from_spec`.

    Returns:
        The `ModelMatrices` obtained by mapping `_build_model_matrix` over
        the prepared model specs, passed through `_simplify()` when `spec`
        resolved to a single `ModelSpec`.
    """
    from formulaic import ModelSpec

    # Prepare ModelSpec(s)
    spec: Union[ModelSpec, ModelSpecs] = ModelSpec.from_spec(spec, **spec_overrides)
    should_simplify = isinstance(spec, ModelSpec)
    model_specs: ModelSpecs = self._prepare_model_specs(spec)

    # Step 0: Pool all factors and transform state, ensuring consistency
    # during factor evaluation (esp. which rows get dropped).
    (
        factors,
        factor_evaluation_model_spec,
    ) = self._prepare_factor_evaluation_model_spec(model_specs)

    # Step 1: Evaluate all factors and cache the results, keeping track of
    # which rows need dropping (`_evaluate_factor` passes the pooled spec's
    # `na_action` and this shared `drop_rows` set to `_check_for_nulls`).
    drop_rows = set()
    for factor in factors:
        self._evaluate_factor(factor, factor_evaluation_model_spec, drop_rows)
    # From here on `drop_rows` is a sorted list rather than a set.
    drop_rows = sorted(drop_rows)

    # Step 2: Update the structured model specs with the information from
    # the shared transform state pool.
    model_specs._map(
        lambda ms: ms.transform_state.update(
            {
                factor.expr: factor_evaluation_model_spec.transform_state[
                    factor.expr
                ]
                for term in ms.formula
                for factor in term.factors
                if factor.expr in factor_evaluation_model_spec.transform_state
            }
        )
    )

    # Step 3: Build the model matrices using the shared factor cache, and
    # by recursing over the structured model matrices.
    model_matrices = model_specs._map(
        lambda model_spec: self._build_model_matrix(
            model_spec, drop_rows=drop_rows
        ),
        as_type=ModelMatrices,
    )

    if should_simplify:
        return model_matrices._simplify()
    return model_matrices
def _build_model_matrix(self, spec: ModelSpec, drop_rows):
    """
    Build a single `ModelMatrix` for `spec`.

    Relies on every factor of `spec.formula` already being present in
    `self.factor_cache` (see `_get_scoped_terms`).

    Args:
        spec: The model spec to materialize.
        drop_rows: Positional indices of the rows to exclude.

    Returns:
        A `ModelMatrix` wrapping the output of `_combine_columns`. When
        `spec.structure` was empty, the attached spec is an updated copy
        whose `structure` records the terms and columns generated here.
    """
    # Step 1: Determine strategy to maintain structural full-rankness of output matrix
    scoped_terms_for_terms = self._get_scoped_terms(
        spec.formula, ensure_full_rank=spec.ensure_full_rank
    )

    # Step 2: Generate the columns which will be collated into the full matrix
    cols = []
    for term, scoped_terms in scoped_terms_for_terms:
        scoped_cols = OrderedDict()
        for scoped_term in scoped_terms:
            if not scoped_term.factors:
                # A scoped term without factors is the (scaled) intercept.
                scoped_cols[
                    "Intercept"
                ] = scoped_term.scale * self._encode_constant(
                    1, None, {}, spec, drop_rows
                )
            else:
                scoped_cols.update(
                    self._get_columns_for_term(
                        [
                            self._encode_evaled_factor(
                                scoped_factor.factor,
                                spec,
                                drop_rows,
                                reduced_rank=scoped_factor.reduced,
                            )
                            for scoped_factor in sorted(scoped_term.factors)
                        ],
                        spec=spec,
                        scale=scoped_term.scale,
                    )
                )
        cols.append((term, scoped_terms, scoped_cols))

    # Step 3: Populate remaining model spec fields
    if spec.structure:
        # `_enforce_structure` is a generator; it is consumed once, by the
        # comprehension in step 4.
        cols = self._enforce_structure(cols, spec, drop_rows)
    else:
        spec = spec.update(
            structure=[
                EncodedTermStructure(
                    term,
                    list(st.copy(without_values=True) for st in scoped_terms),
                    list(scoped_cols),
                )
                for term, scoped_terms, scoped_cols in cols
            ],
        )

    # Step 4: Collate factors into one ModelMatrix
    return ModelMatrix(
        self._combine_columns(
            [
                (name, values)
                for term, scoped_terms, scoped_cols in cols
                for name, values in scoped_cols.items()
            ],
            spec=spec,
            drop_rows=drop_rows,
        ),
        spec=spec,
    )
# Methods related to input preparation
def _prepare_model_specs(self, spec: Union[ModelSpec, ModelSpecs]) -> ModelSpecs:
    """
    Wrap `spec` in `ModelSpecs` (if needed) and stamp every contained model
    spec with this materializer's name and parameters.

    A model spec without a nominated output receives the first entry of
    `REGISTER_OUTPUTS`; one nominating an unsupported output is rejected
    with a `FormulaMaterializationError`.
    """
    from formulaic.model_spec import ModelSpecs

    specs = spec if isinstance(spec, ModelSpecs) else ModelSpecs(spec)

    def with_materializer_details(model_spec: ModelSpec):
        overrides = {
            "materializer": self.REGISTER_NAME,
            "materializer_params": self.params,
        }
        if model_spec.output is None:
            overrides["output"] = self.REGISTER_OUTPUTS[0]
            return model_spec.update(**overrides)
        if model_spec.output in self.REGISTER_OUTPUTS:
            return model_spec.update(**overrides)
        raise FormulaMaterializationError(
            f"Nominated output {repr(model_spec.output)} is invalid. Available output types are: {set(self.REGISTER_OUTPUTS)}."
        )

    return specs._map(with_materializer_details, as_type=ModelSpecs)
def _prepare_factor_evaluation_model_spec(self, model_specs: ModelSpecs):
    """
    Pool the factors and settings of all model specs for factor evaluation.

    Returns:
        A tuple `(factors, model_spec)`: the set of all factors appearing in
        any formula, and a `ModelSpec` with an empty formula that carries
        the shared `ensure_full_rank`, `na_action` and `output` settings
        together with the merged transform state.

    Raises:
        RuntimeError: If the model specs disagree on any shared setting.
    """
    from formulaic.model_spec import ModelSpec

    seen_outputs = set()
    seen_na_actions = set()
    seen_rank_flags = set()
    pooled_factors = set()
    pooled_state = {}

    def pool(model_spec: ModelSpec):
        seen_outputs.add(model_spec.output)
        seen_na_actions.add(model_spec.na_action)
        seen_rank_flags.add(model_spec.ensure_full_rank)
        pooled_factors.update(
            itertools.chain.from_iterable(
                [term.factors for term in model_spec.formula]
            )
        )
        pooled_state.update(
            model_spec.transform_state
        )  # TODO: Check for consistency?

    model_specs._map(pool)

    settings_agree = (
        len(seen_outputs) == 1
        and len(seen_na_actions) == 1
        and len(seen_rank_flags) == 1
    )
    if not settings_agree:
        raise RuntimeError(
            "Provided `ModelSpec` instances are not consistent."
        )  # pragma: no cover; will only occur if users manually construct a structured model spec.

    pooled_spec = ModelSpec(
        formula=[],
        ensure_full_rank=next(iter(seen_rank_flags)),
        na_action=next(iter(seen_na_actions)),
        output=next(iter(seen_outputs)),
        transform_state=pooled_state,
    )
    return pooled_factors, pooled_spec
# Methods related to ensuring out matrices are structurally full-rank
def _get_scoped_terms(self, terms, ensure_full_rank=True):
    """
    Generate the terms to be used in the model matrix.

    The factors of each term are looked up in `self.factor_cache` (so they
    must already have been evaluated), and the correct "scope" (full vs.
    reduced rank) is then determined for each term. If `ensure_full_rank` is
    `True`, the resulting terms, when combined, are guaranteed to be
    structurally full-rank.

    Args:
        terms (list<Term>): A list of term arguments (usually from a formula)
            object.
        ensure_full_rank (bool): Whether evaluated terms should be scoped
            to ensure that their combination will result in a full-rank
            matrix.

    Yields:
        tuple<Term, list<ScopedTerm>>: Each term in turn, paired with its
            appropriately scoped terms.
    """
    # Scoped terms already covered by earlier terms.
    spanned = set()

    for term in terms:
        evaled_factors = [self.factor_cache[factor.expr] for factor in term.factors]

        if ensure_full_rank:
            term_span = self._get_scoped_terms_spanned_by_evaled_factors(
                evaled_factors
            ).difference(spanned)
            scoped_terms = self._simplify_scoped_terms(term_span)
            spanned.update(term_span)
        else:
            scoped_terms = [
                ScopedTerm(
                    factors=(
                        ScopedFactor(evaled_factor, reduced=False)
                        for evaled_factor in evaled_factors
                        if evaled_factor.metadata.kind is not Factor.Kind.CONSTANT
                    ),
                    # Constant factors are folded into the scale. Compare the
                    # enum member itself: comparing `kind.value` against the
                    # member is never true, which silently discarded every
                    # constant.
                    scale=functools.reduce(
                        operator.mul,
                        [
                            evaled_factor.values
                            for evaled_factor in evaled_factors
                            if evaled_factor.metadata.kind is Factor.Kind.CONSTANT
                        ],
                        1,
                    ),
                )
            ]
        yield term, scoped_terms
@classmethod
def _get_scoped_terms_spanned_by_evaled_factors(
    cls, evaled_factors: Iterable[EvaluatedFactor]
) -> Set[ScopedTerm]:
    """
    Return the set of ScopedTerm instances which span the set of
    evaluated factors.

    Constant factors are multiplied into the scale shared by every returned
    term. A factor that spans the intercept contributes two alternatives
    (being absent, or present with reduced rank); any other factor is always
    present. One scoped term is produced per combination of alternatives.

    Args:
        evaled_factors: The evaluated factors for which to generate scoped
            terms.

    Returns:
        The scoped terms for the nominated `evaled_factors`.
    """
    multiplier = 1
    alternatives = []
    for evaled in evaled_factors:
        if evaled.metadata.kind is Factor.Kind.CONSTANT:
            multiplier *= evaled.values
            continue
        if evaled.metadata.spans_intercept:
            # `1` stands for "factor absent" and is filtered out below.
            options = (1, ScopedFactor(evaled, reduced=True))
        else:
            options = (ScopedFactor(evaled),)
        alternatives.append(options)

    return {
        ScopedTerm(
            factors=(member for member in combo if member != 1),
            scale=multiplier,
        )
        for combo in itertools.product(*alternatives)
    }
@classmethod
def _simplify_scoped_terms(cls, scoped_terms):
    """
    Return the minimal set of ScopedTerm instances that spans the same vectorspace.

    Scoped terms are visited in order of increasing factor count. A term is
    merged into an already-accepted term when it has exactly one more factor
    than that term, differs from it by exactly one factor, and that extra
    factor is reduced-rank: the accepted term then gains the full-rank
    version of the factor and the accepted list is simplified again.
    Otherwise a copy of the term is appended as-is.
    """
    terms = []
    for scoped_term in sorted(scoped_terms, key=lambda x: len(x.factors)):
        factors = set(scoped_term.factors)
        combined = False
        for co_scoped_term in terms:
            cofactors = set(co_scoped_term.factors)
            factors_diff = factors.difference(cofactors)
            # Only candidates that are "this term minus one factor" qualify.
            if len(factors) - 1 != len(cofactors) or len(factors_diff) != 1:
                continue
            factor_new = next(iter(factors_diff))
            if factor_new.reduced:
                # Mutates the accepted (copied) term in place.
                co_scoped_term.factors += (
                    ScopedFactor(factor_new.factor, reduced=False),
                )
                # The merge may enable further merges among accepted terms.
                terms = cls._simplify_scoped_terms(terms)
                combined = True
                break
        if not combined:
            terms.append(scoped_term.copy())
    return terms
# Methods related to looking-up, evaluating and encoding terms and factors
def _evaluate_factor(
    self, factor: Factor, spec: ModelSpec, drop_rows: set
) -> EvaluatedFactor:
    """
    Evaluate `factor` (once) and return the cached `EvaluatedFactor`.

    Args:
        factor: The factor to evaluate.
        spec: The model spec providing `transform_state`, `encoder_state`
            and `na_action`.
        drop_rows: Shared set passed on to `_check_for_nulls`.

    Returns:
        The entry of `self.factor_cache` for `factor.expr`.

    Raises:
        FactorEvaluationError: If the evaluation method is not understood or
            evaluation raises.
        FactorEncodingError: If the kind of the evaluated values conflicts
            with the kind expected by the factor or recorded in
            `spec.encoder_state`.
    """
    if factor.expr not in self.factor_cache:
        try:
            if factor.eval_method.value == "lookup":
                value = self._lookup(factor.expr)
            elif factor.eval_method.value == "python":
                value = self._evaluate(factor.expr, factor.metadata, spec)
            elif factor.eval_method.value == "literal":
                value = FactorValues(
                    self._evaluate(factor.expr, factor.metadata, spec),
                    kind=Factor.Kind.CONSTANT,
                )
            else:
                raise FactorEvaluationError(
                    f"The evaluation method `{factor.eval_method.value}` for factor `{factor}` is not understood."
                )
        except FactorEvaluationError:
            # Already the right error type; do not wrap it again below.
            raise
        except Exception as e:
            raise FactorEvaluationError(
                f"Unable to evaluate factor `{factor}`. [{type(e).__name__}: {e}]"
            ) from e

        if not isinstance(value, FactorValues):
            value = FactorValues(value)

        # Infer the kind when the values do not declare one.
        if value.__formulaic_metadata__.kind is Factor.Kind.UNKNOWN:
            if self._is_categorical(value):
                kind = Factor.Kind.CATEGORICAL
                spans_intercept = True
            else:
                kind = Factor.Kind.NUMERICAL
                spans_intercept = False
            value = FactorValues(value, kind=kind, spans_intercept=spans_intercept)

        # A kind nominated on the factor overrides the values' own kind only
        # when it is categorical; any other mismatch is an error.
        if (
            factor.kind is not Factor.Kind.UNKNOWN
            and factor.kind is not value.__formulaic_metadata__.kind
        ):
            if factor.kind is Factor.Kind.CATEGORICAL:
                value.__formulaic_metadata__.kind = factor.kind
            else:
                raise FactorEncodingError(
                    f"Factor `{factor}` is expecting values of kind '{factor.kind.value}', "
                    f"but they are actually of kind '{value.__formulaic_metadata__.kind.value}'."
                )
        # The kind must also agree with the one recorded in the encoder state
        # of the spec, when there is one.
        if (
            factor.expr in spec.encoder_state
            and value.__formulaic_metadata__.kind
            is not spec.encoder_state[factor.expr][0]
        ):
            raise FactorEncodingError(
                f"The model specification expects factor `{factor}` to have values of kind "
                f"`{spec.encoder_state[factor.expr][0]}`, but they are actually of kind "
                f"`{value.__formulaic_metadata__.kind.value}`."
            )
        self._check_for_nulls(factor.expr, value, spec.na_action, drop_rows)
        self.factor_cache[factor.expr] = EvaluatedFactor(
            factor=factor, values=value
        )
    return self.factor_cache[factor.expr]
def _lookup(self, name):
    """Return the value bound to `name` in `self.layered_context`."""
    context = self.layered_context
    return context[name]
def _evaluate(self, expr, metadata, spec):
    """
    Evaluate `expr` with `stateful_eval` against `self.layered_context`,
    passing the metadata keyed by the expression, together with the
    transform state of `spec` and `spec` itself.
    """
    context = self.layered_context
    metadata_by_expr = {expr: metadata}
    transform_state = spec.transform_state
    return stateful_eval(expr, context, metadata_by_expr, transform_state, spec)
def _is_categorical(self, values):
    """
    Report whether `values` declares itself categorical.

    Values without a `__formulaic_metadata__` attribute are never treated as
    categorical by this base implementation.
    """
    if not hasattr(values, "__formulaic_metadata__"):
        return False
    return values.__formulaic_metadata__.kind is Factor.Kind.CATEGORICAL
def _check_for_nulls(self, name, values, na_action, drop_rows):
    """
    Hook for null handling; the base implementation does nothing.

    Called by `_evaluate_factor` with the factor expression, the evaluated
    values, the `na_action` of the spec, and the shared `drop_rows` set.
    """
    pass  # pragma: no cover
def _encode_evaled_factor(
    self,
    factor: EvaluatedFactor,
    spec: ModelSpec,
    drop_rows: set,
    reduced_rank: bool = False,
) -> Dict[str, Any]:
    """
    Encode an evaluated factor into a flat mapping of column name to values.

    Encodings are cached in `self.encoded_cache`, and the encoder state is
    recorded in `spec.encoder_state` under the factor expression.

    Args:
        factor: The evaluated factor to encode.
        spec: The model spec being materialized.
        drop_rows: Rows to exclude, forwarded to the encoders.
        reduced_rank: Whether the reduced-rank encoding is wanted.

    Returns:
        The output of `_flatten_encoded_evaled_factor` for the encoding.
    """
    if not factor.metadata.encoded:
        # An encoding cached under the bare expression can serve either
        # rank, since its drop field is removed further below when needed;
        # otherwise the cache key includes `reduced_rank`.
        if factor.expr in self.encoded_cache:
            encoded = self.encoded_cache[factor.expr]
        elif (factor.expr, reduced_rank) in self.encoded_cache:
            encoded = self.encoded_cache[(factor.expr, reduced_rank)]
        else:

            def map_dict(f):
                """
                This decorator allows an encoding function to operate on
                dictionaries (which should be mapped over). This allows
                transforms to output multiple non-encoded columns and still
                have everything work as expected.
                """

                @functools.wraps(f)
                def wrapped(values, metadata, state, *args, **kwargs):
                    if isinstance(values, dict):
                        encoded = {}
                        for k, v in values.items():
                            # Keys starting with "__" are passed through
                            # without being encoded.
                            if isinstance(k, str) and k.startswith("__"):
                                encoded[k] = v
                            else:
                                nested_state = state.get(k, {})
                                encoded[k] = wrapped(
                                    v, metadata, nested_state, *args, **kwargs
                                )
                                # Only keep state that was actually populated.
                                if nested_state:
                                    state[k] = nested_state
                        if isinstance(values, FactorValues):
                            return FactorValues(
                                encoded, metadata=values.__formulaic_metadata__
                            )
                        return encoded  # pragma: no cover; nothing in formulaic uses this, but is here for generality.
                    return f(values, metadata, state, *args, **kwargs)

                return wrapped

            # Reuse the encoder state recorded for this factor, if any.
            encoder_state = spec.encoder_state.get(factor.expr, [None, {}])[1]
            if factor.metadata.encoder is not None:
                # The values carry their own encoder; delegate to it.
                encoded = as_columns(
                    factor.metadata.encoder(
                        factor.values,
                        reduced_rank=reduced_rank,
                        drop_rows=drop_rows,
                        encoder_state=encoder_state,
                        model_spec=spec,
                    )
                )
            else:
                # If we need to unpack values into columns, we do this here.
                # Otherwise, we pass through the original values.
                factor_values = FactorValues(
                    self._extract_columns_for_encoding(factor),
                    metadata=factor.metadata,
                )
                if factor.metadata.kind is Factor.Kind.CATEGORICAL:
                    encoded = map_dict(self._encode_categorical)(
                        factor_values,
                        factor.metadata,
                        encoder_state,
                        spec,
                        drop_rows,
                        reduced_rank=reduced_rank,
                    )
                elif factor.metadata.kind is Factor.Kind.NUMERICAL:
                    encoded = map_dict(self._encode_numerical)(
                        factor_values,
                        factor.metadata,
                        encoder_state,
                        spec,
                        drop_rows,
                    )
                elif factor.metadata.kind is Factor.Kind.CONSTANT:
                    encoded = map_dict(self._encode_constant)(
                        factor_values,
                        factor.metadata,
                        encoder_state,
                        spec,
                        drop_rows,
                    )
                else:
                    raise FactorEncodingError(
                        factor
                    )  # pragma: no cover; it is not currently possible to reach this sentinel
            spec.encoder_state[factor.expr] = (factor.metadata.kind, encoder_state)
            # Only encode once for encodings where we can just drop a field
            # later on below.
            if isinstance(encoded, dict) and factor.metadata.drop_field:
                cache_key = factor.expr
            else:
                cache_key = (factor.expr, reduced_rank)
            self.encoded_cache[cache_key] = encoded
    else:
        encoded = as_columns(
            factor.values
        )  # pragma: no cover; we don't use this in formulaic yet.
    encoded = FactorValues(
        encoded,
        metadata=getattr(encoded, "__formulaic_metadata__", factor.metadata),
        encoded=True,
    )
    # Encoded factors will now all be dicts
    if (
        isinstance(encoded, dict)
        and encoded.__formulaic_metadata__.spans_intercept
        and reduced_rank
    ):
        # Work on a copy so that the cached encoding keeps all of its fields.
        encoded = FactorValues(
            encoded.copy(), metadata=encoded.__formulaic_metadata__
        )
        del encoded[encoded.__formulaic_metadata__.drop_field]
    return self._flatten_encoded_evaled_factor(factor.expr, encoded)
def _extract_columns_for_encoding(
    self, factor: EvaluatedFactor
) -> Union[Any, Dict[str, Any]]:
    """
    Prepare the values of `factor` for encoding.

    Values that need to be unpacked into separate columns (e.g. a
    two-dimensional numpy array) are expanded here, by `as_columns`; other
    values are returned as they are.
    """
    raw_values = factor.values
    return as_columns(raw_values)
def _flatten_encoded_evaled_factor(
    self, name: str, values: FactorValues[dict]
) -> Dict[str, Any]:
    """
    Flatten (possibly nested) encoded values into a single-level dict.

    Non-dict values map to `{name: values}`. For dicts, each field name is
    combined with `name` using the format string of the values' metadata,
    nested dicts are flattened recursively, and string keys starting with
    "__" are left out.
    """
    if not isinstance(values, dict):
        return {name: values}

    # Some nested dictionaries may not be a `FactorValues[dict]` instance,
    # in which case the default format of `FactorValuesMetadata` is used.
    template = (
        values.__formulaic_metadata__.format
        if hasattr(values, "__formulaic_metadata__")
        else FactorValuesMetadata.format
    )

    result = {}
    for field, column in values.items():
        is_private = isinstance(field, str) and field.startswith("__")
        if is_private:
            continue
        qualified = template.format(name=name, field=field)
        if not isinstance(column, dict):
            result[qualified] = column
            continue
        result.update(self._flatten_encoded_evaled_factor(qualified, column))
    return result
@abstractmethod
def _encode_constant(self, value, metadata, encoder_state, spec, drop_rows):
    """
    Encode the constant `value` as a column of the output.

    Abstract: must be implemented by concrete materializers. Called from
    `_build_model_matrix` (for the intercept), `_encode_evaled_factor` and
    `_enforce_structure`.
    """
    pass  # pragma: no cover
@abstractmethod
def _encode_categorical(
    self, values, metadata, encoder_state, spec, drop_rows, reduced_rank=False
):
    """
    Encode categorical `values` into output columns.

    Abstract: must be implemented by concrete materializers. Called from
    `_encode_evaled_factor` for factors of kind `Factor.Kind.CATEGORICAL`.
    """
    pass  # pragma: no cover
@abstractmethod
def _encode_numerical(self, values, metadata, encoder_state, spec, drop_rows):
    """
    Encode numerical `values` into an output column.

    Abstract: must be implemented by concrete materializers. Called from
    `_encode_evaled_factor` for factors of kind `Factor.Kind.NUMERICAL`.
    """
    pass  # pragma: no cover
# Methods related to ModelMatrix output
def _enforce_structure(
    self,
    cols: List[Tuple[Term, List[ScopedTerm], Dict[str, Any]]],
    spec,
    drop_rows: set,
) -> Generator[Tuple[Term, List[ScopedTerm], Dict[str, Any]]]:
    """
    Align the generated columns with the structure recorded in
    `spec.structure`.

    This is a generator: nothing (including the length assertion) runs until
    it is iterated.

    Args:
        cols: One `(term, scoped_terms, scoped_cols)` tuple per term, in the
            same order as `spec.structure`.
        spec: The model spec whose `structure` lists, at index 2 of each
            entry, the expected column names.
        drop_rows: Rows to exclude, forwarded to `_encode_constant`.

    Yields:
        `(term, scoped_terms, columns)` with `columns` restricted to, and
        ordered by, the expected column names.

    Raises:
        FactorEncodingError: If a term generated too many columns, between
            two and the expected number of columns, or a different set of
            column names.
    """
    # TODO: Verify that imputation strategies are intuitive and make sense.
    assert len(cols) == len(spec.structure)
    for i, col_spec in enumerate(cols):
        scoped_cols = col_spec[2]
        target_cols = spec.structure[i][2]
        if len(scoped_cols) > len(target_cols):
            raise FactorEncodingError(
                f"Term `{col_spec[0]}` has generated too many columns compared to specification: generated {list(scoped_cols)}, expecting {target_cols}."
            )
        if len(scoped_cols) < len(target_cols):
            # Too few columns: fill every expected column with zeros (when
            # nothing was generated) or with the single generated column.
            if len(scoped_cols) == 0:
                col = self._encode_constant(0, None, None, spec, drop_rows)
            elif len(scoped_cols) == 1:
                col = tuple(scoped_cols.values())[0]
            else:
                raise FactorEncodingError(
                    f"Term `{col_spec[0]}` has generated insufficient columns compared to specification: generated {list(scoped_cols)}, expecting {target_cols}."
                )
            scoped_cols = {name: col for name in target_cols}
        elif set(scoped_cols) != set(target_cols):
            raise FactorEncodingError(
                f"Term `{col_spec[0]}` has generated columns that are inconsistent with specification: generated {list(scoped_cols)}, expecting {target_cols}."
            )
        # Emit the columns in the order given by the specification.
        yield col_spec[0], col_spec[1], {
            col: scoped_cols[col] for col in target_cols
        }
def _get_columns_for_term(self, factors, spec, scale=1):
    """
    Assemble the columns for a model matrix given factors and a scale.

    This performs the row-wise Kronecker product of the factors: one output
    column is produced per combination of one entry from each factor, named
    by joining the entry names with ":" and valued as the product of the
    entries multiplied by `scale`.

    Args:
        factors: Sequence of mappings from column name to column values.
        spec: Not used by this base implementation.
        scale: Multiplier applied to every output column.

    Returns:
        An OrderedDict of column name to column values.
    """
    columns = OrderedDict()
    entries_per_factor = [encoded.items() for encoded in factors]
    for combo in itertools.product(*entries_per_factor):
        label = ":".join([entry[0] for entry in combo])
        multiplied = functools.reduce(operator.mul, [entry[1] for entry in combo])
        columns[label] = scale * multiplied
    return columns
@abstractmethod
def _combine_columns(self, cols, spec, drop_rows):
    """
    Combine `(name, values)` column pairs into the final matrix object.

    Abstract: must be implemented by concrete materializers. Called from
    `_build_model_matrix`, whose result is wrapped in a `ModelMatrix`.
    """
    pass  # pragma: no cover
'''
# Index of the modules bundled in this file, keyed by dotted module name.
# Each entry is `[is_package, start, end, timestamp]`: the loader seeks to
# offset `start` of this file and reads `end - start` characters to recover
# the module's source, and compares `timestamp` with the one stored in a
# cached `.pyc`. The offsets are positions within this very file, so they
# have to be regenerated whenever the bundled sources change.
inliner_packages = {
    "formulaic.model_spec": [
        0, 2786, 18313, 1680704490],
    "formulaic.model_matrix": [
        0, 18313, 21383, 1680704490],
    "formulaic.formula": [
        0, 21383, 30338, 1680704490],
    "formulaic": [
        1, 30338, 31068, 1680704490],
    "formulaic.utils.cast": [
        0, 31068, 32979, 1680704490],
    "formulaic.utils.layered_mapping": [
        0, 32979, 35895, 1680704490],
    "formulaic.utils.stateful_transforms": [
        0, 35895, 45370, 1680704490],
    "formulaic.utils": [
        1, 45370, 45370, 1680704490],
    "formulaic.utils.calculus": [
        0, 45370, 49059, 1680704490],
    "formulaic.utils.context": [
        0, 49059, 50930, 1680704490],
    "formulaic.utils.sentinels": [
        0, 50930, 51358, 1680704490],
    "formulaic.utils.iterators": [
        0, 51358, 52375, 1680704490],
    "formulaic.utils.sparse": [
        0, 52375, 53973, 1680704490],
    "formulaic.utils.constraints": [
        0, 53973, 71006, 1680704490],
    "formulaic.parser.types.formula_parser": [
        0, 71006, 73839, 1680704490],
    "formulaic.parser.types.token": [
        0, 73839, 81401, 1680704490],
    "formulaic.parser.types.term": [
        0, 81401, 82879, 1680704490],
    "formulaic.parser.types.operator_resolver": [
        0, 82879, 86589, 1680704490],
    "formulaic.parser.types.operator": [
        0, 86589, 90603, 1680704490],
    "formulaic.parser.types": [
        1, 90603, 91013, 1680704490],
    "formulaic.parser.types.structured": [
        0, 91013, 108139, 1680704490],
    "formulaic.parser.types.factor": [
        0, 108139, 111151, 1680704490],
    "formulaic.parser.types.ast_node": [
        0, 111151, 114323, 1680704490],
    "formulaic.parser": [
        1, 114323, 114463, 1680704490],
    "formulaic.parser.algos": [
        1, 114463, 114587, 1680704490],
    "formulaic.parser.algos.tokens_to_ast": [
        0, 114587, 119813, 1680704490],
    "formulaic.parser.algos.tokenize": [
        0, 119813, 126298, 1680704490],
    "formulaic.parser.parser": [
        0, 126298, 135852, 1680704490],
    "formulaic.parser.utils": [
        0, 135852, 144413, 1680704490],
    "formulaic.sugar": [
        0, 144413, 146655, 1680704490],
    "formulaic.transforms.contrasts": [
        0, 146655, 171974, 1680704490],
    "formulaic.transforms.basis_spline": [
        0, 171974, 180136, 1680704490],
    "formulaic.transforms": [
        1, 180136, 180896, 1680704490],
    "formulaic.transforms.poly": [
        0, 180896, 185491, 1680704490],
    "formulaic.transforms.identity": [
        0, 185491, 185527, 1680704490],
    "formulaic.transforms.scale": [
        0, 185527, 186944, 1680704490],
    "formulaic.errors": [
        0, 186944, 187939, 1680704490],
    "formulaic.materializers.types.enums": [
        0, 187939, 188045, 1680704490],
    "formulaic.materializers.types.scoped_term": [
        0, 188045, 189164, 1680704490],
    "formulaic.materializers.types.evaluated_factor": [
        0, 189164, 190839, 1680704490],
    "formulaic.materializers.types.scoped_factor": [
        0, 190839, 191544, 1680704490],
    "formulaic.materializers.types": [
        1, 191544, 191846, 1680704490],
    "formulaic.materializers.types.factor_values": [
        0, 191846, 196067, 1680704490],
    "formulaic.materializers.pandas": [
        0, 196067, 202107, 1680704490],
    "formulaic.materializers.arrow": [
        0, 202107, 203045, 1680704490],
    "formulaic.materializers": [
        1, 203045, 203349, 1680704490],
    "formulaic.materializers.base": [
        0, 203349, 232736, 1680704490]
}
def prepare_package():
    """
    Install the inlined-package importer and turn this module into the
    default bundled package.

    Steps:
      1. Create (or reuse) the module holding the loader code.
      2. Register an `InlinerImporter` for `inliner_packages` on
         `sys.meta_path`, unless an equivalent one is already registered.
      3. If `PINLINED_DEFAULT_PACKAGE` is set, read that package's
         `__init__` source from this file, clear this module's globals and
         execute the source in them.
    """
    # Loader's module name changes with each major version to be able to have
    # different loaders working at the same time.
    module_name = PINLINER_MODULE_NAME + '_' + loader_version.split('.')[0]

    # If the loader code is not already loaded we create a specific module for
    # it.  We need to do it this way so that the functions in there are not
    # compiled with a reference to this module's global dictionary in
    # __globals__.
    module = sys.modules.get(module_name)
    if not module:
        module = types.ModuleType(module_name)
        module.__package__ = ''
        module.__file__ = module_name + '.py'
        exec(inliner_importer_code, module.__dict__)
        sys.modules[module_name] = module

    # We cannot use __file__ directly because on the second run __file__ will
    # be the compiled file (.pyc) and that's not the file we want to read.
    filename = os.path.splitext(__file__)[0] + '.py'

    # Add our own finder and loader for this specific package if it's not
    # already there.
    # This must be done before we initialize the package, as it may import
    # packages and modules contained in the package itself.
    for finder in sys.meta_path:
        if (isinstance(finder, module.InlinerImporter) and
                finder.data == inliner_packages):
            importer = finder
            # Stop searching: without this `break` the `else` clause below
            # always runs, so a second importer would be created and
            # registered even though a matching one already exists.
            break
    else:
        # If we haven't forced the setting of the uncaught exception handler
        # we replace it only if it hasn't been replaced yet, this is because
        # CPython default handler does not use traceback or even linecache, so
        # it never calls get_source method to get the code, but for example
        # iPython does, so we don't need to replace the handler.
        if FORCE_EXC_HOOK is None:
            set_excepthook = sys.__excepthook__ == sys.excepthook
        else:
            set_excepthook = FORCE_EXC_HOOK

        importer = module.InlinerImporter(inliner_packages, filename,
                                          set_excepthook)
        sys.meta_path.append(importer)

    # If this is a bundle (multiple packages) without default then don't import
    # any package automatically.
    if not PINLINED_DEFAULT_PACKAGE:
        return

    __, start, end, ts = inliner_packages[PINLINED_DEFAULT_PACKAGE]
    with open(filename) as datafile:
        datafile.seek(start)
        code = datafile.read(end - start)

    # We need everything to be local variables before we clear the global dict
    def_package = PINLINED_DEFAULT_PACKAGE
    name = __name__
    filename = def_package + '/__init__.py'
    compiled_code = compile(code, filename, 'exec')

    # Prepare globals to execute __init__ code
    globals().clear()
    # If we've been called directly we cannot set __path__
    if name != '__main__':
        globals()['__path__'] = [def_package]
    else:
        def_package = None
    globals().update(__file__=filename,
                     __package__=def_package,
                     __name__=name,
                     __loader__=importer)

    exec(compiled_code, globals())
# Prepare loader's module and populate this namespace only with package's
# __init__. Nothing defined above remains available in this module after
# the call, because prepare_package() clears the module's globals before
# executing the default package's __init__ code in them.
prepare_package()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment