Created
April 5, 2023 14:30
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import types | |
PINLINED_DEFAULT_PACKAGE = 'formulaic' | |
PINLINER_MODULE_NAME = 'pinliner_loader' | |
loader_version = '0.2.1' | |
FORCE_EXC_HOOK = None | |
inliner_importer_code = ''' | |
import imp | |
import marshal | |
import os | |
import struct | |
import sys | |
import types | |
class InlinerImporter(object): | |
version = '%(loader_version)s' | |
def __init__(self, data, datafile, set_excepthook=True): | |
self.data = data | |
self.datafile = datafile | |
if set_excepthook: | |
sys.excepthook = self.excepthook | |
@staticmethod | |
def excepthook(type, value, traceback): | |
import traceback as tb | |
tb.print_exception(type, value, traceback) | |
def find_module(self, fullname, path): | |
module = fullname in self.data | |
if module: | |
return self | |
def get_source(self, fullname): | |
__, start, end, ts = self.data[fullname] | |
with open(self.datafile) as datafile: | |
datafile.seek(start) | |
code = datafile.read(end - start) | |
return code | |
def get_code(self, fullname, filename): | |
py_ts = self.data[fullname][3] | |
try: | |
with open(fullname + '.pyc', 'rb') as pyc: | |
pyc_magic = pyc.read(4) | |
pyc_ts = struct.unpack('<I', pyc.read(4))[0] | |
if pyc_magic == imp.get_magic() and pyc_ts == py_ts: | |
return marshal.load(pyc) | |
except: | |
pass | |
code = self.get_source(fullname) | |
compiled_code = compile(code, filename, 'exec') | |
try: | |
with open(fullname + '.pyc', 'wb') as pyc: | |
pyc.write(imp.get_magic()) | |
pyc.write(struct.pack('<I', py_ts)) | |
marshal.dump(compiled_code, pyc) | |
except: | |
pass | |
return compiled_code | |
def load_module(self, fullname): | |
# If the module it's already in there we'll reload but won't remove the | |
# entry if we fail | |
exists = fullname in sys.modules | |
module = types.ModuleType(fullname) | |
module.__loader__ = self | |
is_package = self.data[fullname][0] | |
path = fullname.replace('.', os.path.sep) | |
if is_package: | |
module.__package__ = fullname | |
module.__file__ = os.path.join(path, '__init__.py') | |
module.__path__ = [path] | |
else: | |
module.__package__ = fullname.rsplit('.', 1)[0] | |
module.__file__ = path + '.py' | |
sys.modules[fullname] = module | |
try: | |
compiled_code = self.get_code(fullname, module.__file__) | |
exec(compiled_code, module.__dict__) | |
except: | |
if not exists: | |
del sys.modules[fullname] | |
raise | |
return module | |
''' % {'loader_version': loader_version} | |
''' | |
from __future__ import annotations | |
import warnings | |
from collections import OrderedDict | |
from dataclasses import dataclass, field, replace | |
from typing import ( | |
Any, | |
Dict, | |
List, | |
Mapping, | |
Optional, | |
Sequence, | |
Tuple, | |
Union, | |
TYPE_CHECKING, | |
) | |
from formulaic.materializers.base import EncodedTermStructure | |
from formulaic.parser.types import Structured, Term | |
from formulaic.utils.constraints import LinearConstraintSpec, LinearConstraints | |
from .formula import Formula, FormulaSpec | |
from .materializers import FormulaMaterializer, NAAction | |
if TYPE_CHECKING: # pragma: no cover | |
from .model_matrix import ModelMatrices, ModelMatrix | |
# Cached property was introduced in Python 3.8 (we currently support 3.7) | |
try: | |
from functools import cached_property | |
except ImportError: # pragma: no cover | |
from cached_property import cached_property | |
@dataclass(frozen=True) | |
class ModelSpec: | |
""" | |
A container for the metadata used to generate a `ModelMatrix` instance. | |
This object can also be used to create a `ModelMatrix` instance that | |
respects the encoding choices made during the generation of this `ModelSpec` | |
instance. | |
Attributes: | |
Configuration: | |
formula: The formula for which the model matrix was (and/or will be) | |
generated. | |
materializer: The materializer used (and/or to be used) to | |
materialize the formula into a matrix. | |
ensure_full_rank: Whether to ensure that the generated matrix is | |
"structurally" full-rank (features are not included which are | |
known to violate full-rankness). | |
na_action: The action to be taken if NA values are found in the | |
data. Can be on of: "drop" (the default), "raise" or "ignore". | |
output: The desired output type (as interpreted by the materializer; | |
e.g. "pandas", "sparse", etc). | |
State (these attributes are only populated during materialization): | |
structure: The model matrix structure resulting from materialization. | |
transform_state: The state of any stateful transformations that took | |
place during factor evaluation. | |
encoder_state: The state of any stateful transformations that took | |
place during encoding. | |
""" | |
@classmethod | |
def from_spec( | |
cls, | |
spec: Union[FormulaSpec, ModelMatrix, ModelMatrices, ModelSpec, ModelSpecs], | |
**attrs, | |
) -> Union[ModelSpec, ModelSpecs]: | |
""" | |
Construct a `ModelSpec` (or `Structured[ModelSpec]`) instance for the | |
nominated `spec`, setting and/or overriding any `ModelSpec` attributes | |
present in `attrs`. | |
Args: | |
spec: The specification for which to generate a `ModelSpec` | |
instance or structured set of `ModelSpec` instances. | |
attrs: Any `ModelSpec` attributes to set and/or override on all | |
generated `ModelSpec` instances. | |
""" | |
from .model_matrix import ModelMatrix | |
def prepare_model_spec(obj): | |
if isinstance(obj, ModelMatrix): | |
obj = obj.model_spec | |
if isinstance(obj, ModelSpec): | |
return obj.update(**attrs) | |
formula = Formula.from_spec(obj) | |
if not formula._has_root or formula._has_structure: | |
return formula._map(prepare_model_spec, as_type=ModelSpecs) | |
return ModelSpec(formula=formula, **attrs) | |
if isinstance(spec, Formula) or not isinstance(spec, Structured): | |
return prepare_model_spec(spec) | |
return spec._map(prepare_model_spec, as_type=ModelSpecs) | |
# Configuration attributes | |
formula: Formula | |
materializer: Optional[str] = None | |
materializer_params: Optional[Dict[str, Any]] = None | |
ensure_full_rank: bool = True | |
na_action: NAAction = "drop" | |
output: Optional[str] = None | |
# State attributes | |
structure: Optional[List[EncodedTermStructure]] = None | |
transform_state: Dict = field(default_factory=dict) | |
encoder_state: Dict = field(default_factory=dict) | |
def __post_init__(self): | |
self.__dict__["formula"] = Formula.from_spec(self.formula) | |
if not self.formula._has_root or self.formula._has_structure: | |
raise ValueError( | |
"Nominated `Formula` instance has structure, which is not permitted when attaching to a `ModelSpec` instance." | |
) | |
# Materializer | |
if self.materializer is not None and not isinstance(self.materializer, str): | |
self.__dict__["materializer"] = FormulaMaterializer.for_materializer( | |
self.materializer | |
).REGISTER_NAME | |
self.__dict__["na_action"] = NAAction(self.na_action) | |
# Derived features | |
@cached_property | |
def column_names(self) -> Sequence[str]: | |
""" | |
The names associated with the columns of the generated model matrix. | |
""" | |
return tuple(feature for row in self.structure for feature in row.columns) | |
@property | |
def feature_names(self) -> Sequence[str]: | |
""" | |
A deprecated reference to `ModelSpec.column_names`. Will be removed in | |
v1.0.0. | |
""" | |
warnings.warn( | |
"`ModelSpec.feature_names` is deprecated and will be removed in v1.0.0. Use `ModelSpec.column_names` instead.", | |
DeprecationWarning, | |
) | |
return self.column_names | |
@cached_property | |
def column_indices(self) -> OrderedDict[str, int]: | |
""" | |
An ordered mapping from column names to the column index in generated | |
model matrices. | |
""" | |
return OrderedDict([(name, i) for i, name in enumerate(self.column_names)]) | |
@property | |
def feature_indices(self) -> Sequence[str]: | |
""" | |
A deprecated reference to `ModelSpec.column_indices`. Will be removed in | |
v1.0.0. | |
""" | |
warnings.warn( | |
"`ModelSpec.feature_indices` is deprecated and will be removed in v1.0.0. Use `ModelSpec.column_indices` instead.", | |
DeprecationWarning, | |
) | |
return self.column_indices | |
@property | |
def terms(self) -> List[Term]: | |
""" | |
The terms used to generate model matrices from this `ModelSpec` | |
instance. | |
""" | |
return self.formula.root | |
@cached_property | |
def term_indices(self) -> OrderedDict[Term, Tuple[int, ...]]: | |
""" | |
An ordered mapping of `Term` instances to the generated column indices. | |
Note: Since terms hash using their string representation, you can look | |
up elements of this mapping using the string representation of the | |
`Term`. | |
""" | |
slices = OrderedDict() | |
start = 0 | |
for row in self.structure: | |
end = start + len(row[2]) | |
slices[row[0]] = tuple(range(start, end)) | |
start = end | |
return slices | |
@cached_property | |
def term_slices(self) -> OrderedDict[Term, slice]: | |
""" | |
An ordered mapping of `Term` instances to a slice that when used on | |
the columns of the model matrix will subsample the model matrix down to | |
those corresponding to each term. | |
Note: Since terms hash using their string representation, you can look | |
up elements of this mapping using the string representation of the | |
`Term`. | |
""" | |
return OrderedDict( | |
{k: slice(v[0], v[-1] + 1) for k, v in self.term_indices.items()} | |
) | |
# Transforms | |
def update(self, **kwargs): | |
""" | |
Create a copy of this `ModelSpec` instance with the nominated attributes | |
mutated. | |
""" | |
return replace(self, **kwargs) | |
def differentiate( | |
self, *vars, use_sympy=False # pylint: disable=redefined-builtin | |
): | |
""" | |
EXPERIMENTAL: Take the gradient of this model spec. When used a linear | |
regression, evaluating a trained model on model matrices generated by | |
this formula is equivalent to estimating the gradient of that fitted | |
form with respect to `vars`. | |
Args: | |
vars: The variables with respect to which the gradient should be | |
taken. | |
use_sympy: Whether to use sympy to perform symbolic differentiation. | |
Notes: | |
This method is provisional and may be removed in any future major | |
version. | |
""" | |
return self.update( | |
formula=self.formula.differentiate(*vars, use_sympy=use_sympy), | |
) | |
# Utility methods | |
def get_model_matrix( | |
self, data: Any, context: Optional[Mapping[str, Any]] = None, **attr_overrides | |
) -> ModelMatrix: | |
""" | |
Build the model matrix (or matrices) realisation of this model spec for | |
the nominated `data`. | |
Args: | |
data: The data for which to build the model matrices. | |
context: An additional mapping object of names to make available in | |
when evaluating formula term factors. | |
attr_overrides: Any `ModelSpec` attributes to override before | |
constructing model matrices. This is shorthand for first | |
running `ModelSpec.update(**attr_overrides)`. | |
""" | |
if attr_overrides: | |
return self.update(**attr_overrides).get_model_matrix(data, context=context) | |
if self.materializer is None: | |
materializer = FormulaMaterializer.for_data(data) | |
else: | |
materializer = FormulaMaterializer.for_materializer(self.materializer) | |
return materializer( | |
data, context=context, **(self.materializer_params or {}) | |
).get_model_matrix(self) | |
def get_linear_constraints(self, spec: LinearConstraintSpec) -> LinearConstraints: | |
""" | |
Construct a `LinearConstraints` instance from a specification based on | |
the structure of the model matrices associated with this model spec. | |
Args: | |
spec: The specification from which to derive the constraints. Refer | |
to `LinearConstraints.from_spec` for more details. | |
""" | |
return LinearConstraints.from_spec(spec, variable_names=self.column_names) | |
def get_slice(self, columns_identifier: Union[int, str, Term, slice]) -> slice: | |
""" | |
Generate a `slice` instance corresponding to the columns associated with | |
the nominated `columns_identifier`. | |
Args: | |
columns_identifier: The identifier for which the slice should be | |
generated. Can be one of: | |
- an integer specifying a specific column index. | |
- a `Term` instance | |
- a string representation of a term | |
- a column name | |
""" | |
if isinstance(columns_identifier, slice): | |
return columns_identifier | |
if isinstance(columns_identifier, int): | |
return slice(columns_identifier, columns_identifier + 1) | |
term_slices = self.term_slices | |
if isinstance(columns_identifier, Term): | |
if columns_identifier not in term_slices: | |
raise ValueError( | |
f"Model matrices built using this spec do not include term: `{columns_identifier}`." | |
) | |
return term_slices[columns_identifier] | |
if columns_identifier in term_slices: | |
return term_slices[columns_identifier] | |
column_indices = self.column_indices | |
if columns_identifier in column_indices: | |
idx = column_indices[columns_identifier] | |
return slice(idx, idx + 1) | |
raise ValueError( | |
f"Model matrices built using this spec do not have any columns related to: `{repr(columns_identifier)}`." | |
) | |
# Only include dataclass fields when pickling. | |
def __getstate__(self): | |
return { | |
k: v for k, v in self.__dict__.items() if k in self.__dataclass_fields__ | |
} | |
class ModelSpecs(Structured[ModelSpec]): | |
""" | |
A `Structured[ModelSpec]` subclass that exposes some convenience methods | |
that should be mapped onto all contained `ModelSpec` instances. | |
""" | |
def _prepare_item(self, key: str, item: Any) -> Any: | |
# Verify that all included items are `ModelSpec` instances. | |
if not isinstance(item, ModelSpec): | |
raise TypeError( | |
"`ModelSpecs` instances expect all items to be instances of " | |
f"`ModelSpec`. [Got: {repr(item)} of type {repr(type(item))} " | |
f"for key {repr(key)}." | |
) | |
return item | |
def get_model_matrix( | |
self, data: Any, context: Optional[Mapping[str, Any]] = None, **attr_overrides | |
) -> ModelMatrices: | |
""" | |
This method proxies the `ModelSpec.get_model_matrix(...)` API and allows | |
it to be called on a structured set of `ModelSpec` instances. If all | |
`ModelSpec.materializer` and `ModelSpec.materializer_params` values are | |
unset or the same, then they are jointly evaluated allowing re-use of | |
the same cached across the specs. | |
Args: | |
data: The data for which to build the model matrices. | |
context: An additional mapping object of names to make available in | |
when evaluating formula term factors. | |
attr_overrides: Any `ModelSpec` attributes to override before | |
constructing model matrices. This is shorthand for first | |
running `ModelSpec.from_spec(model_specs, **attr_overrides)`. | |
""" | |
from formulaic import ModelMatrices | |
if attr_overrides: | |
return ModelSpec.from_spec(self, **attr_overrides).get_model_matrix( | |
data, context=context | |
) | |
# Check whether we can generate model matrices jointly (i.e. all | |
# materializers and their params are the same) | |
jointly_generate = False | |
materializer, materializer_params = None, None | |
for spec in self._flatten(): | |
if not spec.materializer: | |
continue | |
if materializer not in ( | |
None, | |
spec.materializer, | |
) or materializer_params not in ( | |
None, | |
spec.materializer_params, | |
): | |
break | |
materializer, materializer_params = ( | |
spec.materializer, | |
spec.materializer_params or None, | |
) | |
else: | |
jointly_generate = True | |
if jointly_generate: | |
if materializer is None: | |
materializer = FormulaMaterializer.for_data(data) | |
else: | |
materializer = FormulaMaterializer.for_materializer(materializer) | |
return materializer( | |
data, context=context, **(materializer_params or {}) | |
).get_model_matrix(self) | |
return self._map( | |
lambda model_spec: model_spec.get_model_matrix(data, context=context), | |
as_type=ModelMatrices, | |
) | |
def differentiate( | |
self, *vars, use_sympy=False # pylint: disable=redefined-builtin | |
) -> ModelSpecs: | |
""" | |
This method proxies the experimental `ModelSpec.differentiate(...)` API. | |
See `ModelSpec.differentiate` for more details. | |
""" | |
return self._map( | |
lambda model_spec: model_spec.differentiate(*vars, use_sympy=use_sympy), | |
as_type=ModelSpecs, | |
) | |
from __future__ import annotations | |
import copy | |
from typing import Any, Generic, Optional, TypeVar, TYPE_CHECKING | |
import wrapt | |
from formulaic.parser.types.structured import Structured | |
if TYPE_CHECKING: # pragma: no cover | |
from .model_spec import ModelSpec, ModelSpecs | |
MatrixType = TypeVar("MatrixType") | |
class ModelMatrix(Generic[MatrixType], wrapt.ObjectProxy): | |
""" | |
A wrapper around arbitrary model matrix output representations. | |
This wrapper allows for `isinstance(..., ModelMatrix)` checks, and allows | |
one to access the `ModelSpec` instance associated with its creation using | |
`<model_matrix>.model_spec`. All other instance attributes and methods of | |
the wrapped object are directly accessible as if the object were unwrapped. | |
""" | |
def __init__(self, matrix: Any, spec: Optional[ModelSpec] = None): | |
wrapt.ObjectProxy.__init__(self, matrix) | |
self._self_model_spec = spec | |
@property | |
def model_spec(self) -> Optional[ModelSpec]: | |
""" | |
The `ModelSpec` instance associated with the creation of this | |
`ModelMatrix` instance. | |
This `ModelSpec` instance can be used to create other `ModelMatrix`s | |
that respect all the choices (including feature selection and encoding) | |
that were made in the construction of this `ModelMatrix` instance. | |
""" | |
return self._self_model_spec | |
def __repr__(self): | |
return self.__wrapped__.__repr__() # pragma: no cover | |
# Handle copying behaviour | |
def __copy__(self): | |
return type(self)(copy.copy(self.__wrapped__), spec=self._self_model_spec) | |
def __deepcopy__(self, memo=None): | |
return type(self)( | |
copy.deepcopy(self.__wrapped__, memo), | |
spec=copy.deepcopy(self._self_model_spec), | |
) | |
class ModelMatrices(Structured[ModelMatrix]): | |
""" | |
A `Structured[ModelMatrix]` subclass that adds a `.model_spec` attribute | |
(mirrorin `ModelMatrix.model_spec`) that returns a structured container for | |
all the `ModelSpec` instances associated with the `ModelSpec` objects | |
referenced by this container. | |
""" | |
def _prepare_item( | |
self, key: str, item: Any | |
) -> Any: # Verify that all included items are `ModelSpec` instances. | |
# Verify that all included items are `ModelMatrix` instances. | |
if not isinstance(item, ModelMatrix): | |
raise TypeError( | |
"`ModelMatrices` instances expect all items to be instances " | |
f"of `ModelMatrix`. [Got: {repr(item)} of type " | |
f"{repr(type(item))} for key {repr(key)}." | |
) | |
return item | |
@property | |
def model_spec(self) -> ModelSpecs: | |
""" | |
The `ModelSpecs` instance representing the structured set of `ModelSpec` | |
instances associated with the `ModelMatrix` instances stored in this | |
`Structured` instance. | |
""" | |
from .model_spec import ModelSpecs | |
return self._map( | |
lambda model_matrix: model_matrix.model_spec, as_type=ModelSpecs | |
) | |
from __future__ import annotations | |
import warnings | |
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union | |
from typing_extensions import TypeAlias | |
from .errors import FormulaInvalidError | |
from .model_matrix import ModelMatrix | |
from .parser import DefaultFormulaParser | |
from .parser.types import FormulaParser, Structured, Term | |
from .utils.calculus import differentiate_term | |
FormulaSpec: TypeAlias = Union[ | |
str, | |
List[Union[str, Term]], | |
Set[Union[str, Term]], | |
Structured[Union[str, List[Term], Set[Term]]], | |
"Formula", # Direct formula specification | |
Dict[str, "FormulaSpec"], | |
Tuple["FormulaSpec", ...], # Structured formulae | |
] | |
class Formula(Structured[List[Term]]): | |
""" | |
A Formula is a (potentially structured) list of terms, which is represented | |
by this class. | |
This is a thin wrapper around `Strucuted[List[Term]]` that adds convenience | |
methods for building model matrices from the formula (among other common | |
tasks). You can build a `Formula` instance by passing in a string for | |
parsing, or by manually assembling the terms yourself. | |
Examples: | |
``` | |
>>> Formula("y ~ x") | |
.lhs: | |
y | |
.rhs: | |
1 + x | |
>>> Formula("x + y", a=["x", "y:z"], b="y ~ z") | |
root: | |
1 + x + y | |
.a: | |
x + y:z | |
.b: | |
.lhs: | |
y | |
.rhs: | |
z | |
``` | |
You can control how strings are parsed into terms by passing in custom | |
parsers via `_parser` and `_nested_parser`. | |
``` | |
>>> Formula("y ~ x", _parser=DefaultFormulaParser(include_intercept=False)) | |
.lhs: | |
y | |
.rhs: | |
x | |
``` | |
Attributes: | |
_parser: The `FormulaParser` instance to use when parsing complete | |
formulae (vs. individual terms). If not specified, | |
`DefaultFormulaParser()` is used. | |
_nested_parser: The `FormulaParser` instance to use when parsing | |
strings describing nested or individual terms (e.g. when `spec` is a | |
list of string term identifiers). If not specified and `_parser` is | |
specified, `_parser` is used; if `_parser` is not specified, | |
`DefaultFormulaParser(include_intercept=False)` is used instead. | |
""" | |
DEFAULT_PARSER = DefaultFormulaParser() | |
DEFAULT_NESTED_PARSER = DefaultFormulaParser(include_intercept=False) | |
__slots__ = ("_parser", "_nested_parser") | |
@classmethod | |
def from_spec( | |
cls, | |
spec: FormulaSpec, | |
parser: Optional[FormulaParser] = None, | |
nested_parser: Optional[FormulaParser] = None, | |
) -> Formula: | |
""" | |
Construct a `Formula` instance from a formula specification. | |
Args: | |
spec: The formula specification. | |
parser: The `FormulaParser` instance to use when parsing complete | |
formulae (vs. individual terms). If not specified, | |
`DefaultFormulaParser()` is used. | |
nested_parser: The `FormulaParser` instance to use when parsing | |
strings describing nested or individual terms (e.g. when `spec` | |
is a list of string term identifiers). If not specified and | |
`parser` is specified, `parser` is used; if `parser` is not | |
specified, `DefaultFormulaParser(include_intercept=False)` is | |
used instead. | |
""" | |
if isinstance(spec, Formula): | |
return spec | |
return Formula(spec, _parser=parser, _nested_parser=nested_parser) | |
def __init__( | |
self, | |
*args, | |
_parser: Optional[FormulaParser] = None, | |
_nested_parser: Optional[FormulaParser] = None, | |
**kwargs, | |
): | |
self._parser = _parser or self.DEFAULT_PARSER | |
self._nested_parser = _nested_parser or _parser or self.DEFAULT_NESTED_PARSER | |
super().__init__(*args, **kwargs) | |
self._simplify(unwrap=False, inplace=True) | |
def _prepare_item(self, key: str, item: FormulaSpec) -> Union[List[Term], Formula]: | |
""" | |
Convert incoming formula items into either a list of Terms or a nested | |
`Formula` instance. | |
Note: Where parsing of strings is required, the nested-parser is used | |
except for the root element of the parent formula. | |
Args: | |
key: The structural key where the item will be stored. | |
item: The specification to convert. | |
""" | |
if isinstance(item, str): | |
item = ( | |
(self._parser if key == "root" else self._nested_parser) | |
.get_terms(item, sort=True) | |
._simplify() | |
) | |
if isinstance(item, Structured): | |
formula_or_terms = Formula( | |
_parser=self._nested_parser, **item._structure | |
)._simplify() | |
elif isinstance(item, (list, set)): | |
formula_or_terms = [ | |
term | |
for value in item | |
for term in ( | |
self._nested_parser.get_terms(value) | |
if isinstance(value, str) | |
else [value] | |
) | |
] | |
self.__validate_terms(formula_or_terms) | |
formula_or_terms = sorted(formula_or_terms) | |
else: | |
raise FormulaInvalidError( | |
f"Unrecognized formula specification: {repr(item)}." | |
) | |
return formula_or_terms | |
@classmethod | |
def __validate_terms(cls, formula_or_terms: Any): | |
""" | |
Verify that all terms are of the appropriate type. The acceptable types | |
are: | |
- List[Terms] | |
- Tuple[List[Terms], ...] | |
- Formula | |
""" | |
if not isinstance(formula_or_terms, list): | |
# Should be impossible to reach this; here as a sentinel | |
raise FormulaInvalidError( | |
f"All components of a formula should be lists of `Term` instances. Found: {repr(formula_or_terms)}." | |
) | |
for term in formula_or_terms: | |
if not isinstance(term, Term): | |
raise FormulaInvalidError( | |
f"All terms in formula should be instances of `formulaic.parser.types.Term`; received term {repr(term)} of type `{type(term)}`." | |
) | |
def get_model_matrix( | |
self, data: Any, context: Optional[Mapping[str, Any]] = None, **spec_overrides | |
) -> Union[ModelMatrix, Structured[ModelMatrix]]: | |
""" | |
Build the model matrix (or matrices) realisation of this formula for the | |
nominated `data`. | |
Args: | |
data: The data for which to build the model matrices. | |
context: An additional mapping object of names to make available in | |
when evaluating formula term factors. | |
spec_overrides: Any `ModelSpec` attributes to set/override. See | |
`ModelSpec` for more details. | |
""" | |
from .model_spec import ModelSpec | |
return ModelSpec.from_spec(self, **spec_overrides).get_model_matrix( | |
data, context=context | |
) | |
def differentiate( # pylint: disable=redefined-builtin | |
self, | |
*vars: Tuple[str, ...], | |
use_sympy: bool = False, | |
) -> Formula: | |
""" | |
EXPERIMENTAL: Take the gradient of this formula. When used a linear | |
regression, evaluating a trained model on model matrices generated by | |
this formula is equivalent to estimating the gradient of that fitted | |
form with respect to `vars`. | |
Args: | |
vars: The variables with respect to which the gradient should be | |
taken. | |
use_sympy: Whether to use sympy to perform symbolic differentiation. | |
Notes: | |
This method is provisional and may be removed in any future major | |
version. | |
""" | |
return self._map( | |
lambda terms: [ | |
differentiate_term(term, vars, use_sympy=use_sympy) for term in terms | |
] | |
) | |
@property | |
def terms(self) -> Formula: | |
warnings.warn( | |
"`Formula.terms` is deprecated. Please index/iterate over `Formula` directly instead.", | |
DeprecationWarning, | |
) | |
return self | |
def __getattr__(self, attr): | |
# Keep substructures wrapped to retain access to helper functions. | |
subformula = super().__getattr__(attr) | |
if attr != "root": | |
return Formula.from_spec(subformula) | |
return subformula | |
def __getitem__(self, key): | |
# Keep substructures wrapped to retain access to helper functions. | |
subformula = super().__getitem__(key) | |
if key != "root": | |
return Formula.from_spec(subformula) | |
return subformula | |
def __repr__(self, to_str: bool = False): | |
if not self._has_structure and self._has_root: | |
return " + ".join([str(t) for t in self]) | |
return str(self._map(lambda terms: " + ".join([str(t) for t in terms]))) | |
from .formula import Formula, FormulaSpec | |
from .materializers import FactorValues | |
from .model_matrix import ModelMatrix, ModelMatrices | |
from .model_spec import ModelSpec, ModelSpecs | |
from .sugar import model_matrix | |
try: | |
from ._version import __version__, __version_tuple__ | |
except ImportError: # pragma: no cover | |
__version__ = version = "unknown" | |
__version_tuple__ = version_tuple = ("unknown",) | |
__author__ = "Matthew Wardrop" | |
__author_email__ = "mpwardrop@gmail.com" | |
__all__ = [ | |
"__author__", | |
"__author_email__", | |
"__version__", | |
"__version_tuple__", | |
"Formula", | |
"FormulaSpec", | |
"ModelMatrix", | |
"ModelMatrices", | |
"ModelSpec", | |
"ModelSpecs", | |
"model_matrix", | |
"FactorValues", | |
] | |
from functools import singledispatch, wraps | |
from typing import Any | |
import numpy | |
import pandas | |
import scipy.sparse | |
from formulaic.materializers.types.factor_values import FactorValues | |
def propagate_metadata(func): | |
@wraps(func) | |
def wrapper(data, *args, **kwargs): | |
evaluated = func(data, *args, **kwargs) | |
if isinstance(data, FactorValues): | |
return FactorValues( | |
evaluated, | |
metadata=data.__formulaic_metadata__, | |
) | |
return evaluated | |
return wrapper | |
@singledispatch | |
@propagate_metadata | |
def as_columns(data: Any) -> Any: | |
""" | |
Get the columns for `data`. If `data` represents a single column, or is a | |
dictionary (the format used to store columns), it is returned as is. | |
""" | |
return data | |
@as_columns.register | |
@propagate_metadata | |
def _(data: pandas.DataFrame): | |
return dict(data.items()) | |
@as_columns.register | |
@propagate_metadata | |
def _(data: numpy.ndarray): | |
if len(data.shape) == 1: | |
return data | |
if len(data.shape) > 2: | |
raise ValueError( | |
"Formulaic does not know how to convert numpy arrays with more than " | |
"two dimensions into columns." | |
) | |
if ( | |
hasattr(data, "__formulaic_metadata__") | |
and data.__formulaic_metadata__.column_names | |
): | |
column_names = data.__formulaic_metadata__.column_names | |
else: | |
column_names = list(range(data.shape[1])) | |
return {column_names[i]: data[:, i] for i in range(data.shape[1])} | |
@as_columns.register | |
@propagate_metadata | |
def _(data: scipy.sparse.csc_matrix): | |
if ( | |
hasattr(data, "__formulaic_metadata__") | |
and data.__formulaic_metadata__.column_names | |
): | |
column_names = data.__formulaic_metadata__.column_names | |
else: | |
column_names = list(range(data.shape[1])) | |
return {column_names[i]: data[:, i] for i in range(data.shape[1])} | |
import itertools | |
from collections.abc import MutableMapping | |
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple | |
class LayeredMapping(MutableMapping): | |
""" | |
A mutable mapping implementation that allows you to stack multiple mappings | |
on top of one another, passing key lookups through the stack from top to | |
bottom until the key is found or the stack is exhausted. Mutations are | |
stored in an additional layer local only to the `LayeredMapping` instance, | |
and the layers passed in are never mutated. | |
""" | |
def __init__(self, *layers: Tuple[Optional[Mapping]]): | |
""" | |
Crepare a `LayeredMapping` instance, populating it with the nominated | |
layers. | |
""" | |
self.mutations: Dict = {} | |
self.layers: List[Mapping] = self.__filter_layers(layers) | |
@staticmethod | |
def __filter_layers(layers: Iterable[Mapping]) -> List[Mapping]: | |
""" | |
Filter incoming `layers` down to those which are not null. | |
""" | |
return [layer for layer in layers if layer is not None] | |
def __getitem__(self, key: Any) -> Any: | |
for layer in [self.mutations, *self.layers]: | |
if key in layer: | |
return layer[key] | |
raise KeyError(key) | |
def __setitem__(self, key: Any, value: Any): | |
self.mutations[key] = value | |
def __delitem__(self, key: Any): | |
if key in self.mutations: | |
del self.mutations[key] | |
else: | |
raise KeyError(f"Key '{key}' not found in mutable layer.") | |
def __iter__(self): | |
keys = set() | |
for layer in [self.mutations, *self.layers]: | |
for key in layer: | |
if key not in keys: | |
keys.add(key) | |
yield key | |
def __len__(self): | |
return len(set(itertools.chain(self.mutations, *self.layers))) | |
def with_layers( | |
self, | |
*layers: Tuple[Optional[Mapping]], | |
prepend: bool = True, | |
inplace: bool = False, | |
) -> "LayeredMapping": | |
""" | |
Return a copy of this `LayeredMapping` instance with additional layers | |
added. | |
Args: | |
layers: The layers to add. | |
prepend: Whether to add the layers before (if `True`) or after (if | |
`False`) the current layers. | |
inplace: Whether to mutate the existing `LayeredMapping` instance | |
instead of returning a copy. | |
Returns: | |
A reference to the `LayeredMapping` instance with the extra layers. | |
""" | |
layers = self.__filter_layers(layers) | |
if not layers: | |
return self | |
if inplace: | |
self.layers = ( | |
[*layers, *self.layers] if prepend else [*self.layers, *layers] | |
) | |
return self | |
new_layers = [*layers, self] if prepend else [self, *layers] | |
return LayeredMapping(*new_layers) | |
import ast | |
import functools | |
import inspect | |
import keyword | |
import re | |
from typing import Any, Callable, Mapping, MutableMapping, Optional, TYPE_CHECKING | |
import astor | |
import numpy | |
from .iterators import peekable_iter | |
from .layered_mapping import LayeredMapping | |
if TYPE_CHECKING: | |
from formulaic.model_spec import ModelSpec # pragma: no cover | |
def stateful_transform(func: Callable) -> Callable: | |
""" | |
Transform a callable object into a stateful transform. | |
This is done by adding special arguments to the callable's signature: | |
- _state: The existing state or an empty dictionary. | |
- _metadata: Any extra metadata passed about the factor being evaluated. | |
- _spec: The `ModelSpec` instance being evaluated (or an empty `ModelSpec`). | |
If the callable has any of these in its signature, these will be passed onto | |
it; otherwise, they will be swallowed by the stateful transform wrapper. | |
Stateful transforms are also transformed into single dispatches, allowing | |
different implementations for incoming data types. | |
Args: | |
func: The function (or other callable) to be made into a stateful | |
transform. | |
Returns: | |
The stateful transform callable. | |
""" | |
func = functools.singledispatch(func) | |
params = inspect.signature(func).parameters.keys() | |
@functools.wraps(func) | |
def wrapper(data, *args, _metadata=None, _state=None, _spec=None, **kwargs): | |
from formulaic.model_spec import ModelSpec | |
_state = {} if _state is None else _state | |
extra_params = {} | |
if "_metadata" in params: | |
extra_params["_metadata"] = _metadata | |
if "_spec" in params: | |
extra_params["_spec"] = _spec or ModelSpec(formula=[]) | |
if isinstance(data, dict): | |
results = {} | |
for key, datum in data.items(): | |
if isinstance(key, str) and key.startswith("__"): | |
results[key] = datum | |
else: | |
statum = _state.get(key, {}) | |
results[key] = wrapper( | |
datum, *args, _state=statum, **extra_params, **kwargs | |
) | |
if statum: | |
_state[key] = statum | |
return results | |
return func(data, *args, _state=_state, **extra_params, **kwargs) | |
wrapper.__is_stateful_transform__ = True | |
return wrapper | |
def stateful_eval( | |
expr: str, | |
env: Optional[Mapping], | |
metadata: Optional[Mapping], | |
state: Optional[Mapping], | |
spec: Optional["ModelSpec"], | |
) -> Any: | |
""" | |
Evaluate an expression in a nominated environment and with a nominated state. | |
Under the hood this calls out to `eval`, and so if incoming expressions are | |
not safe, you should make sure that your `env` is properly isolated from | |
potentially unsafe methods and/or sys-calls. | |
Args: | |
expr: The expression to be evaluated. | |
env: The environment in which the expression is to be evaluated. This | |
environment is the only environment from which variables can be | |
looked up during the evaluation. | |
metadata: Additional metadata about the expression (passed through to | |
stateful transforms). | |
state: The current state of any stateful transforms (passed through to | |
stateful transforms). | |
spec: The current `ModelSpec` instance being evaluated (passed through | |
to stateful transforms). | |
Returns: | |
The result of the evaluation. | |
Notes: | |
- The state mapping is likely to be mutated in-place when using stateful | |
transforms. If you need to retain the original state, copy it | |
*before* calling this method. | |
""" | |
metadata = {} if metadata is None else metadata | |
state = {} if state is None else state | |
env = LayeredMapping( | |
env | |
) # We sometimes mutate env, so we make sure we do so in a local mutable layer. | |
# Ensure that variable names in code are valid for Python's interpreter | |
# If not, create new variable in mutable env layer, and update code. | |
expr = sanitize_variable_names(expr, env) | |
# Parse Python code | |
code = ast.parse(expr, mode="eval") | |
# Extract the nodes of the graph that correspond to stateful transforms | |
stateful_nodes = {} | |
for node in ast.walk(code): | |
if _is_stateful_transform(node, env): | |
stateful_nodes[astor.to_source(node).strip().replace("\n ", "")] = node | |
# Mutate stateful nodes to pass in state from a shared dictionary. | |
for name, node in stateful_nodes.items(): | |
name = name.replace('"', r'\\\\"') | |
if name not in state: | |
state[name] = {} | |
node.keywords.append( | |
ast.keyword( | |
"_metadata", | |
ast.parse(f'__FORMULAIC_METADATA__.get("{name}")', mode="eval").body, | |
) | |
) | |
node.keywords.append( | |
ast.keyword( | |
"_state", ast.parse(f'__FORMULAIC_STATE__["{name}"]', mode="eval").body | |
) | |
) | |
node.keywords.append( | |
ast.keyword("_spec", ast.parse("__FORMULAIC_SPEC__", mode="eval").body) | |
) | |
# Compile mutated AST | |
code = compile(ast.fix_missing_locations(code), "", "eval") | |
assert "__FORMULAIC_METADATA__" not in env | |
assert "__FORMULAIC_STATE__" not in env | |
assert "__FORMULAIC_SPEC__" not in env | |
# Evaluate and return | |
return eval( | |
code, | |
{}, | |
LayeredMapping( | |
{ | |
"__FORMULAIC_METADATA__": metadata, | |
"__FORMULAIC_SPEC__": spec, | |
"__FORMULAIC_STATE__": state, | |
}, | |
env, | |
), | |
) # nosec | |
def _is_stateful_transform(node: ast.AST, env: Mapping) -> bool: | |
""" | |
Check whether a given ast.Call node enacts a stateful transform given | |
the available symbols in `env`. | |
Args: | |
node: The AST node in question. | |
env: The current environment in which the node is evaluated. This is | |
used to look up the function handle so it can be inspected. | |
Return: | |
`True` if the node is a call node and the callable associated with the | |
node is a stateful transform. `False` otherwise. | |
""" | |
if not isinstance(node, ast.Call): | |
return False | |
try: | |
func = eval( | |
compile(astor.to_source(node.func).strip(), "", "eval"), {}, env | |
) # nosec; Get function handle (assuming it exists in env) | |
return getattr(func, "__is_stateful_transform__", False) | |
except NameError: | |
return False | |
# Variable sanitization | |
UNQUOTED_BACKTICK_MATCHER = re.compile( | |
r"(\\\"|\"(?:\\\"|[^\"])*\"|\\'|'(?:\\'|[^'])*'|`)" | |
) | |
def sanitize_variable_names(expr: str, env: Mapping) -> str: | |
""" | |
Sanitize any variables names in the expression that are not valid Python | |
identifiers and are surrounded by backticks (`). This allows use of field | |
names that are not valid Python names. | |
This function transforms `expr` into a new expression where identifiers that | |
would cause `SyntaxError`s are transformed into valid Python identifiers. | |
E.g. "func(`1a`)" -> "func(_1a)". `env` is updated to reflect the mapping of | |
the old identifier to the new one, provided that the original variable name | |
was already present. | |
Args: | |
expr: The expression to sanitize. | |
env: The environment to keep updated with any name substitutions. This | |
environment mapping will be mutated in place during this evaluation. | |
Returns: | |
The sanitized expression. | |
""" | |
expr_parts = peekable_iter(UNQUOTED_BACKTICK_MATCHER.split(expr)) | |
sanitized_expr = [] | |
for expr_part in expr_parts: | |
if expr_part == "`": | |
variable_name_parts = [] | |
while expr_parts.peek(None) not in ("`", None): | |
variable_name_parts.append(next(expr_parts)) | |
variable_name = "".join(variable_name_parts) | |
if expr_parts.peek(None) is None: | |
sanitized_expr.append(f"`{variable_name}") | |
else: | |
next(expr_parts) | |
new_name = sanitize_variable_name(variable_name, env) | |
sanitized_expr.append(f" {new_name} ") | |
else: | |
sanitized_expr.append(expr_part) | |
return "".join(sanitized_expr).strip() | |
def sanitize_variable_name(name: str, env: MutableMapping) -> str: | |
""" | |
Generate a valid Python variable name for variable identifier `name`. | |
Args: | |
name: The variable name to sanitize. | |
env: The mapping of variable name to values in the evaluation | |
environment. If `name` is present in this mapping, an alias is | |
created for the same value for the new variable name. | |
""" | |
if name.isidentifier() or keyword.iskeyword(name): | |
return name | |
# Compute recognisable basename | |
base_name = "".join([char if re.match(r"\w", char) else "_" for char in name]) | |
if base_name[0].isdigit(): | |
base_name = "_" + base_name | |
# Verify new name is not in env already, and if not add a random suffix. | |
new_name = base_name | |
while new_name in env: | |
new_name = ( | |
base_name | |
+ "_" | |
+ "".join(numpy.random.choice(list("abcefghiklmnopqrstuvwxyz"), 10)) | |
) | |
# Reuse the value for `name` for `new_name` also. | |
if name in env: | |
env[new_name] = env[name] | |
return new_name | |
from typing import Iterable, Set | |
from formulaic.parser.types import Factor, Term | |
def differentiate_term( | |
term: Term, | |
vars: Iterable[str], # pylint: disable=redefined-builtin | |
use_sympy: bool = False, | |
) -> Term: | |
""" | |
Symbolically differentiate a `Term` instance with respect to one or more `vars`. | |
Args: | |
term: The `Term` instance to differentiate. | |
vars: The variables by which to differentiate. | |
use_sympy: Whether to interpret factor token strings using sympy. If | |
`True`, symbolic factors like `log(x)` can be differentiated with | |
respect to `x`. If `False`, factor token strings must match the | |
variable exactly in order to be detected. | |
Returns: | |
A new `Term` instance representing the differentiated term. | |
Notes: | |
- This method takes into account the chain rule/etc. | |
- Care must be taken to make sure that the symbolic representation of | |
the factors can be properly interpreted by `sympy`. For example, `I(x)` | |
would not be understood. | |
""" | |
factors = set(term.factors) | |
for var in vars: | |
affected_factors = set( | |
factor | |
for factor in factors | |
if var in _factor_symbols(factor, use_sympy=use_sympy) | |
) | |
if not affected_factors: | |
return Term({Factor("0", eval_method="literal")}) | |
factors = factors.difference(affected_factors).union( | |
_differentiate_factors(affected_factors, var, use_sympy=use_sympy) | |
) | |
return Term(factors or {Factor("1", eval_method="literal")}) | |
def _factor_symbols(factor: Factor, use_sympy: bool = False) -> Set[str]: | |
""" | |
Extract the symbols represented in a factor. | |
Args: | |
factor: The `Factor` instance from which symbols should be extracted. | |
use_sympy: Whether to interpret the string representation of the | |
factor using `sympy`. | |
Returns: | |
The set of string symbols represented by the factor. | |
""" | |
if use_sympy: | |
try: | |
import sympy | |
return {str(s) for s in sympy.S(factor.expr).free_symbols} | |
except ImportError as e: # pragma: no cover | |
raise ImportError( | |
"`sympy` is not available. Install it using `pip install formulaic[calculus]` or `pip install sympy`." | |
) from e | |
return {factor.expr} | |
def _differentiate_factors( | |
factors: Set[Factor], var: str, use_sympy: bool = False | |
) -> Set[Factor]: | |
""" | |
Differentiate the nominated `factors` by `var`. | |
Args: | |
factors: The set of factors which should be differentiated (taking for | |
granted that they are multiplied together). | |
var: The variable by which to differentiate. | |
use_sympy: Whether to perform the differentiation using sympy, allowing | |
for symbolic differentiations like `log(x)` -> `1/x`. | |
Returns: | |
A set containing the new factors to replace the incoming factors in a | |
term. | |
""" | |
if use_sympy: | |
try: | |
import sympy | |
expr = sympy.S( | |
"(" + ") * (".join(factor.expr for factor in factors) + ")" | |
).diff(var) | |
eval_method = "python" | |
except ImportError as e: # pragma: no cover | |
raise ImportError( | |
"`sympy` is not available. Install it using `pip install formulaic[calculus]` or `pip install sympy`." | |
) from e | |
else: | |
assert len(factors) == 1 | |
expr = 1 | |
eval_method = next(iter(factors)).eval_method | |
if expr == 1: | |
return set() | |
return {Factor(f"({str(expr)})", eval_method=eval_method)} | |
import sys | |
from typing import Any, Optional, Mapping, Union | |
from .layered_mapping import LayeredMapping | |
def capture_context( | |
context: Optional[Union[int, Mapping[str, Any]]] = 0 | |
) -> Optional[Mapping[str, Any]]: | |
""" | |
Explicitly capture the context to be used by subsequent formula | |
materialisations. | |
Note: This function is primarily useful in libraries that wrap Formulaic, | |
allowing them to easily decouple the extraction of evaluation context from | |
the actual materializations calls, which may be several frames removed from | |
the users. Also note that implementers are free to do context collection | |
without this method, since passing of a dictionary context will always be | |
supported; however using this method allows users to treat formulaic as a | |
black box. | |
Args: | |
context: The context from which variables (and custom transforms/etc) | |
should be inherited. When specified as an integer, it is interpreted | |
as a frame offset from the caller's frame (i.e. 0, the default, | |
means that all variables in the caller's scope should be made | |
accessible when interpreting and evaluating formulae). Otherwise, a | |
mapping from variable name to value is expected. When nesting in a | |
library, and attempting to capture user-context, make sure you | |
account for the extra frames introduced by your wrappers. | |
Returns: | |
The context that should be later passed to the Formulaic materialization | |
procedure like: `.get_model_matrix(..., context=<this object>)`. | |
""" | |
if isinstance(context, int): | |
if hasattr(sys, "_getframe"): | |
frame = sys._getframe(context + 1) | |
context = LayeredMapping(frame.f_locals, frame.f_globals) | |
else: | |
context = None # pragma: no cover | |
return context | |
class _MissingType: | |
__instance__ = None | |
def __new__(cls): | |
if cls.__instance__ is None: | |
cls.__instance__ = super(_MissingType, cls).__new__(cls) | |
return cls.__instance__ | |
def __bool__(self): | |
return False | |
def __repr__(self): | |
return "MISSING" | |
def __copy__(self): | |
return self | |
def __deepcopy__(self, memo): | |
return self | |
MISSING = _MissingType() | |
from typing import Any, Iterable | |
from .sentinels import MISSING | |
class peekable_iter: | |
""" | |
An iterator that allows you to peek at the next element during iteration. | |
""" | |
def __init__(self, it: Iterable): | |
self._it = iter(it) | |
self._next = [] | |
def __iter__(self): | |
return self | |
def __next__(self): | |
if self._next: | |
return self._next.pop(0) | |
return next(self._it) | |
def peek(self, default: Any = MISSING) -> Any: | |
""" | |
Retrieve the object that will be next returned by the iterator. | |
Args: | |
default: The value to return if there are no more elements in the | |
iterator (otherwise the `StopIteration` exception will be | |
forwarded). | |
""" | |
try: | |
if not self._next: | |
self._next.append(next(self._it)) | |
return self._next[0] | |
except StopIteration: | |
if default is MISSING: | |
raise | |
return default | |
from typing import Iterable, Optional, Tuple, List | |
import numpy | |
import pandas | |
import scipy.sparse as spsparse | |
def categorical_encode_series_to_sparse_csc_matrix( | |
series: Iterable, levels: Optional[Iterable[str]] = None, drop_first: bool = False | |
) -> Tuple[List, spsparse.csc_matrix]: | |
""" | |
Categorically encode (via dummy encoding) a `series` as a sparse matrix. | |
Args: | |
series: The iterable which should be sparse encoded. | |
levels: The levels for which to generate dummies (if not specified, a | |
dummy variable is generated for every level in `series`). | |
drop_first: Whether to omit the first column in order to avoid | |
structural collinearity. | |
Returns: | |
A tuple of form `(levels, sparse_matrix)`, where `levels` contains the | |
levels that were used to generate dummies, and `sparse_matrix` is the | |
sparse (column-major) matrix representation of the series dummy | |
encoding. | |
""" | |
series = pandas.Categorical(series, levels) | |
levels = list(levels or series.categories) | |
if drop_first: | |
series = series.remove_categories(levels[0]) | |
levels = levels[1:] | |
codes = series.codes | |
non_null_code_indices = codes != -1 | |
indices = numpy.arange(series.shape[0])[non_null_code_indices] | |
codes = codes[non_null_code_indices] | |
sparse_matrix = spsparse.csc_matrix( | |
( | |
numpy.ones(codes.shape[0], dtype=float), # data | |
(indices, codes), # row # column | |
), | |
shape=(series.shape[0], len(levels)), | |
) | |
return levels, sparse_matrix | |
from __future__ import annotations | |
import ast | |
import functools | |
import itertools | |
from numbers import Number | |
from typing import Dict, Iterable, Optional, Sequence, Tuple, Union | |
import numpy | |
from formulaic.parser.algos.tokenize import tokenize | |
from formulaic.parser.algos.tokens_to_ast import tokens_to_ast | |
from formulaic.parser.types import ( | |
ASTNode, | |
Factor, | |
OperatorResolver, | |
Operator, | |
Term, | |
Token, | |
) | |
from formulaic.parser.utils import exc_for_token | |
LinearConstraintSpec = Union[ | |
str, | |
Dict[str, Number], | |
Tuple["numpy.typing.ArrayLike", "numpy.typing.ArrayLike"], | |
"numpy.typing.ArrayLike", | |
] | |
class LinearConstraints: | |
""" | |
Represents linear constraints of form $Ax = b$, where $A$ is a matrix of | |
coefficients for the features in $x$, and $b$ is a vector of constant | |
values. | |
Instances of this class are typically constructed via | |
`ModelSpec.get_linear_constraints(...)` but can also be constructed | |
directly for use in other contexts. | |
Attributes: | |
constraint_matrix: The matrix of coefficients on the features ($A$ from | |
above). Each row is one constraint. | |
constraint_values: The vector of constant values ($b$ from above). | |
variable_names: The ordered names of the variables represented by $x$; | |
typically the column names of a `ModelMatrix` instance. | |
""" | |
@classmethod | |
def from_spec( | |
cls, spec: LinearConstraintSpec, variable_names: Sequence[str] = None | |
) -> LinearConstraints: | |
""" | |
Construct a `LinearConstraints` instance from a specification. | |
Args: | |
spec: The specification from which to derive the constraints. Can be | |
a: | |
* str: In which case it is interpreted as a constraints | |
formula (e.g. "x + 2 * y = 3, z + y - x / 10"). All | |
variables used must be present in `variable_names`. | |
* Dict[str, Number]: In which case each key is treated as | |
formula, and each value as the constraint (e.g. {"x":19} | |
, {"a + b": 0}). | |
* Tuple: a two-tuple describing the constraint matrix and | |
values respectively. | |
* numpy.ndarray: a constraint matrix (with all values | |
assumed to be zero). | |
variable_names: The ordered names of the variables represented by | |
$x$; typically the column names of a `ModelMatrix` instance. | |
""" | |
if isinstance(spec, LinearConstraints): | |
return spec | |
if isinstance(spec, str): | |
matrix, values = LinearConstraintParser( | |
variable_names=variable_names | |
).get_matrix(spec) | |
return cls(matrix, values, variable_names) | |
if isinstance(spec, dict): | |
matrices, constants = [], [] | |
for key, constant in spec.items(): | |
matrix, values = LinearConstraintParser( | |
variable_names=variable_names | |
).get_matrix(key) | |
matrices.append(matrix) | |
constants.append(values + numpy.array(constant)) | |
return cls( | |
numpy.vstack(matrices), | |
numpy.hstack(constants), | |
variable_names=variable_names, | |
) | |
if isinstance(spec, tuple) and len(spec) == 2: | |
return cls(*spec, variable_names=variable_names) | |
return cls(spec, 0, variable_names=variable_names) | |
def __init__( | |
self, constraint_matrix, constraint_values, variable_names: Sequence[str] = None | |
): | |
""" | |
Attributes: | |
constraint_matrix: The matrix of coefficients on the features ($A$ from | |
above). Each row is one constraint. | |
constraint_values: The vector of constant values ($b$ from above). | |
variable_names: The ordered names of the variables represented by $x$; | |
typically the column names of a `ModelMatrix` instance. | |
""" | |
constraint_matrix = numpy.array(constraint_matrix) | |
constraint_values = numpy.array(constraint_values) | |
# Prepare incoming values | |
if len(constraint_matrix.shape) == 1: | |
constraint_matrix = constraint_matrix.reshape(1, *constraint_matrix.shape) | |
if len(constraint_values.shape) == 0: | |
constraint_values = constraint_values * numpy.ones( | |
constraint_matrix.shape[0] | |
) | |
variable_names = variable_names or [ | |
f"x{i}" for i in range(constraint_matrix.shape[1]) | |
] | |
# Validate incoming values | |
if len(constraint_matrix.shape) != 2: | |
raise ValueError("`constraint_matrix` must be a 2D array.") | |
if len(constraint_values.shape) != 1: | |
raise ValueError("`constraint_values` must be a 1D array.") | |
if constraint_values.shape[0] != constraint_matrix.shape[0]: | |
raise ValueError( | |
"Number of rows in constraint matrix does not equal the number of values in the values array." | |
) | |
if len(variable_names) != constraint_matrix.shape[1]: | |
raise ValueError( | |
"Number of column names does not match the number of columns in the linear constraint matrix." | |
) | |
self.constraint_matrix = constraint_matrix | |
self.constraint_values = constraint_values | |
self.variable_names = variable_names or [ | |
f"x{i}" for i in range(len(constraint_matrix)) | |
] | |
def __str__(self): | |
out = [] | |
for i in range(self.constraint_matrix.shape[0]): | |
out_one = [] | |
for nonzero_col in numpy.where(self.constraint_matrix[i, :])[0]: | |
out_one.append( | |
f"{self.constraint_matrix[i, nonzero_col]} * {self.variable_names[nonzero_col]}" | |
) | |
out.append(" + ".join(out_one) + f" = {self.constraint_values[i]}") | |
return "\n".join(out) | |
def show(self): | |
""" | |
Pretty-print the constraints. | |
""" | |
print(str(self)) | |
@property | |
def n_constraints(self): | |
""" | |
The number of constraints represented by this `LinearConstraints` | |
instance. | |
""" | |
return self.constraint_matrix.shape[0] | |
def __repr__(self): | |
return f"<LinearConstraints: {self.n_constraints} constraints>" | |
class LinearConstraintParser: | |
""" | |
A linear constraint parser. | |
While this parser re-uses parts of the parser stack under `FormulaParser`, | |
it interprets formulas using conventional algebra (rather than Wilkinson | |
formulas). | |
Attributes: | |
variable_names: The ordered names of the variables for which constraints | |
are being prepared. All variables used in the formula being parsed | |
must be present in this sequence. | |
operator_resolver: The operator resolver instance to use. If not | |
provided, `ConstraintOperatorResolver` is used. | |
""" | |
def __init__( | |
self, | |
variable_names: Sequence[str], | |
operator_resolver: Optional[OperatorResolver] = None, | |
): | |
self.variable_names = variable_names | |
self.operator_resolver = operator_resolver or ConstraintOperatorResolver() | |
def get_tokens(self, formula: str) -> Iterable[ConstraintToken]: | |
""" | |
Tokenize a constraint formula. | |
Args: | |
formula: The constraint formula to tokenize. | |
""" | |
return [ConstraintToken.for_token(token) for token in tokenize(formula)] | |
def get_ast(self, formula: str) -> ASTNode: | |
""" | |
Assemble an abstract syntax tree for the nominated `formula` string. | |
Args: | |
formula: The constraint formula for which an AST should be | |
generated. | |
""" | |
return tokens_to_ast( | |
self.get_tokens(formula), | |
operator_resolver=self.operator_resolver, | |
) | |
def get_terms(self, formula: str) -> Union[Sequence[Term], Tuple[Sequence[Term]]]: | |
""" | |
Build the `Term` instances for a constraint formula string. | |
Args: | |
formula: The constraint formula for which to build terms. | |
""" | |
ast = self.get_ast(formula) | |
if not ast: | |
return None | |
return ast.to_terms() | |
def get_matrix( | |
self, formula: str | |
) -> Tuple["numpy.typing.ArrayLike", "numpy.typing.ArrayLike"]: | |
""" | |
Build the constraint matrix and constraint values vector associated with | |
the parsed string. | |
Args: | |
formula: The constraint formula for which to build the constraint | |
matrix and values vector. | |
Returns: | |
A tuple of the contraint matrix and constraint values respectively. | |
""" | |
constraints = self.get_terms(formula) | |
if not constraints: | |
return numpy.empty((0, len(self.variable_names))), numpy.array([]) | |
if not isinstance(constraints, tuple): | |
constraints = (constraints,) | |
col_vectors = dict( | |
zip(self.variable_names, numpy.eye(len(self.variable_names))) | |
) | |
matrix = [] | |
constants = [] | |
for constraint in constraints: | |
vector = numpy.zeros(len(self.variable_names)) | |
constant = 0 | |
for term in constraint: | |
if term.factor == 1: | |
constant += term.scale | |
else: | |
vector += term.scale * col_vectors[term.factor.expr] | |
matrix.append(vector) | |
constants.append(-constant) | |
return numpy.array(matrix), numpy.array(constants) | |
class ConstraintToken(Token): | |
""" | |
An enriched `Token` subclass that overrides `.to_terms()` to return | |
a set of `ScaledFactor`s rather than `Terms`s. | |
""" | |
@classmethod | |
def for_token(cls, token: Token): | |
return cls( | |
**{ | |
attr: getattr(token, attr) | |
for attr in ("token", "kind", "source", "source_start", "source_end") | |
} | |
) | |
def to_terms(self): | |
if self.kind is Token.Kind.VALUE: | |
factor = ast.literal_eval(self.token) | |
if isinstance(factor, Number): | |
return {ScaledFactor(1, scale=factor)} | |
raise exc_for_token( | |
self, | |
message="Only numeric literal values are permitted in constraint formulae.", | |
) | |
return {ScaledFactor(self.to_factor())} | |
class ScaledFactor: | |
""" | |
A wrapper around a `Factor` instance that provides an additional "scale" | |
attribute to allow storing information about the scalar coefficient of each | |
`Factor`. | |
Attributes: | |
factor: The wrapped `Factor` instance. | |
scale: The scalar value to be used as the coefficient of this factor. | |
""" | |
def __init__(self, factor: Factor, *, scale: Number = 1): | |
self.factor = factor | |
self.scale = scale | |
def __add__(self, other): | |
if isinstance(other, ScaledFactor): | |
return ScaledFactor(self.factor, scale=self.scale + other.scale) | |
return NotImplemented # pragma: no cover | |
def __sub__(self, other): | |
if isinstance(other, ScaledFactor): | |
return ScaledFactor(self.factor, scale=self.scale - other.scale) | |
return NotImplemented # pragma: no cover | |
def __neg__(self): | |
return ScaledFactor(self.factor, scale=-self.scale) | |
def __hash__(self): | |
return hash(self.factor) | |
def __eq__(self, other): | |
if isinstance(other, ScaledFactor): | |
return self.factor == other.factor | |
return NotImplemented # pragma: no cover | |
def __repr__(self): | |
return f"{self.scale}*{self.factor}" # pragma: no cover | |
class ConstraintOperatorResolver( | |
OperatorResolver | |
): # pylint: disable=unnecessary-lambda | |
""" | |
The default constraint `OperatorResolver` implementation. | |
These operators describe a regular algebra rather than a Wikinson formula | |
one. | |
""" | |
@property | |
def operators(self): | |
def join_tuples(lhs, rhs): | |
if not isinstance(lhs, tuple): | |
lhs = (lhs,) | |
if not isinstance(rhs, tuple): | |
rhs = (rhs,) | |
return lhs + rhs | |
def add_terms(terms_left, terms_right): | |
terms_left = {term: term for term in terms_left} | |
terms_right = {term: term for term in terms_right} | |
added = set() | |
for term in terms_left: | |
if term in terms_right: | |
term = term + terms_right[term] | |
added.add(term) | |
added.update({term for term in terms_right if term not in added}) | |
return added | |
def sub_terms(terms_left, terms_right): | |
terms_left = {term: term for term in terms_left} | |
terms_right = {term: term for term in terms_right} | |
added = set() | |
for term in terms_left: | |
if term in terms_right: | |
term = term - terms_right[term] | |
added.add(term) | |
added.update( | |
negate_terms({term for term in terms_right if term not in added}) | |
) | |
return added | |
def negate_terms(terms): | |
return {-term for term in terms} | |
def mul_terms(terms_left, terms_right): | |
terms_left = {term: term for term in terms_left} | |
terms_right = {term: term for term in terms_right} | |
terms = set() | |
for term_left, term_right in itertools.product(terms_left, terms_right): | |
terms = add_terms(terms, {mul_term(term_left, term_right)}) | |
return terms | |
def mul_term(term_left, term_right): | |
if term_left.factor == 1: | |
return ScaledFactor( | |
term_right.factor, scale=term_left.scale * term_right.scale | |
) | |
if term_right.factor == 1: | |
return ScaledFactor( | |
term_left.factor, scale=term_left.scale * term_right.scale | |
) | |
raise RuntimeError( | |
"Only one non-scalar factor can be involved in a linear constraint multiplication." | |
) | |
def div_terms(terms_left, terms_right): | |
terms_left = {term: term for term in terms_left} | |
terms_right = {term: term for term in terms_right} | |
terms = set() | |
for term_left, term_right in itertools.product(terms_left, terms_right): | |
terms = add_terms(terms, {div_term(term_left, term_right)}) | |
return terms | |
def div_term(term_left, term_right): | |
if term_right.factor == 1: | |
return ScaledFactor( | |
term_left.factor, scale=term_left.scale / term_right.scale | |
) | |
raise RuntimeError( | |
"The right-hand operand must be a scalar in linear constraint division operations." | |
) | |
return [ | |
Operator( | |
",", | |
arity=2, | |
precedence=-200, | |
associativity=None, | |
to_terms=join_tuples, | |
accepts_context=lambda context: all(c.symbol == "," for c in context), | |
structural=True, | |
), | |
Operator( | |
"=", | |
arity=2, | |
precedence=-100, | |
associativity=None, | |
to_terms=lambda lhs, rhs: add_terms(lhs, negate_terms(rhs)), | |
), | |
Operator( | |
"+", | |
arity=2, | |
precedence=100, | |
associativity="left", | |
to_terms=lambda *args: functools.reduce(add_terms, args), | |
), | |
Operator( | |
"-", | |
arity=2, | |
precedence=100, | |
associativity="left", | |
to_terms=lambda left, right: sub_terms(left, right), | |
), | |
Operator( | |
"+", | |
arity=1, | |
precedence=100, | |
associativity="right", | |
fixity="prefix", | |
to_terms=lambda arg: arg, | |
), | |
Operator( | |
"-", | |
arity=1, | |
precedence=100, | |
associativity="right", | |
fixity="prefix", | |
to_terms=lambda arg: negate_terms(arg), | |
), | |
Operator( | |
"*", | |
arity=2, | |
precedence=200, | |
associativity="left", | |
to_terms=lambda lhs, rhs: mul_terms(lhs, rhs), | |
), | |
Operator( | |
"/", | |
arity=2, | |
precedence=200, | |
associativity="left", | |
to_terms=lambda lhs, rhs: div_terms(lhs, rhs), | |
), | |
] | |
from dataclasses import dataclass | |
from typing import Iterable, List | |
from .ast_node import ASTNode | |
from .operator_resolver import OperatorResolver | |
from .structured import Structured | |
from .term import Term | |
from .token import Token | |
@dataclass | |
class FormulaParser: | |
""" | |
The base formula parser API. | |
The role of subclasses of this class is to transform a string representation | |
of a formula into a (structured) sequence of `Term` instances that can be | |
evaluated by materializers and ultimately rendered into model matrices. | |
This class can be subclassed to customize this behavior. The three phases of | |
formula parsing are split out into separate methods to make this easier. | |
They are: | |
- get_tokens: Which returns an iterable of `Token` instances. By default | |
this uses `tokenize()` and handles the addition/removal of the | |
intercept. | |
- get_ast: Which converts the iterable of `Token`s into an abstract | |
syntax tree. By default this uses `tokens_to_ast()` and the nominated | |
`OperatorResolver` instance. | |
- get_terms: Which evaluates the abstract syntax tree and returns an | |
iterable of `Term`s. | |
Only the `get_terms()` method is essential from an API perspective. | |
""" | |
operator_resolver: OperatorResolver | |
def get_tokens(self, formula: str) -> Iterable[Token]: | |
""" | |
Return an iterable of `Token` instances for the nominated `formula` | |
string. | |
Args: | |
formula: The formula string to be tokenized. | |
""" | |
from ..algos.tokenize import tokenize | |
return tokenize(formula) | |
def get_ast(self, formula: str) -> ASTNode: | |
""" | |
Assemble an abstract syntax tree for the nominated `formula` string. | |
Args: | |
formula: The formula for which an AST should be generated. | |
""" | |
from ..algos.tokens_to_ast import tokens_to_ast | |
return tokens_to_ast( | |
self.get_tokens(formula), | |
operator_resolver=self.operator_resolver, | |
) | |
def get_terms(self, formula: str, *, sort: bool = True) -> Structured[List[Term]]: | |
""" | |
Assemble the `Term` instances for a formula string. Depending on the | |
operators involved, this may be an iterable of `Term` instances, or | |
an iterable of iterables of `Term`s, etc. | |
Args: | |
formula: The formula for which an AST should be generated. | |
sort: Whether to sort the terms before returning them. | |
""" | |
ast = self.get_ast(formula) | |
if ast is None: | |
return Structured([]) | |
terms = ast.to_terms() | |
if not isinstance(terms, Structured): | |
terms = Structured(terms) | |
if sort: | |
terms = terms._map(sorted) | |
return terms | |
from __future__ import annotations | |
import copy | |
import re | |
from enum import Enum | |
from typing import Any, Iterable, Optional, Tuple, Union | |
from .factor import Factor | |
from .term import Term | |
class Token: | |
""" | |
The atomic unit into which formula strings are parsed. | |
These tokens are intentionally very low-level, leaving interpretation and | |
validation to higher-levels. As such, adding new operators/etc does not | |
require any modification of this low-level code. | |
The four kinds of token are: | |
- context: a token used to scope terms into a given context | |
- operator: an operator to be applied to other surrounding tokens (will | |
always consist of non-word characters). | |
- name: a name of a feature/variable to be lifted from the model matrix | |
context. | |
- value: a literal value (string/number). | |
- python: a code string to be evaluated. | |
Attributes: | |
token: The portion of the formula string represented by this token. | |
kind: The kind of this token (see above). | |
source: The entire original source string. | |
source_start: The index of the character within the string that starts | |
this token. | |
source_end: The index of the character within the string that ends | |
this token. | |
Note: These attributes *should* all be present, but may not be fully | |
populated if generated outside of the default `tokenize()` implementation. | |
""" | |
class Kind(Enum): | |
CONTEXT = "context" | |
OPERATOR = "operator" | |
VALUE = "value" | |
NAME = "name" | |
PYTHON = "python" | |
__slots__ = ("token", "_kind", "source", "source_start", "source_end") | |
def __init__( | |
self, | |
token: str = "", | |
*, | |
kind: Optional[Union[str, Kind]] = None, | |
source: Optional[str] = None, | |
source_start: Optional[int] = None, | |
source_end: Optional[int] = None, | |
): | |
self.token = token | |
self.kind = kind | |
self.source = source | |
self.source_start = source_start | |
self.source_end = source_end or source_start | |
@property | |
def kind(self) -> Optional[Kind]: | |
return self._kind | |
@kind.setter | |
def kind(self, kind: Optional[Union[str, Kind]]): | |
self._kind = self.Kind(kind) if kind else kind | |
def update( | |
self, char: str, source_index: int, kind: Optional[Kind] = None | |
) -> "Token": | |
""" | |
Add a character to the token string, keeping track of the source | |
indices. | |
Args: | |
char: The character to add. | |
source_index: The index of the character within the source string. | |
kind: If present, the kind of the token is updated to reflect the | |
nominated kind. | |
Returns: | |
A reference to this token instance. | |
""" | |
self.token += char | |
if self.source_start is None: | |
self.source_start = source_index | |
self.source_end = source_index | |
if kind is not None: | |
self.kind = kind | |
return self | |
def __bool__(self): | |
return bool(self.token) | |
def __eq__(self, other): | |
if isinstance(other, str): | |
return self.token == other | |
if isinstance(other, Token): | |
return self.token == other.token and self.kind == other.kind | |
return NotImplemented | |
def __hash__(self): | |
return self.token.__hash__() | |
def __lt__(self, other): | |
if isinstance(other, Token): | |
return self.token < other.token | |
return NotImplemented | |
@property | |
def source_loc(self) -> Tuple[int, int]: | |
""" | |
The indices of the first and last character represented by this token in | |
the source string. | |
""" | |
return (self.source_start, self.source_end) | |
def to_factor(self) -> Factor: | |
""" | |
A `Factor` instance corresponding to this token. Note that operator | |
tokens cannot be converted to tokens. | |
""" | |
kind_to_eval_method = { | |
Token.Kind.NAME: "lookup", | |
Token.Kind.PYTHON: "python", | |
Token.Kind.VALUE: "literal", | |
} | |
return Factor( | |
expr=self.token, | |
eval_method=kind_to_eval_method[self.kind], | |
token=self, | |
) | |
def to_terms(self) -> Iterable[Term]: | |
""" | |
An iterable (set) of `Term` instances for this token. This will just be | |
an iterable with one `Term` having one `Factor` (that generated by | |
`.to_factor()`). Operator tokens cannot be converted to an iterable of | |
`Term`s. | |
""" | |
return {Term([self.to_factor()])} | |
def flatten(self, str_args=False) -> Any: | |
""" | |
Return this token (or if `str_args` is `True`, a string representation | |
of this token). | |
Args: | |
str_args: Whether to convert this token to a string during | |
flattening. | |
""" | |
return str(self) if str_args else self | |
def get_source_context(self, colorize: bool = False) -> str: | |
""" | |
Render a string that highlights the location of this token in the source | |
string. | |
Args: | |
colorize: Whether to highlight the location of this token in bold | |
red font. | |
""" | |
if not self.source or self.source_start is None or self.source_end is None: | |
return None | |
if colorize: | |
RED_BOLD = "\x1b[1;31m" | |
RESET = "\x1b[0m" | |
return f"{self.source[:self.source_start]}⧛{RED_BOLD}{self.source[self.source_start:self.source_end+1]}{RESET}⧚{self.source[self.source_end+1:]}" | |
return f"{self.source[:self.source_start]}⧛{self.source[self.source_start:self.source_end+1]}⧚{self.source[self.source_end+1:]}" | |
def __repr__(self): | |
return self.token | |
# Additional methods for later mutation | |
def copy_with_attrs(self, **attrs) -> Token: | |
""" | |
Return a copy of this `Token` instance with attributes set from attrs. | |
Args: | |
attrs: Attribute keys and values to set on the copy of this | |
instance. | |
""" | |
new_token = copy.copy(self) | |
for attr, value in attrs.items(): | |
setattr(new_token, attr, value) | |
return new_token | |
def split( | |
self, pattern: Union[str, re.Pattern], after=False, before=False | |
) -> Iterable[Token]: | |
""" | |
Split this instance into multple tokens around all non-overlapping | |
matches of `pattern`. | |
Args: | |
pattern: The pattern by which to split this `Token` instance. | |
after: Whether to split after the pattern. | |
before: Whether to split before the pattern. | |
""" | |
if not after and not before: | |
yield self | |
return | |
if not isinstance(pattern, re.Pattern): | |
pattern = re.compile(pattern) | |
last_index = 0 | |
separators = pattern.finditer(self.token) | |
def get_next_token(next_index): | |
return next_index, self.copy_with_attrs( | |
token=self.token[last_index:next_index] | |
) | |
for separator in separators: | |
if before: | |
last_index, new_token = get_next_token(separator.span()[0]) | |
yield new_token | |
if after: | |
last_index, new_token = get_next_token(separator.span()[1]) | |
yield new_token | |
if last_index < len(self.token): | |
yield get_next_token(len(self.token))[1] | |
from typing import Iterable, TYPE_CHECKING | |
if TYPE_CHECKING: | |
from .factor import Factor # pragma: no cover | |
class Term: | |
""" | |
Represents a "term" of a formula. | |
A "term" is a product of "factors" (represented by `Factor`) instances, and | |
a formula is made up of a sum of terms. | |
Attributes: | |
factors: The set of factors to be multipled to form the term. | |
""" | |
def __init__(self, factors: Iterable["Factor"]): | |
self.factors = tuple(sorted(set(factors))) | |
self._factor_exprs = tuple(factor.expr for factor in self.factors) | |
self._hash = hash(repr(self)) | |
# Transforms and comparisons | |
def __mul__(self, other): | |
if isinstance(other, Term): | |
return Term([*self.factors, *other.factors]) | |
return NotImplemented | |
def __hash__(self): | |
return self._hash | |
def __eq__(self, other): | |
if isinstance(other, Term): | |
return self._factor_exprs == other._factor_exprs | |
if isinstance(other, str): | |
return repr(self) == other | |
return NotImplemented | |
def __lt__(self, other): | |
if isinstance(other, Term): | |
if len(self.factors) == len(other.factors): | |
return sorted(self.factors) < sorted(other.factors) | |
if len(self.factors) < len(other.factors): | |
return True | |
return False | |
return NotImplemented | |
def __repr__(self): | |
return ":".join(self._factor_exprs) | |
import abc | |
from collections import defaultdict | |
from typing import List, Union | |
from ..utils import exc_for_token | |
from .operator import Operator | |
from .token import Token | |
# Cached property was introduced in Python 3.8 (we currently support 3.7) | |
try: | |
from functools import cached_property | |
except ImportError: # pragma: no cover | |
from cached_property import cached_property | |
class OperatorResolver(metaclass=abc.ABCMeta): | |
""" | |
Resolves which `Operator` instance should be used for a given operator | |
`Token`. | |
This class should be subclassed and have `.operators` and/or `.resolve()` | |
overridden in order to achieve the desired formula algebra. | |
Note: most users will probably be interested in extending/subclassing | |
`DefaultOperatorResolver`, which implements the default formula operator | |
logic. You should subclass this class directly only if you want to start | |
from scratch. | |
Attributes: | |
operator_table: A cache of the mapping from operator symbol to | |
`Operator` instances implementing it. | |
""" | |
@property | |
@abc.abstractmethod | |
def operators(self) -> List[Operator]: | |
""" | |
The `Operator` instance pool which can be matched to tokens by | |
`.resolve()`. | |
""" | |
@cached_property | |
def operator_table(self): | |
operator_table = defaultdict(list) | |
for operator in self.operators: | |
operator_table[operator.symbol].append(operator) | |
for symbol in operator_table: | |
operator_table[symbol] = sorted( | |
operator_table[symbol], key=lambda op: op.precedence, reverse=True | |
) | |
return operator_table | |
def resolve( | |
self, token: Token, max_prefix_arity: int, context: List[Union[Token, Operator]] | |
) -> List[Operator]: | |
""" | |
Return a list of operators to apply for a given token in the AST | |
generation. | |
Args: | |
token: The operator `Token` instance for which `Operator`(s) should | |
be resolved. | |
max_prefix_arity: The number operator unclaimed tokens preceding the | |
operator in the formula string. | |
context: The current list of operators into which the operator to be | |
resolved will be placed. This will be a list of `Operator` | |
instances or tokens (tokens are return for grouping operators). | |
""" | |
return [self._resolve(token, token.token, max_prefix_arity, context)] | |
def _resolve( | |
self, | |
token: Token, | |
symbol: str, | |
max_prefix_arity: int, | |
context: List[Union[Token, Operator]], | |
) -> Operator: | |
""" | |
The default operator resolving logic. | |
""" | |
if symbol not in self.operator_table: | |
raise exc_for_token(token, f"Unknown operator '{symbol}'.") | |
candidates = [ | |
candidate | |
for candidate in self.operator_table[symbol] | |
if ( | |
max_prefix_arity == 0 | |
and candidate.fixity is Operator.Fixity.PREFIX | |
or max_prefix_arity > 0 | |
and candidate.fixity is not Operator.Fixity.PREFIX | |
) | |
and candidate.accepts_context(context) | |
] | |
if not candidates: | |
raise exc_for_token(token, f"Operator `{symbol}` is incorrectly used.") | |
if len(candidates) > 1: | |
raise exc_for_token( | |
token, | |
f"Ambiguous operator `{symbol}`. This is not usually a user error. Please report this!", | |
) | |
return candidates[0] | |
# The operator table cache may not be pickleable, so let's drop it. | |
def __getstate__(self): | |
return {} | |
from __future__ import annotations | |
from enum import Enum | |
from numbers import Number | |
from typing import Callable, List, Iterable, Union | |
from .term import Term | |
from .token import Token | |
class Operator: | |
""" | |
Specification for how an operator in a formula string should behave. | |
Attributes: | |
symbol: The operator for which the configuration applies. | |
arity: The number of arguments that this operator consumes. | |
precedence: How tightly this operator binds its arguments (the higher | |
the number, the more tightly it binds). Operators with higher | |
precedence will be evaluated first. | |
associativity: One of 'left', 'right', or 'none'; indicating how | |
operators of the same precedence should be evaluated in the absence | |
of explicit grouping parentheses. If left associative, groups are | |
formed from the left [e.g. a % b % c -> ((a % b) % c)]; and | |
similarly for right. | |
fixity: One of 'prefix', 'infix', or 'postfix'; indicating how the | |
operator is positioned relative to its arguments. If 'prefix', the | |
operator comes before its arguments; if 'infix', the operator comes | |
between its arguments (and there must be exactly two of them); and | |
if 'postfix', the operator comes after its arguments. | |
to_terms: A callable that maps the arguments pass to the operator to | |
an iterable of `Term` instances. | |
accepts_context: A callable that will receive a list of Operator and | |
Token instances that describe the context in which the operator | |
would be applied if this callable returns `True`. | |
structural: Whether this operator adds structure to the terms sets, in | |
which case `Structured._merge` will not be used in the | |
`ASTNode.to_terms()`, and the termsets will be directly passed to | |
`Operator.to_terms()`. | |
""" | |
class Associativity(Enum): | |
LEFT = "left" | |
RIGHT = "right" | |
NONE = "none" | |
class Fixity(Enum): | |
PREFIX = "prefix" | |
INFIX = "infix" | |
POSTFIX = "postfix" | |
def __init__( | |
self, | |
symbol: str, | |
*, | |
arity: int, | |
precedence: Number, | |
associativity: Union[str, Associativity] = "none", | |
fixity: Union[str, Fixity] = "infix", | |
to_terms: Callable[..., Iterable[Term]] = None, | |
accepts_context: Callable[[List[Union[Token, Operator]]], bool] = None, | |
structural: bool = False, | |
): | |
self.symbol = symbol | |
self.arity = arity | |
self.precedence = precedence | |
self.associativity = associativity | |
self.fixity = fixity | |
self._to_terms = to_terms | |
self._accepts_context = accepts_context | |
self.structural = structural | |
@property | |
def associativity(self): | |
return self._associativity | |
@associativity.setter | |
def associativity(self, associativity): | |
self._associativity = Operator.Associativity(associativity or "none") | |
@property | |
def fixity(self): | |
return self._fixity | |
@fixity.setter | |
def fixity(self, fixity): | |
self._fixity = Operator.Fixity(fixity) | |
def to_terms(self, *args): | |
if self._to_terms is None: | |
raise RuntimeError(f"`to_terms` is not implemented for '{self.symbol}'.") | |
return self._to_terms(*args) | |
def accepts_context(self, context: List[Union[Token, Operator]]): | |
if self._accepts_context: | |
# We only need to pass on tokens and operators with precedence less | |
# than or equal to ourselves, since all other operators will be | |
# evaluated before us. | |
return self._accepts_context( | |
[ | |
c | |
for c in context | |
if isinstance(c, Token) or c.precedence <= self.precedence | |
] | |
) | |
return True | |
def __repr__(self): | |
return self.symbol | |
from .ast_node import ASTNode | |
from .factor import Factor | |
from .formula_parser import FormulaParser | |
from .operator import Operator | |
from .operator_resolver import OperatorResolver | |
from .structured import Structured | |
from .term import Term | |
from .token import Token | |
__all__ = [ | |
"ASTNode", | |
"Factor", | |
"FormulaParser", | |
"Operator", | |
"OperatorResolver", | |
"Structured", | |
"Term", | |
"Token", | |
] | |
from __future__ import annotations | |
import itertools | |
from collections import defaultdict | |
from typing import ( | |
Any, | |
Callable, | |
Dict, | |
Generator, | |
Generic, | |
Iterable, | |
Optional, | |
Tuple, | |
Type, | |
TypeVar, | |
Union, | |
) | |
ItemType = TypeVar("ItemType") | |
_MISSING = object() | |
class Structured(Generic[ItemType]): | |
""" | |
Layers structure onto an arbitrary type. | |
Structure can be added in two ways: by keys and by tuples, and can be | |
arbitrarily nested. If present, the object assigned to the "root" key is | |
treated specially, in that enumeration over the structured instance is | |
equivalent to enumeration over the root node if there is no other structure. | |
Otherwise, enumeration and key look up is done over the top-level values in | |
the container in the order in which they were assigned (except that the root | |
node is always first). | |
The structure is mutable (new keys can be added, or existing attributes | |
overridden) by direct assignment in the usual way; or via the `_update` | |
method. To avoid collision with potential keys, all methods and attributes | |
are preceded with an underscore. Contrary to Python convention, these are | |
still considered public methods. | |
Attributes: | |
_structure: A dictionary of the keys stored in the `Structured` | |
instance. | |
_metadata: A dictionary of metadata which can be used to store arbitrary | |
information about the `Structured` instance. | |
Examples: | |
``` | |
>>> s = Structured((1, 2), b=3, c=(4,5)); s | |
root: | |
[0]: | |
1 | |
[1]: | |
2 | |
.b: | |
3 | |
.c: | |
[0]: | |
4 | |
[1]: | |
5 | |
>>> list(s) | |
[(1, 2), 3, (4, 5)] | |
>>> s.root | |
(1, 2) | |
>>> s.b | |
3 | |
>>> s._map(lambda x: x+1) | |
root: | |
[0]: | |
2 | |
[1]: | |
3 | |
.b: | |
4 | |
.c: | |
[0]: | |
5 | |
[1]: | |
6 | |
``` | |
""" | |
__slots__ = ("_structure", "_metadata") | |
def __init__( | |
self, | |
root: Any = _MISSING, | |
*, | |
_metadata: Dict[str, Any] = None, | |
**structure, | |
): | |
if any(key.startswith("_") for key in structure): | |
raise ValueError( | |
"Substructure keys cannot start with an underscore. " | |
f"The invalid keys are: {set(key for key in structure if key.startswith('_'))}." | |
) | |
if root is not _MISSING: | |
structure["root"] = self.__prepare_item("root", root) | |
self._metadata = _metadata | |
self._structure = { | |
key: self.__prepare_item(key, item) for key, item in structure.items() | |
} | |
def __prepare_item(self, key: str, item: Any) -> ItemType: | |
if isinstance(item, Structured): | |
return item._map( | |
lambda x: self._prepare_item(key, x), as_type=self.__class__ | |
) | |
if isinstance(item, tuple): | |
return tuple(self.__prepare_item(key, v) for v in item) | |
return self._prepare_item(key, item) | |
def _prepare_item(self, key: str, item: Any) -> ItemType: | |
return item | |
@property | |
def _has_root(self) -> bool: | |
""" | |
Whether this instance of `Structured` has a root node. | |
""" | |
return "root" in self._structure | |
@property | |
def _has_keys(self) -> bool: | |
""" | |
Whether this instance of `Structured` has any non-root named | |
substructures. | |
""" | |
return set(self._structure) != {"root"} | |
@property | |
def _has_structure(self) -> bool: | |
""" | |
Whether this instance of `Structured` has any non-trivial structure, | |
including named or unnamed substructures. | |
""" | |
return self._has_keys or self._has_root and isinstance(self.root, tuple) | |
def _map( | |
self, | |
func: Callable[[ItemType], Any], | |
recurse: bool = True, | |
as_type: Optional[Type[Structured]] = None, | |
) -> Structured[Any]: | |
""" | |
Map a callable object onto all the structured objects, returning a | |
`Structured` instance with identical structure, where the original | |
objects are replaced with the output of `func`. | |
Args: | |
func: The callable to apply to all objects contained in the | |
`Structured` instance. | |
recurse: Whether to recursively map, or only map one level deep (the | |
objects directly referenced by this `StructuredInstance`). | |
When `True`, if objects within this structure are `Structured` | |
instances also, then the map will be applied only on the leaf | |
nodes (otherwise `func` will received `Structured` instances). | |
(default: True). | |
as_type: An optional subclass of `Structured` to use for the mapped | |
values. If not provided, the base `Structured` type is used. | |
Returns: | |
A `Structured` instance with the same structure as this instance, | |
but with all objects transformed under `func`. | |
""" | |
def apply_func(obj): | |
if recurse and isinstance(obj, Structured): | |
return obj._map(func, recurse=True, as_type=as_type) | |
if isinstance(obj, tuple): | |
return tuple(apply_func(o) for o in obj) | |
return func(obj) | |
return (as_type or Structured)( | |
**{key: apply_func(obj) for key, obj in self._structure.items()} | |
) | |
def _flatten(self) -> Generator[ItemType]: | |
""" | |
Flatten any nested structure into a sequence of all values stored in | |
this `Structured` instance. The order is currently that yielded by a | |
depth-first iteration, however this is not guaranteed and should not | |
be relied upon. | |
""" | |
for value in self._structure.values(): | |
if isinstance(value, Structured): | |
yield from value._flatten() | |
elif isinstance(value, tuple): | |
for v in value: | |
if isinstance(v, Structured): | |
yield from v._flatten() | |
else: | |
yield v | |
else: | |
yield value | |
def _to_dict(self, recurse: bool = True) -> Dict[Optional[str], Any]: | |
""" | |
Generate a dictionary representation of this structure. | |
Args: | |
recurse: Whether to recursively convert any nested `Structured` | |
instances into dictionaries also. If `False`, any nested | |
`Structured` instances will be surfaced in the generated | |
dictionary. | |
Returns: | |
The dictionary representation of this `Structured` instance. | |
""" | |
def do_recursion(obj): | |
if recurse and isinstance(obj, Structured): | |
return obj._to_dict() | |
if isinstance(obj, tuple): | |
return tuple(do_recursion(o) for o in obj) | |
return obj | |
return {key: do_recursion(value) for key, value in self._structure.items()} | |
def _simplify( | |
self, *, recurse: bool = True, unwrap: bool = True, inplace: bool = False | |
) -> Union[Any, Structured[ItemType]]: | |
""" | |
Simplify this `Structured` instance by: | |
- returning the object stored at the root node if there is no other | |
structure (removing as many `Structured` wrappers as satisfy | |
this requirement). | |
- if `recurse` is `True`, recursively applying the logic above to | |
any nested `Structured` instances. | |
Args: | |
unwrap: Whether to unwrap the root node (returning the raw | |
unstructured root value) if there is no other structure. | |
recurse: Whether to recurse the simplification into the objects | |
associated with the keys of this (and nested) `Structured` | |
instances. | |
inplace: Whether to simplify the current structure (`True`), or | |
return a new object with the simplifications (`False`). Note | |
that if `True`, `unwrap` *must* be `False`. | |
""" | |
if inplace and unwrap: | |
raise RuntimeError( | |
f"Cannot simplify `{self.__class__.__name__}` instances " | |
"in-place if `unwrap` is `True`." | |
) | |
structured = self | |
while ( | |
isinstance(structured, Structured) | |
and structured._has_root | |
and not structured._has_structure | |
and (unwrap or isinstance(structured.root, Structured)) | |
): | |
structured = structured.root | |
if not isinstance(structured, Structured): | |
return structured | |
structure = structured._structure | |
if recurse: | |
def simplify_obj(obj): | |
if isinstance(obj, Structured): | |
return obj._simplify(recurse=True) | |
if isinstance(obj, tuple): | |
return tuple(simplify_obj(o) for o in obj) | |
return obj | |
structure = { | |
key: simplify_obj(value) for key, value in structured._structure.items() | |
} | |
if inplace: | |
self._structure = structure | |
return self | |
return self.__class__( | |
_metadata=self._metadata, | |
**structure, | |
) | |
def _update(self, root=_MISSING, **structure) -> Structured[ItemType]: | |
""" | |
Return a new `Structured` instance that is identical to this one but | |
the root and/or keys replaced with the nominated values. | |
Args: | |
root: The (optional) replacement of the root node. | |
structure: Any additional key/values to update in the structure. | |
""" | |
if root is not _MISSING: | |
structure["root"] = root | |
return self.__class__( | |
**{ | |
"_metadata": self._metadata, | |
**self._structure, | |
**{ | |
key: self.__prepare_item(key, item) | |
for key, item in structure.items() | |
}, | |
} | |
) | |
@classmethod | |
def _merge( | |
cls, | |
*objects: Any, | |
merger: Callable[..., ItemType] = None, | |
_context: Tuple[str, ...] = (), | |
) -> Union[ItemType, Structured[ItemType]]: | |
""" | |
Merge arbitrarily many objects into a single `Structured` instance. | |
If any of `objects` are `Structured` or `tuple` instances, then all | |
`objects` will be treated as `Structured` instances (being upcast as | |
necessary) and then merged recursively; otherwise the objects will be | |
merged directly by `merger`. | |
Note: An empty set of objects will result in an empty `Structured` | |
instance being returned. | |
Args: | |
objects: A tuple of Structured instances (will be upcast to a | |
trivial `Structured` instance as necessary). | |
merger: A callable which takes as arguments two or more items which | |
are to be merged. If not provided, a basic fallback is provided | |
that knows how to merge lists, dictionaries and sets. | |
_context: A string representing the context of the merge. Intended | |
for internal use. | |
""" | |
if merger is None: | |
merger = cls.__merger_default | |
# If objects are not specified, return an empty `Structured` instance. | |
if not objects: | |
return cls() | |
# Check for sequential (tuple) structures, and if so merge them and | |
# return them wrapped in a `Structured` instance. | |
all_tuples = all(isinstance(obj, tuple) for obj in objects) | |
any_tuples = any(isinstance(obj, tuple) for obj in objects) | |
if any_tuples and not all_tuples: | |
raise ValueError( | |
f"Substructures for `.{'.'.join(_context)}` are not aligned and cannot be merged." | |
) | |
if all_tuples: | |
merged = tuple(itertools.chain(*objects)) | |
if _context: | |
# We are merging substructure of `Structured` instances (and don't need the class wrapper) | |
return merged | |
return cls(merged) | |
# Check whether all objects are not Structured instances (or tuples, | |
# already excluded by above). If so, just call `merger` on them | |
# directly. | |
if all(not isinstance(obj, Structured) for obj in objects): | |
return merger(*objects) | |
# Otherwise,iterate over objects, upcasting to `Structured` as necessary | |
# and recursively merge them by merging their structure dictionaries. | |
values_to_merge = defaultdict(list) | |
for obj in objects: | |
if isinstance(obj, Structured): | |
for key, value in obj._structure.items(): | |
values_to_merge[key].append(value) | |
else: | |
values_to_merge["root"].append(obj) | |
return cls( | |
**{ | |
key: ( | |
cls._merge(*values, merger=merger, _context=_context + (key,)) | |
if len(values) > 1 | |
else values[0] | |
) | |
for key, values in values_to_merge.items() | |
} | |
) | |
@staticmethod | |
def __merger_default(*items): | |
if all(isinstance(item, list) for item in items): | |
return list(itertools.chain(*items)) | |
if all(isinstance(item, set) for item in items): | |
return set.union(*items) | |
if all(isinstance(item, dict) for item in items): | |
return dict(itertools.chain(*(d.items() for d in items))) | |
raise NotImplementedError( | |
"The fallback `merger` for `Structured._merge` does not know how to " | |
f"merge objects of types {repr(tuple(type(item) for item in items))}. " | |
"Please specify `merger` explicitly." | |
) | |
def __dir__(self): | |
return super().__dir__() + list(self._structure) | |
def __getattr__(self, attr): | |
if attr.startswith("_"): | |
raise AttributeError(attr) | |
if attr in self._structure: | |
return self._structure[attr] | |
raise AttributeError( | |
f"This `{self.__class__.__name__}` instance does not have structure @ `{repr(attr)}`." | |
) | |
def __setattr__(self, attr, value): | |
if attr.startswith("_"): | |
super().__setattr__(attr, value) | |
return | |
self._structure[attr] = self.__prepare_item(attr, value) | |
def __getitem__(self, key): | |
if self._has_root and not self._has_keys: | |
return self.root[key] | |
if key in (None, "root") and self._has_root: | |
return self.root | |
if isinstance(key, str) and not key.startswith("_") and key in self._structure: | |
return self._structure[key] | |
raise KeyError( | |
f"This `{self.__class__.__name__}` instance does not have structure @ `{repr(key)}`." | |
) | |
def __setitem__(self, key, value): | |
if not isinstance(key, str) or not key.isidentifier(): | |
raise KeyError(key) | |
if key.startswith("_"): | |
raise KeyError( | |
"Substructure keys cannot start with an underscore. " | |
f"The invalid keys are: {set(key for key in self._structure if key.startswith('_'))}." | |
) | |
self._structure[key] = self.__prepare_item(key, value) | |
def __iter__(self) -> Generator[Union[ItemType, Structured[ItemType]]]: | |
if self._has_root and not self._has_keys and isinstance(self.root, Iterable): | |
yield from self.root | |
else: | |
if self._has_root: # Always yield root first. | |
yield self.root | |
for key, value in self._structure.items(): | |
if key != "root": | |
yield value | |
def __eq__(self, other): | |
if isinstance(other, Structured): | |
return self._structure == other._structure | |
return False | |
def __contains__(self, key): | |
return key in self._structure | |
def __len__(self) -> int: | |
return sum(1 for _ in self) | |
def __str__(self): | |
return self.__repr__(to_str=str) | |
def __repr__(self, to_str=repr): | |
import textwrap | |
d = self._to_dict(recurse=False) | |
keys = [key for key in d if key != "root"] | |
if self._has_root: | |
keys.insert(0, "root") | |
out = [] | |
for key in keys: | |
if key == "root": | |
out.append("root:") | |
else: | |
out.append(f".{key}:") | |
value = d[key] | |
if isinstance(value, tuple): | |
for i, obj in enumerate(value): | |
out.append(f" [{i}]:") | |
out.append(textwrap.indent(to_str(obj), " ")) | |
else: | |
out.append(textwrap.indent(to_str(value), " ")) | |
return "\n".join(out) | |
from __future__ import annotations | |
from enum import Enum | |
from typing import Dict, Iterable, Optional, Union, TYPE_CHECKING | |
from .term import Term | |
if TYPE_CHECKING: | |
from .token import Token # pragma: no cover | |
class Factor: | |
""" | |
Factors are the indivisable atomic unit that make up formulas. | |
Each instance of `Factor` is a specification that is evaluable by a | |
materializer to generate concrete vector(s). `Factors` are multiplied | |
together into `Term`s, which in turn represent the output columns of model | |
matrices. Note that `Factor` instances are entirely abstract of data. | |
Attributes: | |
expr: The (string) expression to be evaluated by the materializer. | |
eval_method: An `EvalMethod` enum instance indicating the mechanism to | |
be used to evaluate the expression (one of: unknown, literal, lookup | |
or python). | |
kind: The kind of data represented (one of: unknown, constant, | |
numerical, categorical). | |
metadata: An additional (optional) dictionary of metadata (currently | |
unused). | |
token: The `Token` instance from which the the `Formula` object was | |
created. | |
""" | |
class EvalMethod(Enum): | |
LITERAL = "literal" | |
LOOKUP = "lookup" | |
PYTHON = "python" | |
class Kind(Enum): | |
UNKNOWN = "unknown" | |
CONSTANT = "constant" | |
NUMERICAL = "numerical" | |
CATEGORICAL = "categorical" | |
__slots__ = ("expr", "_eval_method", "_kind", "metadata", "token") | |
def __init__( | |
self, | |
expr: str = "", | |
*, | |
eval_method: Optional[Union[str, EvalMethod]] = None, | |
kind: Optional[Union[str, Kind]] = None, | |
metadata: Optional[Dict] = None, | |
token: Optional[Token] = None, | |
): | |
self.expr = expr | |
self.eval_method = eval_method | |
self.kind = kind | |
self.metadata = metadata or {} | |
self.token = token | |
@property | |
def eval_method(self) -> EvalMethod: | |
return self._eval_method | |
@eval_method.setter | |
def eval_method(self, eval_method): | |
self._eval_method = Factor.EvalMethod(eval_method or "lookup") | |
@property | |
def kind(self) -> Kind: | |
return self._kind | |
@kind.setter | |
def kind(self, kind): | |
self._kind = Factor.Kind(kind or "unknown") | |
def __eq__(self, other): | |
if isinstance(other, str): | |
return self.expr == other | |
if isinstance(other, Factor): | |
return self.expr == other.expr | |
return NotImplemented | |
def __hash__(self): | |
return self.expr.__hash__() | |
def __lt__(self, other): | |
if isinstance(other, Factor): | |
return self.expr < other.expr | |
return NotImplemented | |
def to_terms(self) -> Iterable[Term]: | |
""" | |
Convert this `Factor` instance into a `Term` instance, and expose it as | |
a single-element iterable. | |
""" | |
return {Term([self])} | |
def __repr__(self): | |
return self.expr | |
from __future__ import annotations | |
import graphlib | |
from typing import Any, Dict, Iterable, List | |
from .operator import Operator | |
from .structured import Structured | |
from .term import Term | |
class ASTNode: | |
""" | |
Represents a node in an Abstract Syntax Tree (AST). | |
An `ASTNode` instance is composed of an `Operator` instance and a set of | |
arguments to be passed into that operator. The arguments may include nested | |
`ASTNode`s or other arguments. Once evaluated, a set of `Term` instances | |
is returned. | |
Attributes: | |
operator: The `Operator` instance associated with this node. | |
args: The arguments associated with this node. | |
""" | |
def __init__(self, operator: Operator, args: Iterable[Any]): | |
self.operator = operator | |
self.args = args | |
def to_terms(self) -> Iterable[Term]: | |
""" | |
Evaluate this AST node and return the resulting set of `Term` instances. | |
Note: We use topological evaluation here to avoid recursion issues for | |
long formula (exceeding ~700 terms, though this depends on the recursion | |
limit set in the interpreter). | |
""" | |
g = graphlib.TopologicalSorter(self.__generate_evaluation_graph()) | |
g.prepare() | |
results = {} | |
while g.is_active(): | |
for node in g.get_ready(): | |
node_args = ( | |
(results[arg] if isinstance(arg, ASTNode) else arg.to_terms()) | |
for arg in node.args | |
) | |
if node.operator.structural: | |
results[node] = node.operator.to_terms(*node_args) | |
else: | |
results[node] = Structured._merge( | |
*node_args, | |
merger=node.operator.to_terms, | |
) | |
g.done(node) | |
return results[self] | |
def __repr__(self): | |
try: | |
return f"<ASTNode {self.operator}: {self.args}>" | |
except RecursionError: | |
return f"<ASTNode {self.operator}: ...>" | |
def flatten(self, str_args: bool = False) -> List[Any]: | |
""" | |
Flatten this `ASTNode` instance into a list of form: [<operator>, *<args>]. | |
This is primarily useful during debugging and unit testing, since it | |
provides a human readable summary of the entire AST. | |
Args: | |
str_args: Whether to cast every element of the flattened object to | |
a string. | |
""" | |
return [ | |
str(self.operator) if str_args else self.operator, | |
*[ | |
arg.flatten(str_args=str_args) | |
if isinstance(arg, ASTNode) | |
else (str(arg) if str_args else arg) | |
for arg in self.args | |
], | |
] | |
# Helpers | |
def __generate_evaluation_graph(self) -> Dict[ASTNode, List[ASTNode]]: | |
nodes_to_parse = [self] | |
graph = {} | |
while nodes_to_parse: | |
node = nodes_to_parse.pop() | |
children = [child for child in node.args if isinstance(child, ASTNode)] | |
nodes_to_parse.extend(children) | |
graph[node] = children | |
return graph | |
from .parser import DefaultFormulaParser, DefaultOperatorResolver | |
__all__ = [ | |
"DefaultFormulaParser", | |
"DefaultOperatorResolver", | |
] | |
from .tokenize import tokenize | |
from .tokens_to_ast import tokens_to_ast | |
__all__ = [ | |
"tokenize", | |
"tokens_to_ast", | |
] | |
from collections import namedtuple | |
from typing import Iterable, Optional | |
from ..types import ASTNode, Operator, OperatorResolver, Token | |
from ..utils import exc_for_token, exc_for_missing_operator | |
OrderedOperator = namedtuple("OrderedOperator", ("operator", "token", "index")) | |
CONTEXT_OPENERS = {"(", "["} | |
CONTEXT_CLOSERS = { | |
")": "(", | |
"]": "[", | |
} | |
def tokens_to_ast( | |
tokens: Iterable[Token], operator_resolver: OperatorResolver | |
) -> Optional[ASTNode]: | |
""" | |
Convert a iterable of `Token` instances into an abstract syntax tree. | |
This implementation is intentionally as simple and abstract as possible, and | |
makes few assumptions about the form of the operators that will be present | |
in the token sequence. Instead, it relies on the `OperatorResolver` instance | |
to evaluate based on the context which operator should be invoked to handle | |
surrounding tokens based on their arity/etc. This means that changes to the | |
formula syntax (such as the addition of new operators) should not require | |
any changes to this abstract syntax tree generator. | |
The algorithm employed here is a slightly enriched [Shunting Yard | |
Algorithm](https://en.wikipedia.org/wiki/Shunting-yard_algorithm), where we | |
have added additional support for operator arities, fixities, | |
associativities, etc. | |
Args: | |
tokens: The tokens for which an abstract syntax tree should be | |
generated. | |
operator_resolver: The `OperatorResolver` instance to be used to lookup | |
operators (only the `.resolve()` method is used). | |
Returns: | |
The generated abstract syntax tree as a nested `ASTNode` instance. | |
""" | |
output_queue = [] | |
operator_stack = [] | |
def stack_operator(operator, token): | |
operator_stack.append(OrderedOperator(operator, token, len(output_queue))) | |
def operate(ordered_operator, output_queue): | |
operator, token, index = ordered_operator | |
if operator.fixity is Operator.Fixity.INFIX: | |
assert operator.arity == 2 | |
min_index = index - 1 | |
max_index = index + 1 | |
elif operator.fixity is Operator.Fixity.PREFIX: | |
min_index = index | |
max_index = index + operator.arity | |
else: # Operator.Fixity.POSTFIX | |
min_index = index - operator.arity | |
max_index = index | |
if min_index < 0 or max_index > len(output_queue): | |
raise exc_for_token( | |
token, | |
f"Operator `{token.token}` has insuffient arguments and/or is misplaced.", | |
) | |
return [ | |
*output_queue[:min_index], | |
ASTNode(operator, output_queue[min_index:max_index]), | |
*output_queue[max_index:], | |
] | |
for token in tokens: | |
if token.kind is token.Kind.CONTEXT: | |
if token.token in CONTEXT_OPENERS: | |
stack_operator(token, token) | |
elif token.token in CONTEXT_CLOSERS: | |
starting_token = CONTEXT_CLOSERS[token.token] | |
while operator_stack and operator_stack[-1].token != starting_token: | |
output_queue = operate(operator_stack.pop(), output_queue) | |
if operator_stack and operator_stack[-1].token == starting_token: | |
operator_stack.pop() | |
else: | |
raise exc_for_token( | |
token, "Could not find matching context marker." | |
) | |
else: # pragma: no cover | |
raise exc_for_token( | |
token, | |
f"Context token `{token.token}` is unrecognized.", | |
) | |
elif token.kind is token.Kind.OPERATOR: | |
max_prefix_arity = ( | |
len(output_queue) - operator_stack[-1].index | |
if operator_stack | |
else len(output_queue) | |
) | |
operators = operator_resolver.resolve( | |
token, | |
max_prefix_arity=max_prefix_arity, | |
context=[s.operator for s in operator_stack], | |
) | |
for operator in operators: | |
while ( | |
operator_stack | |
and operator_stack[-1].token.kind is not Token.Kind.CONTEXT | |
and ( | |
operator_stack[-1].operator.precedence > operator.precedence | |
or operator_stack[-1].operator.precedence == operator.precedence | |
and operator.associativity is Operator.Associativity.LEFT | |
) | |
): | |
output_queue = operate(operator_stack.pop(), output_queue) | |
stack_operator(operator, token) | |
else: | |
output_queue.append(token) | |
while operator_stack: | |
if operator_stack[-1].token.kind is Token.Kind.CONTEXT: | |
raise exc_for_token( | |
operator_stack[-1].token, "Could not find matching context marker." | |
) | |
output_queue = operate(operator_stack.pop(), output_queue) | |
if output_queue: | |
if len(output_queue) > 1: | |
raise exc_for_missing_operator(output_queue[0], output_queue[1]) | |
return output_queue[0] | |
import re | |
from typing import Iterable, Pattern | |
from ..types import Token | |
from ..utils import exc_for_token | |
def tokenize( | |
formula: str, | |
word_chars: Pattern = re.compile(r"[\.\_\w]"), | |
numeric_chars: Pattern = re.compile(r"[0-9\.]"), | |
whitespace_chars: Pattern = re.compile(r"\s"), | |
) -> Iterable[Token]: | |
""" | |
Convert a formula string into a generator of tokens. | |
This tokenizer is intentionally very simple, and it makes no attempt to | |
validate incoming tokens beyond ensuring that they are complete. The | |
rationale for this is that changes like adding support for a new operator do | |
not require changes to this tokenizer, and can instead be done entirely | |
within the higher-level parser. This simplicity also lends itself to a direct | |
functional implementation (rather than a class with methods), and so that is | |
approach taken here. | |
Tokens outputted will have one of four kinds: | |
- operator: an operator to be applied to other surrounding tokens (will | |
always consist of non-word characters). | |
- name: a name of a feature/variable to be lifted from the model matrix | |
context. | |
- value: a literal value (string/number). | |
- python: a code string to be evaluated. | |
The basic logic of this tokenizer is to loop over each character in the | |
formula string and: | |
- ensure that portions quoted by one of : ', ", {}, and ` are correctly | |
grouped into a token of the appropriate kind. | |
- ignore unquoted whitespace | |
- correctly distinguish users of (, ), [, and ] as grouping operators vs. Python | |
function calls. | |
- output each contiguous portion of the formula string that belongs to | |
the same token type as a token. (e.g. sequential operators like '+-' | |
will be output as a single operator token). | |
Args: | |
formula: The formula string to tokenize. | |
word_chars: The regex pattern used to recognize "word" characters | |
(basically non-operator characters). | |
numeric_chars: The regex pattern used to recognize numeric characters. | |
whitespace_chars: The regex pattern use to recognize (ignored) | |
whitespace characters. | |
Returns: | |
A generator over the tokens found in the formula string. | |
""" | |
quote_context = [] | |
take = 0 | |
token = Token(source=formula) | |
for i, char in enumerate(formula): | |
if take > 0: | |
token.update(char, i) | |
take -= 1 | |
continue | |
if quote_context and char == "\\": | |
token.update(char, i) | |
take = 1 | |
continue | |
if quote_context and quote_context[-1] in "}`" and char == quote_context[-1]: | |
quote_context.pop(-1) | |
if token: | |
if quote_context: | |
token.update(char, i) | |
else: | |
yield token | |
token = Token(source=formula) | |
continue | |
if quote_context and char == quote_context[-1]: | |
token.update(char, i) | |
quote_context.pop(-1) | |
if ( | |
token | |
and not quote_context | |
and token.kind is Token.Kind.PYTHON | |
and char in ("]", ")") | |
): | |
yield token | |
token = Token(source=formula) | |
continue | |
if quote_context and quote_context[-1] in ('"', "'", "`", ")", "}"): | |
if char in "(`" and quote_context[-1] in "})": | |
quote_context.append(char.replace("(", ")")) | |
token.update(char, i) | |
continue | |
if char == "{": | |
if token: | |
yield token | |
token = Token(source=formula, kind="python", source_start=i) | |
quote_context.append("}") | |
continue | |
if char == "`": | |
if token: | |
yield token | |
token = Token(source=formula, kind="name", source_start=i) | |
quote_context.append("`") | |
continue | |
if char in "([": | |
if token.kind in (Token.Kind.NAME, Token.Kind.PYTHON): | |
token.update(char, i, kind=Token.Kind.PYTHON) | |
quote_context.append(")" if char == "(" else "]") | |
else: | |
if token: | |
yield token | |
token = Token(source=formula) | |
yield Token(source=formula).update(char, i, kind="context") | |
continue | |
if char in ")]": | |
if token: | |
yield token | |
token = Token(source=formula) | |
yield Token(source=formula).update(char, i, kind="context") | |
continue | |
if whitespace_chars.match(char): | |
if token and token.kind is not Token.Kind.OPERATOR: | |
yield token | |
token = Token(source=formula) | |
continue | |
if char in ('"', "'"): | |
if token and token.kind is Token.Kind.OPERATOR: | |
yield token | |
token = Token(source=formula) | |
if not token: | |
token.update(char, i, kind="value") | |
quote_context.append(char) | |
else: | |
raise exc_for_token( | |
Token(source=formula, source_start=i, source_end=i), | |
f"Unexpected character {repr(char)} following token `{token.token}`.", | |
) | |
continue # pragma: no cover; workaround bug in coverage | |
if word_chars.match(char): | |
assert token.kind in ( | |
None, | |
Token.Kind.OPERATOR, | |
Token.Kind.VALUE, | |
Token.Kind.NAME, | |
), f"Unexpected token kind {token.kind}." | |
if token and token.kind is Token.Kind.OPERATOR: | |
yield token | |
token = Token(source=formula) | |
if numeric_chars.match(char) and token.kind in (None, Token.Kind.VALUE): | |
kind = "value" | |
else: | |
kind = "name" | |
token.update(char, i, kind=kind) | |
continue | |
if token and token.kind is not Token.Kind.OPERATOR: | |
yield token | |
token = Token(source=formula) | |
token.update(char, i, kind="operator") | |
if quote_context: | |
raise exc_for_token( | |
token, | |
message=f"Formula ended before quote context was closed. Expected: {quote_context[-1]}", | |
) | |
if token: | |
yield token | |
import ast | |
import itertools | |
import functools | |
import re | |
from dataclasses import dataclass, field | |
from typing import List, Iterable, Set, Tuple, Union | |
from .algos.tokenize import tokenize | |
from .types import ( | |
FormulaParser, | |
Operator, | |
OperatorResolver, | |
Structured, | |
Term, | |
Token, | |
) | |
from .utils import ( | |
exc_for_token, | |
insert_tokens_after, | |
merge_operator_tokens, | |
replace_tokens, | |
) | |
@dataclass | |
class DefaultFormulaParser(FormulaParser): | |
""" | |
The default parser for `Formula`s. | |
It extends `FormulaParser` by defaulting the operator resolver to | |
`DefaultOperatorResolver`, and by adding the option to enable the inclusion | |
of an intercept. | |
Attributes: | |
operator_resolver: The operator resolver to use when parsing the formula | |
string and generating the abstract syntax tree. If not specified, | |
it will default to `DefaultOperatorResolver`. | |
include_intercept: Whether to include an intercept by default | |
(formulas can still omit this intercept in the usual manner: | |
adding a '-1' or '+0' term). | |
""" | |
ZERO_PATTERN = re.compile(r"(?:^|(?<=\W))0(?=\W|$)") | |
# Attributes | |
operator_resolver: OperatorResolver = field( | |
default_factory=lambda: DefaultOperatorResolver() # pylint: disable=unnecessary-lambda | |
) | |
include_intercept: bool = True | |
def get_tokens(self, formula: str) -> Iterable[Token]: | |
""" | |
Return an iterable of `Token` instances for the nominated `formula` | |
string. | |
Args: | |
formula: The formula string to be tokenized. | |
""" | |
# Transform formula to add intercepts and replace 0 with -1. We do this | |
# as token transformations to reduce the complexity of the code, and | |
# also to avoid the ambiguity in the AST around intentionally unary vs. | |
# incidentally unary operations (e.g. "+0" vs. "x + (+0)"). This cannot | |
# easily be done as string operations because of quotations and escapes | |
# which are best left to the tokenizer. | |
token_one = Token("1", kind=Token.Kind.VALUE) | |
token_plus = Token("+", kind=Token.Kind.OPERATOR) | |
token_minus = Token("-", kind=Token.Kind.OPERATOR) | |
tokens = tokenize(formula) | |
# Substitute "0" with "-1" | |
tokens = replace_tokens( | |
tokens, "0", [token_minus, token_one], kind=Token.Kind.VALUE | |
) | |
# Insert intercepts | |
if self.include_intercept: | |
tokens = list( | |
insert_tokens_after( | |
tokens, | |
"~", | |
[token_one], | |
kind=Token.Kind.OPERATOR, | |
join_operator="+", | |
) | |
) | |
rhs_index = ( | |
max( | |
(i for i, token in enumerate(tokens) if token.token.endswith("~")), | |
default=-1, | |
) | |
+ 1 | |
) | |
tokens = [ | |
*( | |
tokens[:rhs_index] | |
if rhs_index > 0 | |
else ([token_one, token_plus] if len(tokens) > 0 else [token_one]) | |
), | |
*insert_tokens_after( | |
tokens[rhs_index:], | |
r"\|", | |
[token_one], | |
kind=Token.Kind.OPERATOR, | |
join_operator="+", | |
), | |
] | |
# Collapse inserted "+" and "-" operators to prevent unary issues. | |
tokens = merge_operator_tokens(tokens, symbols={"+", "-"}) | |
return tokens | |
class DefaultOperatorResolver(OperatorResolver): | |
""" | |
The default operator resolver implementation. | |
This class implements the standard operators in a form consistent with | |
other implementations of Wilkinson formulas. It can be extended via | |
subclassing to support other kinds of operators, in which case `.operators` | |
and/or `.resolve` can be overridden. For more details about which operators | |
are implemented, review the code or the documentation website. | |
""" | |
@property | |
def operators(self): | |
def formula_part_expansion( | |
lhs: Set[Term], rhs: Set[Term] | |
) -> Tuple[Set[Term], Set[Term]]: | |
terms = (lhs, rhs) | |
out = [] | |
for termset in terms: | |
if isinstance(termset, tuple): | |
out.extend(termset) | |
else: | |
out.append(termset) | |
return tuple(out) | |
def nested_product_expansion( | |
parents: Set[Term], nested: Set[Term] | |
) -> Set[Term]: | |
common = functools.reduce(lambda x, y: x * y, parents) | |
return parents.union({common * term for term in nested}) | |
def power(arg: Set[Term], power: Set[Term]) -> Set[Term]: | |
power_term = next(iter(power)) | |
if ( | |
not len(power_term.factors) == 1 | |
or power_term.factors[0].token.kind is not Token.Kind.VALUE | |
or not isinstance(ast.literal_eval(power_term.factors[0].expr), int) | |
): | |
raise exc_for_token( | |
power_term.factors[0].token, | |
"The right-hand argument of `**` must be a positive integer.", | |
) | |
return { | |
functools.reduce(lambda x, y: x * y, term) | |
for term in itertools.product(*[arg] * int(power_term.factors[0].expr)) | |
} | |
return [ | |
Operator( | |
"~", | |
arity=2, | |
precedence=-100, | |
associativity=None, | |
to_terms=lambda lhs, rhs: Structured(lhs=lhs, rhs=rhs), | |
accepts_context=lambda context: len(context) == 0, | |
structural=True, | |
), | |
Operator( | |
"~", | |
arity=1, | |
precedence=-100, | |
associativity=None, | |
fixity="prefix", | |
to_terms=lambda terms: terms, | |
accepts_context=lambda context: len(context) == 0, | |
structural=True, | |
), | |
Operator( | |
"|", | |
arity=2, | |
precedence=-50, | |
associativity=None, | |
to_terms=formula_part_expansion, | |
accepts_context=lambda context: all( | |
isinstance(c, Operator) and c.symbol in "~|" for c in context | |
), | |
structural=True, | |
), | |
Operator( | |
"+", | |
arity=2, | |
precedence=100, | |
associativity="left", | |
to_terms=lambda lhs, rhs: lhs.union(rhs), | |
), | |
Operator( | |
"-", | |
arity=2, | |
precedence=100, | |
associativity="left", | |
to_terms=lambda left, right: left.difference(right), | |
), | |
Operator( | |
"+", | |
arity=1, | |
precedence=100, | |
associativity="right", | |
fixity="prefix", | |
to_terms=lambda terms: terms, | |
), | |
Operator( | |
"-", | |
arity=1, | |
precedence=100, | |
associativity="right", | |
fixity="prefix", | |
to_terms=lambda terms: set(), | |
), | |
Operator( | |
"*", | |
arity=2, | |
precedence=200, | |
associativity="left", | |
to_terms=lambda *term_sets: ( | |
{ | |
functools.reduce(lambda x, y: x * y, term) | |
for term in itertools.product(*term_sets) | |
}.union(itertools.chain(*term_sets)) | |
), | |
), | |
Operator( | |
"/", | |
arity=2, | |
precedence=200, | |
associativity="left", | |
to_terms=nested_product_expansion, | |
), | |
Operator( | |
":", | |
arity=2, | |
precedence=300, | |
associativity="left", | |
to_terms=lambda *term_sets: { | |
functools.reduce(lambda x, y: x * y, term) | |
for term in itertools.product(*term_sets) | |
}, | |
), | |
Operator( | |
"**", arity=2, precedence=500, associativity="right", to_terms=power | |
), | |
] | |
def resolve( | |
self, token: Token, max_prefix_arity: int, context: List[Union[Token, Operator]] | |
) -> Iterable[Operator]: | |
if token.token in self.operator_table: | |
return super().resolve(token, max_prefix_arity, context) | |
symbol = token.token | |
# Keep track the number of "+" and "-" characters; if an odd number "-" | |
# than "-", else "+" | |
while True: | |
m = re.search(r"[+\-]{2,}", symbol) | |
if not m: | |
break | |
symbol = ( | |
symbol[: m.start(0)] + "-" | |
if len(m.group(0).replace("+", "")) % 2 | |
else "+" + symbol[m.end(0) :] | |
) | |
if symbol in self.operator_table: | |
return [self._resolve(token, symbol, max_prefix_arity, context)] | |
return [ | |
self._resolve(token, sym, max_prefix_arity if i == 0 else 0, context) | |
for i, sym in enumerate(symbol) | |
] | |
import re | |
from typing import Iterable, Optional, Sequence, Set, Tuple, Type, Union | |
from formulaic.errors import FormulaSyntaxError | |
from .types.ast_node import ASTNode | |
from .types.token import Token | |
# Exception handling | |
def exc_for_token( | |
token: Union[Token, ASTNode], | |
message: str, | |
errcls: Type[Exception] = FormulaSyntaxError, | |
) -> Exception: | |
""" | |
Return an exception ready to be raised with a helpful token/source context. | |
Args: | |
token: The `Token` or `ASTNode` instance about which an exception should | |
be raised. | |
message: The message to be included in the exception. | |
errcls: The type of the exception to be returned. | |
""" | |
token = __get_token_for_ast(token) | |
token_context = token.get_source_context(colorize=True) | |
if token_context: | |
return errcls(f"{message}\n\n{token_context}") | |
return errcls(message) | |
def exc_for_missing_operator( | |
lhs: Union[Token, ASTNode], | |
rhs: Union[Token, ASTNode], | |
errcls: Type[Exception] = FormulaSyntaxError, | |
) -> Exception: | |
""" | |
Return an exception ready to be raised about a missing operator token | |
between the `lhs` and `rhs` tokens/ast-nodes. | |
Args: | |
lhs: The `Token` or `ASTNode` instance to the left of where an operator | |
should be placed. | |
rhs: The `Token` or `ASTNode` instance to the right of where an operator | |
should be placed. | |
errcls: The type of the exception to be returned. | |
""" | |
lhs_token, rhs_token, error_token = __get_tokens_for_gap(lhs, rhs) | |
return exc_for_token( | |
error_token, | |
f"Missing operator between `{lhs_token.token}` and `{rhs_token.token}`.", | |
errcls=errcls, | |
) | |
def __get_token_for_ast(ast: Union[Token, ASTNode]) -> Token: # pragma: no cover | |
""" | |
Ensure that incoming `ast` is a `Token`, or else generate one for debugging | |
purposes (note that this token will not be valid `Token` for use other than | |
in reporting errors). | |
""" | |
if isinstance(ast, Token): | |
return ast | |
lhs_token = ast | |
while isinstance(lhs_token, ASTNode): | |
lhs_token = lhs_token.args[0] | |
rhs_token = ast | |
while isinstance(rhs_token, ASTNode): | |
rhs_token = rhs_token.args[-1] | |
return Token( | |
token=lhs_token.source[lhs_token.source_start : rhs_token.source_end + 1] | |
if lhs_token.source | |
else "", | |
source=lhs_token.source, | |
source_start=lhs_token.source_start, | |
source_end=rhs_token.source_end, | |
) | |
def __get_tokens_for_gap( | |
lhs: Union[Token, ASTNode], rhs: Union[Token, ASTNode] | |
) -> Tuple[Token, Token, Token]: | |
""" | |
Ensure that incoming `lhs` and `rhs` objects are `Token`s, or else generate | |
some for debugging purposes (note that these tokens will not be valid | |
`Token`s for use other than in reporting errors). Three tokens will be | |
returned: the left-hand side token, the right-hand-side token, and the | |
"middle" token where a new operator/token should be inserted (may not | |
be empty depending on context). | |
""" | |
lhs_token = lhs | |
while isinstance(lhs_token, ASTNode): | |
lhs_token = lhs_token.args[-1] | |
rhs_token = rhs or lhs | |
while isinstance(rhs_token, ASTNode): | |
rhs_token = rhs_token.args[0] | |
return ( | |
lhs_token, | |
rhs_token, | |
Token( | |
lhs_token.source[lhs_token.source_start : rhs_token.source_end + 1] | |
if lhs_token.source | |
else "", | |
source=lhs_token.source, | |
source_start=lhs_token.source_start, | |
source_end=rhs_token.source_end, | |
), | |
) | |
# Token sequence mutations | |
def replace_tokens( | |
tokens: Iterable[Token], | |
token_to_replace: str, | |
replacement: Union[Token, Sequence[Token]], | |
*, | |
kind: Optional[Token.Kind] = None, | |
) -> Iterable[Token]: | |
""" | |
Replace any token in the `tokens` sequence with one or more replacement | |
tokens. | |
Args: | |
tokens: The sequence of tokens within which tokens should be replaced. | |
token_to_replace: The string representation of the token to replace. | |
replacement: The replacement token(s) to insert into the `tokens` | |
sequence. | |
kind: The type of tokens to be replaced. If not specified, all | |
tokens which match the provided `token_to_match` string will be | |
replaced. | |
""" | |
for token in tokens: | |
if kind and token.kind is not kind or token.token != token_to_replace: | |
yield token | |
else: | |
if isinstance(replacement, Token): | |
yield replacement | |
else: | |
yield from replacement | |
def insert_tokens_after( | |
tokens: Iterable[Token], | |
pattern: Union[str, re.Pattern], | |
tokens_to_add: Sequence[Token], | |
*, | |
kind: Optional[Token.Kind] = None, | |
join_operator: Optional[str] = None, | |
) -> Iterable[Token]: | |
""" | |
Insert additional tokens into a sequence of tokens after (within token) | |
pattern matches. | |
Note: this insertion can happen in the *middle* of existing tokens, which is | |
especially useful when inserting tokens around multiple operators (which are | |
often merged together into a single token). If you want to avoid this, make | |
sure your regex `pattern` includes start and end matchers; e.g. | |
`^<pattern>$`. | |
Args: | |
tokens: The sequence of tokens within which tokens should be replaced. | |
pattern: A (potentially compiled) regex expression indicating where | |
tokens should be inserted. | |
tokens_to_add: A sequence of tokens to be inserted wherever `pattern` | |
matches. | |
kind: The type of tokens to be considered for insertion. If not | |
specified, any matching token (part) will result in insertions. | |
join_operator: If the insertion of tokens would result the joining of | |
the added tokens with existing tokens, the value set here will be | |
used to create a joining operator token. If not provided, not | |
additional operators are added. | |
""" | |
if not isinstance(pattern, re.Pattern): | |
pattern = re.compile(pattern) | |
if join_operator: | |
tokens = list(tokens) | |
for i, token in enumerate(tokens): | |
if ( | |
kind is not None | |
and token.kind is not kind | |
or not pattern.search(token.token) | |
): | |
yield token | |
continue | |
split_tokens = list(token.split(pattern, after=True)) | |
for j, split_token in enumerate(split_tokens): | |
yield split_token | |
m = pattern.search(split_token.token) | |
if m and m.span()[1] == len(split_token.token): | |
yield from tokens_to_add | |
if join_operator: | |
next_token = None | |
if j < len(split_tokens) - 1: | |
next_token = split_tokens[j + 1] | |
elif i < len(tokens) - 1: | |
next_token = tokens[i + 1] | |
if ( | |
next_token is not None | |
and next_token.kind is not Token.Kind.OPERATOR | |
): | |
yield Token(join_operator, kind=Token.Kind.OPERATOR) | |
def merge_operator_tokens( | |
tokens: Iterable[Token], symbols: Optional[Set[str]] = None | |
) -> Iterable[Token]: | |
""" | |
Merge operator tokens within a sequence of tokens. | |
This is useful if you have added operator tokens after tokenization, in | |
order to allow operator resolution of (e.g.) adjacent `+` and `-` operators. | |
Args: | |
tokens: The sequence of tokens within which tokens should be replaced. | |
symbols: If specified, only adjacent operator symbols appearing within | |
this set will be merged. | |
""" | |
pooled_token = None | |
for token in tokens: | |
if ( | |
token.kind is not Token.Kind.OPERATOR | |
or symbols | |
and token.token[0] not in symbols | |
): | |
if pooled_token: | |
yield pooled_token | |
pooled_token = None | |
yield token | |
continue | |
# `token` is an operator that can be collapsed on the left | |
if pooled_token: | |
pooled_token = token.copy_with_attrs(token=pooled_token.token + token.token) | |
if symbols and not pooled_token.token[-1] in symbols: | |
yield pooled_token | |
pooled_token = None | |
continue | |
pooled_token = token | |
if pooled_token: | |
yield pooled_token | |
from typing import Any, Mapping, Union | |
from .formula import FormulaSpec | |
from .model_matrix import ModelMatrices, ModelMatrix | |
from .model_spec import ModelSpec, ModelSpecs | |
from .utils.context import capture_context | |
def model_matrix( | |
spec: Union[FormulaSpec, ModelMatrix, ModelMatrices, ModelSpec, ModelSpecs], | |
data: Any, | |
*, | |
context: Union[int, Mapping[str, Any]] = 0, | |
**spec_overrides, | |
) -> Union[ModelMatrix, ModelMatrices]: | |
""" | |
Generate a model matrix directly from a formula or model spec. | |
This method is syntactic sugar for: | |
``` | |
Formula(spec).get_model_matrix(data, context=LayeredMapping(locals(), globals()), **kwargs) | |
``` | |
or | |
``` | |
model_spec.get_model_matrix(data, context=LayeredMapping(locals(), globals()), **kwargs) | |
``` | |
Args: | |
spec: The spec that describes the structure of the model matrix to be | |
generated. This can be either a `ModelMatrix` or `ModelSpec` | |
instance (in which case the structure and state associated with the | |
`ModelSpec` instance is re-used), or a formula specification or | |
instance (in which case the structure is built from scratch). | |
data: The raw data to be transformed into a model matrix. This can be | |
any of the supported data types, but is typically a | |
`pandas.DataFrame` instance. | |
context: The context from which variables (and custom transforms/etc) | |
should be inherited. When specified as an integer, it is interpreted | |
as a frame offset from the caller's frame (i.e. 0, the default, | |
means that all variables in the caller's scope should be made | |
accessible when interpreting and evaluating formulae). Otherwise, a | |
mapping from variable name to value is expected. | |
spec_overrides: Any `ModelSpec` attributes to set/override. See | |
`ModelSpec` for more details. | |
Returns: | |
The data transformed in to the model matrix with the requested | |
nominated structure. | |
""" | |
if isinstance(context, int): | |
context = capture_context(context + 1) | |
return ModelSpec.from_spec(spec, **spec_overrides).get_model_matrix( | |
data, context=context | |
) | |
from __future__ import annotations | |
from abc import abstractmethod | |
import inspect | |
import warnings | |
from numbers import Number | |
from typing import Any, Union, Dict, Iterable, List, Optional, TYPE_CHECKING | |
import numpy | |
import pandas | |
import scipy.sparse as spsparse | |
import scipy.sparse.linalg | |
from interface_meta import InterfaceMeta | |
from formulaic.errors import DataMismatchWarning | |
from formulaic.materializers.types import FactorValues | |
from formulaic.utils.sparse import categorical_encode_series_to_sparse_csc_matrix | |
from formulaic.utils.stateful_transforms import stateful_transform | |
from .poly import poly | |
if TYPE_CHECKING: | |
from formulaic.model_spec import ModelSpec # pragma: no cover | |
def C( | |
data: Any, | |
contrasts: Optional[ | |
Union[Contrasts, Dict[str, Iterable[Number]], numpy.ndarray] | |
] = None, | |
*, | |
levels: Optional[Iterable[str]] = None, | |
): | |
""" | |
Mark data as being categorical, and optionally specify the contrasts to be | |
used during encoding. | |
Args: | |
data: The data to be marked as categorical. | |
contrasts: The specification of the contrasts that are to be computed. | |
Should be a `Contrasts` instance, a dictionary mapping a key for | |
the contrast with a vector of weights for the categories, or a | |
numpy array with columns representing the contrasts, and rows | |
representing the weights over the categories in the data. If not | |
specified, a `Treatment` encoding is assumed. | |
levels: The categorical levels associated with `data`. If not present, | |
levels are inferred from `data`. Note that extra levels in `data` | |
will be treated as null data. | |
""" | |
def encoder( | |
values: Any, | |
reduced_rank: bool, | |
drop_rows: List[int], | |
encoder_state: Dict[str, Any], | |
model_spec: ModelSpec, | |
): | |
values = pandas.Series(values) | |
values = values.drop(index=values.index[drop_rows]) | |
return encode_contrasts( | |
values, | |
contrasts=contrasts, | |
levels=levels, | |
reduced_rank=reduced_rank, | |
_state=encoder_state, | |
_spec=model_spec, | |
) | |
return FactorValues( | |
data, | |
kind="categorical", | |
spans_intercept=True, | |
encoder=encoder, | |
) | |
@stateful_transform | |
def encode_contrasts( | |
data, | |
contrasts: Union[ | |
Contrasts, Dict[str, Iterable[Number]], numpy.ndarray, None | |
] = None, | |
*, | |
levels: Optional[Iterable[str]] = None, | |
reduced_rank: bool = False, | |
output: Optional[str] = None, | |
_state=None, | |
_spec=None, | |
) -> FactorValues[Union[pandas.DataFrame, spsparse.spmatrix]]: | |
""" | |
Encode a categorical dataset into one or more "contrasts". | |
Args: | |
data: The categorical data array/series to be encoded. | |
contrasts: The specification of the contrasts that are to be computed. | |
Should be a `Contrasts` instance, a dictionary mapping a key for | |
the contrast with a vector of weights for the categories, or a | |
numpy array with columns representing the contrasts, and rows | |
representing the weights over the categories in the data. If not | |
specified, a `Treatment` encoding is assumed. | |
levels: The complete set of levels (categories) posited to be present in | |
the data. This can also be used to reorder the levels as needed. | |
reduced_rank: Whether to reduce the rank of output encoded columns in | |
order to avoid spanning the intercept. | |
output: The type of data to output. Must be one of "pandas", "numpy", or | |
"sparse". | |
""" | |
# Prepare arguments | |
output = output or _spec.output or "pandas" | |
levels = levels or _state.get( | |
"categories" | |
) # TODO: Is this too early to provide useful feedback to users? | |
if contrasts is None: | |
contrasts = TreatmentContrasts() | |
elif inspect.isclass(contrasts) and issubclass(contrasts, Contrasts): | |
contrasts = contrasts() | |
if not isinstance(contrasts, Contrasts): | |
contrasts = CustomContrasts(contrasts) | |
if levels is not None: | |
extra_categories = set(pandas.unique(data)).difference(levels) | |
if extra_categories: | |
warnings.warn( | |
"Data has categories outside of the nominated levels (or that were " | |
f"not seen in original dataset): {extra_categories}. They are being " | |
" cast to nan, which will likely skew the results of your analyses.", | |
DataMismatchWarning, | |
) | |
data = pandas.Series(pandas.Categorical(data, categories=levels)) | |
else: | |
data = pandas.Series(data).astype("category") | |
# Perform dummy encoding | |
if output in ("pandas", "numpy"): | |
categories = list(data.cat.categories) | |
encoded = pandas.get_dummies(data) | |
elif output == "sparse": | |
categories, encoded = categorical_encode_series_to_sparse_csc_matrix( | |
data, | |
) | |
else: | |
raise ValueError(f"Unknown output type `{repr(output)}`.") | |
# Update state | |
_state["categories"] = categories | |
# Apply and return contrasts | |
return contrasts.apply( | |
encoded, levels=categories, reduced_rank=reduced_rank, output=output | |
) | |
class Contrasts(metaclass=InterfaceMeta): | |
""" | |
The base class for all contrast implementations. | |
""" | |
INTERFACE_RAISE_ON_VIOLATION = True | |
FACTOR_FORMAT = "{name}[{field}]" | |
def apply( | |
self, | |
dummies, | |
levels, | |
reduced_rank=True, | |
output: Optional[str] = None, | |
): | |
""" | |
Apply the contrasts defined by this `Contrasts` instance to `dummies` | |
(the dummy encoding of the values of interest). | |
Args: | |
dummies: Dummy encoded representation of the values. | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether to output a reduced rank matrix. When this is | |
`False`, the dummy encoding is usually passed through | |
unmodified. | |
output: The type of datastructure to output. Should be one of: | |
"pandas", "numpy", "sparse", or `None`. If `None` is provided, | |
the output type will be inferred from the input data type. | |
""" | |
if output is None: | |
if isinstance(dummies, pandas.DataFrame): | |
output = "pandas" | |
elif isinstance(dummies, numpy.ndarray): | |
output = "numpy" | |
elif isinstance(dummies, spsparse.spmatrix): | |
output = "sparse" | |
else: # pragma: no cover | |
raise ValueError( | |
f"Cannot impute output type for dummies of type `{type(dummies)}`." | |
) | |
elif output not in ("pandas", "numpy", "sparse"): # pragma: no cover | |
raise ValueError( | |
"Output type for contrasts must be one of: 'pandas', 'numpy' or 'sparse'." | |
) | |
sparse = output == "sparse" | |
encoded = self._apply( | |
dummies, levels=levels, reduced_rank=reduced_rank, sparse=sparse | |
) | |
coding_column_names = self.get_coding_column_names( | |
levels, reduced_rank=reduced_rank | |
) | |
if output == "pandas": | |
encoded = pandas.DataFrame( | |
encoded, | |
columns=coding_column_names, | |
) | |
elif output == "numpy": | |
encoded = numpy.array(encoded) | |
return FactorValues( | |
encoded, | |
kind="categorical", | |
column_names=coding_column_names, | |
spans_intercept=self.get_spans_intercept(levels, reduced_rank=reduced_rank), | |
drop_field=self.get_drop_field(levels, reduced_rank=reduced_rank), | |
format=self.get_factor_format(levels, reduced_rank=reduced_rank), | |
encoded=True, | |
) | |
def _apply(self, dummies, levels, reduced_rank=True, sparse=False): | |
coding_matrix = self.get_coding_matrix(levels, reduced_rank, sparse=sparse) | |
return (dummies if sparse else dummies.values) @ coding_matrix | |
# Coding matrix methods | |
def get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
""" | |
Generate the coding matrix; i.e. the matrix with column vectors | |
representing the encoding to use for the corresponding level. | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether to output a reduced rank matrix. When this is | |
`False`, the dummy encoding is usually passed through | |
unmodified. | |
sparse: Whether to output sparse results. | |
""" | |
coding_matrix = self._get_coding_matrix( | |
levels, reduced_rank=reduced_rank, sparse=sparse | |
) | |
if sparse: | |
return coding_matrix | |
return pandas.DataFrame( | |
coding_matrix, | |
columns=self.get_coding_column_names(levels, reduced_rank=reduced_rank), | |
index=levels, | |
) | |
@abstractmethod | |
def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
""" | |
Subclasses must override this method to implement the generation of the | |
coding matrix. | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether to output the reduced rank coding matrix. | |
sparse: Whether to output sparse results. | |
""" | |
@abstractmethod | |
def get_coding_column_names(self, levels, reduced_rank=True): | |
""" | |
Generate the names for the columns of the coding matrix (the encoded | |
features to be added to the model matrix). | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether to output the coefficients for reduced rank | |
encodings. | |
""" | |
# Coefficient matrix methods | |
def get_coefficient_matrix(self, levels, reduced_rank=True, sparse=False): | |
""" | |
Generate the coefficient matrix; i.e. the matrix with rows representing | |
the contrasts effectively computed during a regression, with columns | |
indicating the weights given to the origin categories. This is primarily | |
used for debugging/introspection. | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether to output the coefficients for reduced rank | |
encodings. | |
sparse: Whether to output sparse results. | |
""" | |
coefficient_matrix = self._get_coefficient_matrix( | |
levels, reduced_rank=reduced_rank, sparse=sparse | |
) | |
if sparse: | |
return coefficient_matrix | |
return pandas.DataFrame( | |
coefficient_matrix, | |
columns=levels, | |
index=self.get_coefficient_row_names(levels, reduced_rank=reduced_rank), | |
) | |
def _get_coefficient_matrix(self, levels, reduced_rank=True, sparse=False): | |
coding_matrix = self.get_coding_matrix( | |
levels, reduced_rank=reduced_rank, sparse=sparse | |
) | |
if reduced_rank: | |
coding_matrix = (spsparse if sparse else numpy).hstack( | |
[ | |
numpy.ones((len(levels), 1)), | |
coding_matrix, | |
] | |
) | |
if sparse: | |
return scipy.sparse.linalg.inv(coding_matrix.tocsc()) | |
return numpy.linalg.inv(coding_matrix) | |
@abstractmethod | |
def get_coefficient_row_names(self, levels, reduced_rank=True): | |
""" | |
Generate the names for the rows of the coefficient matrix (the | |
interpretation of the contrasts generated by the coding matrix). | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether to output the coefficients for reduced rank | |
encodings. | |
""" | |
# Additional metadata | |
def get_spans_intercept(self, levels, reduced_rank=True) -> bool: | |
""" | |
Determine whether the encoded contrasts span the intercept. | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether the contrast encoding used had reduced rank. | |
""" | |
return not reduced_rank | |
def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]: | |
""" | |
Determine which column to drop to be full rank after this encoding. | |
If this contrast encoding is already reduced in rank, then this method | |
should return `None`. | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether the contrast encoding used had reduced rank. | |
""" | |
if reduced_rank: | |
return None | |
return self.get_coding_column_names(levels, reduced_rank=reduced_rank)[0] | |
def get_factor_format(self, levels, reduced_rank=True): | |
""" | |
The format to use when assigning feature names to each encoded feature. | |
Formats can use two named substitutions: `name` and `field`; for | |
example: "{name}[{field}]". | |
Args: | |
levels: The names of the levels/categories in the data. | |
reduced_rank: Whether the contrast encoding used had reduced rank. | |
""" | |
return self.FACTOR_FORMAT | |
class TreatmentContrasts(Contrasts): | |
""" | |
Treatment (aka. dummy) coding. | |
This contrast leads to comparisons of the mean of the dependent variable for | |
each level with some reference level. If not specified, the reference level | |
is taken to be the first level. | |
""" | |
FACTOR_FORMAT = "{name}[T.{field}]" | |
MISSING = object() | |
def __init__(self, base=MISSING): | |
self.base = base | |
@Contrasts.override | |
def _apply(self, dummies, levels, reduced_rank=True, sparse=False): | |
if reduced_rank: | |
drop_index = self._find_base_index(levels) | |
mask = numpy.ones(len(levels), dtype=bool) | |
mask[drop_index] = False | |
return ( | |
dummies | |
if sparse or isinstance(dummies, numpy.ndarray) | |
else dummies.iloc | |
)[:, mask] | |
return dummies | |
def _find_base_index(self, levels): | |
if self.base is self.MISSING: | |
return 0 | |
try: | |
return levels.index(self.base) | |
except ValueError as e: | |
raise ValueError( | |
f"Value `{repr(self.base)}` for `TreatmentContrasts.base` is not among the provided levels." | |
) from e | |
@Contrasts.override | |
def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
n = len(levels) | |
if sparse: | |
matrix = spsparse.eye(n).tocsc() | |
else: | |
matrix = numpy.eye(n) | |
if reduced_rank: | |
drop_level = self._find_base_index(levels) | |
matrix = matrix[:, [i for i in range(matrix.shape[1]) if i != drop_level]] | |
return matrix | |
@Contrasts.override | |
def get_coding_column_names(self, levels, reduced_rank=True): | |
base_index = self._find_base_index(levels) | |
if reduced_rank: | |
return [level for i, level in enumerate(levels) if i != base_index] | |
return levels | |
@Contrasts.override | |
def get_coefficient_row_names(self, levels, reduced_rank=True): | |
base = levels[self._find_base_index(levels)] | |
if reduced_rank: | |
return [base, *(f"{level}-{base}" for level in levels if level != base)] | |
return levels | |
@Contrasts.override | |
def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]: | |
if reduced_rank: | |
return None | |
return self.base if self.base is not self.MISSING else levels[0] | |
class SASContrasts(TreatmentContrasts): | |
""" | |
SAS (treatment) contrast coding. | |
This contrasts generated by this class are the same as | |
`TreatmentContrasts`, but with the reference level defaulting to the last | |
level (the default in SAS). | |
""" | |
@TreatmentContrasts.override | |
def _find_base_index(self, levels): | |
if self.base is self.MISSING: | |
return len(levels) - 1 | |
try: | |
return levels.index(self.base) | |
except ValueError as e: | |
raise ValueError( | |
f"Value `{repr(self.base)}` for `SASContrasts.base` is not among the provided levels." | |
) from e | |
@TreatmentContrasts.override | |
def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]: | |
if reduced_rank: | |
return None | |
return self.base if self.base is not self.MISSING else levels[-1] | |
class SumContrasts(Contrasts): | |
""" | |
Sum (or Deviation) coding. | |
These contrasts compare the mean of the dependent variable for each level | |
(except the last, which is redundant) to the global average of all levels. | |
""" | |
FACTOR_FORMAT = "{name}[S.{field}]" | |
@Contrasts.override | |
def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
n = len(levels) | |
if not reduced_rank: | |
return spsparse.eye(n).tocsc() if sparse else numpy.eye(n) | |
contr = spsparse.eye(n, n - 1).tolil() if sparse else numpy.eye(n, n - 1) | |
contr[-1, :] = -1 | |
return contr.tocsc() if sparse else contr | |
@Contrasts.override | |
def get_coding_column_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return levels[:-1] | |
return levels | |
@Contrasts.override | |
def get_coefficient_row_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return ["avg", *(f"{level} - avg" for level in levels[:-1])] | |
return levels | |
class HelmertContrasts(Contrasts): | |
""" | |
Helmert coding. | |
These contrasts compare the mean of the dependent variable for each | |
successive level to the average all previous levels. The default | |
attribute values are chosen to match the R implementation, which | |
corresponds to a reversed and unscaled Helmert coding. | |
Attributes: | |
reverse: Whether to iterate over successive levels in reverse order. | |
scale: Whether to scale the encoding to simplify interpretation of | |
coefficients (results in a floating point model matrix instead of an | |
integer one). | |
""" | |
FACTOR_FORMAT = "{name}[H.{field}]" | |
def __init__(self, *, reverse: bool = True, scale: bool = False): | |
self.reverse = reverse | |
self.scale = scale | |
@Contrasts.override | |
def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
n = len(levels) | |
if not reduced_rank: | |
return spsparse.eye(n).tocsc() if sparse else numpy.eye(n) | |
contr = spsparse.lil_matrix((n, n - 1)) if sparse else numpy.zeros((n, n - 1)) | |
for i in range(len(levels) - 1): | |
if self.reverse: | |
contr[i + 1, i] = i + 1 | |
else: | |
contr[i, i] = n - i - 1 | |
contr[ | |
numpy.triu_indices(n - 1) if self.reverse else numpy.tril_indices(n, k=-1) | |
] = -1 | |
if self.scale: | |
for i in range(n - 1): | |
contr[:, i] /= i + 2 if self.reverse else n - i | |
return contr | |
@Contrasts.override | |
def get_coding_column_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return levels[1:] if self.reverse else levels[:-1] | |
return levels | |
@Contrasts.override | |
def get_coefficient_row_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return [ | |
"avg", | |
*( | |
f"{level} - rolling_avg" | |
for level in (levels[1:] if self.reverse else levels[:-1]) | |
), | |
] | |
return levels | |
class DiffContrasts(Contrasts): | |
""" | |
Difference coding. | |
These contrasts compare the mean of the dependent variable for each level | |
with that of the previous level. The default attribute values are chosen to | |
match the R implemention, and correspond to a reverse (or backward) | |
difference coding. | |
Attributes: | |
backward: Whether to reverse the sign of the difference (e.g. Level 2 - | |
Level 1 cf. Level 1 - Level 2). | |
""" | |
FACTOR_FORMAT = "{name}[D.{field}]" | |
def __init__(self, backward: bool = True): | |
self.backward = backward | |
@Contrasts.override | |
def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
n = len(levels) | |
if not reduced_rank: | |
return spsparse.eye(n).tocsc() if sparse else numpy.eye(n) | |
contr = numpy.repeat([numpy.arange(1, n)], n, axis=0) / n | |
contr[numpy.triu_indices(n, m=n - 1)] -= 1 | |
if not self.backward: | |
contr *= -1 | |
if sparse: | |
return spsparse.csc_matrix(contr) | |
return contr | |
@Contrasts.override | |
def get_coding_column_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return levels[1:] if self.backward else levels[:-1] | |
return levels | |
@Contrasts.override | |
def get_coefficient_row_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return [ | |
"avg", | |
*( | |
f"{level} - {ref}" | |
for level, ref in ( | |
zip(levels[1:], levels) | |
if self.backward | |
else zip(levels, levels[1:]) | |
) | |
), | |
] | |
return levels | |
class PolyContrasts(Contrasts): | |
""" | |
(Orthogonal) Polynomial coding. | |
These "contrasts" represent a categorical variable that is assumed to have | |
equal (or known) spacing/scores, and allow us to model non-linear polynomial | |
behaviour of the dependent variable with respect to the ordered levels. | |
Attributes: | |
scores: The "scores" of the categorical variable. If provided, it must | |
have the same cardinality as the categories being coded. | |
""" | |
FACTOR_FORMAT = "{name}{field}" | |
NAME_ALIASES = { | |
1: ".L", | |
2: ".Q", | |
3: ".C", | |
} | |
def __init__(self, scores=None): | |
self.scores = scores | |
@Contrasts.override | |
def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
n = len(levels) | |
if not reduced_rank: | |
return spsparse.eye(n).tocsc() if sparse else numpy.eye(n) | |
if self.scores and not len(self.scores) == n: | |
raise ValueError( | |
"`PolyContrasts.scores` must have the same cardinality as the categories." | |
) | |
scores = self.scores or numpy.arange(n) | |
coding_matrix = poly(scores, degree=n - 1) | |
if sparse: | |
return spsparse.csc_matrix(coding_matrix) | |
return coding_matrix | |
@Contrasts.override | |
def get_coding_column_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return [ | |
self.NAME_ALIASES[d] if d in self.NAME_ALIASES else f"^{d}" | |
for d in range(1, len(levels)) | |
] | |
return levels | |
@Contrasts.override | |
def get_coefficient_row_names(self, levels, reduced_rank=True): | |
if reduced_rank: | |
return ["avg", *self.get_coding_column_names(levels, reduced_rank=True)] | |
return levels | |
class CustomContrasts(Contrasts): | |
""" | |
Handle the custom contrast case when users pass in hand-coded contrast | |
matrices. | |
""" | |
def __init__(self, contrasts, names=None): | |
if isinstance(contrasts, dict): | |
if names is None: | |
names = list(contrasts) | |
contrasts = numpy.array([*contrasts.values()]).T | |
else: | |
contrasts = numpy.array(contrasts) | |
if names is not None and len(names) != contrasts.shape[1]: | |
raise ValueError( | |
"Names must be aligned with the columns of the contrast array." | |
) | |
self.contrasts = contrasts | |
self.contrast_names = names | |
@Contrasts.override | |
def _get_coding_matrix(self, levels, reduced_rank=True, sparse=False): | |
if sparse: | |
return spsparse.csc_matrix(self.contrasts) | |
return self.contrasts | |
@Contrasts.override | |
def get_coding_column_names(self, levels, reduced_rank=True): | |
if self.contrast_names: | |
return self.contrast_names | |
return list(range(1, self.contrasts.shape[1] + 1)) | |
@Contrasts.override | |
def get_coefficient_row_names(self, levels, reduced_rank=True): | |
return list(range(1, len(levels) + (0 if not reduced_rank else 1))) | |
@Contrasts.override | |
def get_spans_intercept(self, levels, reduced_rank=True) -> bool: | |
return False | |
@Contrasts.override | |
def get_drop_field(self, levels, reduced_rank=True) -> Union[int, str]: | |
return None | |
class ContrastsRegistry(type): | |
""" | |
The contrast registry, which is exposed in formulae as "contr". | |
""" | |
# Same as R | |
helmert = HelmertContrasts | |
poly = PolyContrasts | |
sum = SumContrasts | |
treatment = TreatmentContrasts | |
SAS = SASContrasts | |
# Extra | |
diff = DiffContrasts | |
custom = CustomContrasts | |
from collections import defaultdict | |
from enum import Enum | |
from typing import Iterable, Optional, Union | |
import numpy | |
import pandas | |
from formulaic.materializers.types import FactorValues | |
from formulaic.utils.stateful_transforms import stateful_transform | |
class SplineExtrapolation(Enum): | |
""" | |
Specification for how extrapolation should be performed during spline | |
computations. | |
""" | |
RAISE = "raise" | |
CLIP = "clip" | |
NA = "na" | |
ZERO = "zero" | |
EXTEND = "extend" | |
@stateful_transform | |
def basis_spline( | |
x: Union[pandas.Series, numpy.ndarray], | |
df: Optional[int] = None, | |
knots: Optional[Iterable[float]] = None, | |
degree: int = 3, | |
include_intercept: bool = False, | |
lower_bound: Optional[float] = None, | |
upper_bound: Optional[float] = None, | |
extrapolation: Union[str, SplineExtrapolation] = "raise", | |
_state: dict = None, | |
) -> FactorValues[dict]: | |
""" | |
Evaluates the B-Spline basis vectors for given inputs `x`. | |
This is especially useful in the context of allowing non-linear fits to data | |
in linear regression. Except for the addition of the `extrapolation` | |
parameter, this implementation shares its API with `patsy.splines.bs`, and | |
should behave identically to both `patsy.splines.bs` and R's `splines::bs` | |
where functionality overlaps. | |
Args: | |
x: The vector for which the B-Spline basis should be computed. | |
df: The number of degrees of freedom to use for this spline. If | |
specified, `knots` will be automatically generated such that they | |
are `df` - `degree` (minus one if `include_intercept` is True) | |
equally spaced quantiles. You cannot specify both `df` and `knots`. | |
knots: The internal breakpoints of the B-Spline. If not specified, they | |
default to the empty list (unless `df` is specified), in which case | |
the ordinary polynomial (Bezier) basis is generated. | |
degree: The degree of the B-Spline (the highest degree of terms in the | |
resulting polynomial). Must be a non-negative integer. | |
include_intercept: Whether to return a complete (full-rank) basis. Note | |
that if `ensure_full_rank=True` is passed to the materializer, then | |
the intercept will (depending on context) nevertheless be omitted. | |
lower_bound: The lower bound for the domain for the B-Spline basis. If | |
not specified this is determined from `x`. | |
upper_bound: The upper bound for the domain for the B-Spline basis. If | |
not specified this is determined from `x`. | |
extrapolation: Selects how extrapolation should be performed when values | |
in `x` extend beyond the lower and upper bounds. Valid values are: | |
- 'raise': Raises a `ValueError` if there are any values in `x` | |
outside the B-Spline domain. | |
- 'clip': Any values above/below the domain are set to the | |
upper/lower bounds. | |
- 'na': Any values outside of bounds are set to `numpy.nan`. | |
- 'zero': Any values outside of bounds are set to `0`. | |
- 'extend': Any values outside of bounds are computed by extending | |
the polynomials of the B-Spline (this is the same as the default | |
in R). | |
Returns: | |
A dictionary representing the encoded vectors ready for ingestion | |
by materializers (wrapped in a `FactorValues` instance providing | |
relevant metadata). | |
Notes: | |
The implementation employed here uses a slightly generalised version of | |
the ["Cox-de Boor" algorithm](https://en.wikipedia.org/wiki/B-spline#Definition), | |
extended by this author to allow for extrapolations (although this | |
author doubts this is terribly novel). We have not used the `splev` | |
methods from `scipy` since in benchmarks this implementation outperforms | |
them for our use-cases. | |
If you would like to learn more about B-Splines, the primer put together | |
by Jeffrey Racine is an excellent resource: | |
https://cran.r-project.org/web/packages/crs/vignettes/spline_primer.pdf | |
As a stateful transform, we only keep track of `knots`, `lower_bound` | |
and `upper_bound`, which are sufficient given that all other information | |
must be explicitly specified. | |
""" | |
# Prepare and check arguments | |
if df is not None and knots is not None: | |
raise ValueError("You cannot specify both `df` and `knots`.") | |
if "lower_bound" in _state: | |
lower_bound = _state["lower_bound"] | |
else: | |
lower_bound = _state["lower_bound"] = ( | |
numpy.min(x) if lower_bound is None else lower_bound | |
) | |
if "upper_bound" in _state: | |
upper_bound = _state["upper_bound"] | |
else: | |
upper_bound = _state["upper_bound"] = ( | |
numpy.max(x) if upper_bound is None else upper_bound | |
) | |
extrapolation = SplineExtrapolation(extrapolation) | |
# Prepare data | |
if extrapolation is SplineExtrapolation.RAISE and numpy.any( | |
(x < lower_bound) | (x > upper_bound) | |
): | |
raise ValueError( | |
"Some field values extend beyond upper and/or lower bounds, which can result in ill-conditioned bases. " | |
"Pass a value for `extrapolation` to control how extrapolation should be performed." | |
) | |
if extrapolation is SplineExtrapolation.CLIP: | |
x = numpy.clip(x, lower_bound, upper_bound) | |
if extrapolation is SplineExtrapolation.NA: | |
x = numpy.where((x >= lower_bound) & (x <= upper_bound), x, numpy.nan) | |
# Prepare knots | |
if "knots" not in _state: | |
knots = [] if knots is None else list(knots) | |
if df: | |
nknots = df - degree - (1 if include_intercept else 0) | |
if nknots < 0: | |
raise ValueError( | |
f"Invalid value for `df`. `df` must be greater than {degree + (1 if include_intercept else 0)} [`degree` (+ 1 if `include_intercept` is `True`)]." | |
) | |
knots = list( | |
numpy.quantile(x, numpy.linspace(0, 1, nknots + 2))[1:-1].ravel() | |
) | |
knots.insert(0, lower_bound) | |
knots.append(upper_bound) | |
knots = list(numpy.pad(knots, degree, mode="edge")) | |
_state["knots"] = knots | |
knots = _state["knots"] | |
# Compute basis splines | |
# The following code is equivalent to [B(i, j=degree) for in range(len(knots)-d-1)], with B(i, j) as defined below. | |
# B = lambda i, j: ((x >= knots[i]) & (x < knots[i+1])).astype(float) if j == 0 else alpha(i, j, x) * B(i, j-1, x) + (1 - alpha(i+1, j, x)) * B(i+1, j-1, x) | |
# We don't directly use this recurrence relation so that we can memoise the B(i, j). | |
cache = defaultdict(dict) | |
alpha = ( | |
lambda i, j: (x - knots[i]) / (knots[i + j] - knots[i]) | |
if knots[i + j] != knots[i] | |
else 0 | |
) | |
for i in range(len(knots) - 1): | |
if extrapolation is SplineExtrapolation.EXTEND: | |
cache[0][i] = ( | |
(x >= (knots[i] if i != degree else -numpy.inf)) | |
& ( | |
x | |
< (knots[i + 1] if i + 1 != len(knots) - degree - 1 else numpy.inf) | |
) | |
).astype(float) | |
else: | |
cache[0][i] = ( | |
(x >= knots[i]) | |
& ( | |
(x < knots[i + 1]) | |
if i + 1 != len(knots) - degree - 1 | |
else (x <= knots[i + 1]) # Properly handle boundary | |
) | |
).astype(float) | |
for d in range(1, degree + 1): | |
cache[d % 2].clear() | |
for i in range(len(knots) - d - 1): | |
cache[d % 2][i] = ( | |
alpha(i, d) * cache[(d - 1) % 2][i] | |
+ (1 - alpha(i + 1, d)) * cache[(d - 1) % 2][i + 1] | |
) | |
return FactorValues( | |
{ | |
i: cache[degree % 2][i] | |
for i in sorted(cache[degree % 2]) | |
if i > 0 or include_intercept | |
}, | |
kind="numerical", | |
spans_intercept=include_intercept, | |
drop_field=0, | |
format="{name}[{field}]", | |
encoded=False, | |
) | |
import numpy | |
from .basis_spline import basis_spline | |
from .identity import identity | |
from .contrasts import C, encode_contrasts, ContrastsRegistry | |
from .poly import poly | |
from .scale import center, scale | |
__all__ = [ | |
"basis_spline", | |
"identity", | |
"C", | |
"encode_contrasts", | |
"ContrastsRegistry", | |
"poly", | |
"center", | |
"scale", | |
"TRANSFORMS", | |
] | |
TRANSFORMS = { | |
# Common transforms | |
"np": numpy, | |
"log": numpy.log, | |
"log10": numpy.log10, | |
"log2": numpy.log2, | |
"exp": numpy.exp, | |
"exp10": lambda x: numpy.power(x, 10), | |
"exp2": numpy.exp2, | |
# Bespoke transforms | |
"bs": basis_spline, | |
"center": center, | |
"poly": poly, | |
"scale": scale, | |
"C": C, | |
"contr": ContrastsRegistry, | |
"I": identity, | |
} | |
from __future__ import annotations | |
from typing import TYPE_CHECKING | |
import numpy | |
from formulaic.materializers.types import FactorValues | |
from formulaic.utils.stateful_transforms import stateful_transform | |
try: | |
import numpy.typing | |
except ImportError as e: # pragma: no cover | |
if TYPE_CHECKING: | |
raise RuntimeError("Numpy >=1.20 is required for type-checking.") from e | |
@stateful_transform | |
def poly( | |
x: numpy.typing.ArrayLike, degree: int = 1, raw: bool = False, _state=None | |
) -> numpy.ndarray: | |
""" | |
Generate a basis for a polynomial vector-space representation of `x`. | |
The basis vectors returned by this transform can be used, for example, to | |
capture non-linear dependence on `x` in a linear regression. | |
Args: | |
x: The vector for which a polynomial vector space should be generated. | |
degree: The degree of the polynomial vector space. | |
raw: Whether to return "raw" basis vectors (e.g. `[x, x**2, x**3]`). If | |
`False`, an orthonormal set of basis vectors is returned instead | |
(see notes below for more information). | |
Returns: | |
A two-dimensional numpy array with `len(x)` rows, and `degree` columns. | |
The columns represent the basis vectors of the polynomial vector-space. | |
Notes: | |
This transform is an implementation of the "three-term recurrence | |
relation" for monic orthogonal polynomials. There are many good | |
introductions to these recurrence relations, including: | |
https://dec41.user.srcf.net/h/IB_L/numerical_analysis/2_3 | |
Another common approach is QR factorisation, where the columns of Q are | |
the orthogonal basis vectors. However, our implementation outperforms | |
numpy's QR decomposition, and does not require needless computation of | |
the R matrix. It should also be noted that orthogonal polynomial bases | |
are unique up to the choice of inner-product and scaling, and so all | |
methods will result in the same set of polynomials. | |
When used as a stateful transform, we retain the coefficients that | |
uniquely define the polynomials; and so new data will be evaluated | |
against the same polynomial bases as the original dataset. However, | |
the polynomial basis will almost certainly *not* be orthogonal for the | |
new data. This is because changing the incoming dataset is equivalent to | |
changing your choice of inner product. | |
Using orthogonal basis vectors (as compared to the "raw" vectors) allows | |
you to increase the degree of the polynomial vector space without | |
affecting the coefficients of lower-order components in a linear | |
regression. This stability is often attractive during exploratory data | |
analysis, but does not otherwise change the results of a linear | |
regression. | |
`nan` values in `x` will be ignored and progagated through to generated | |
polynomials. | |
The signature of this transform is intentionally chosen to be compatible | |
with R. | |
""" | |
if raw: | |
return numpy.stack([numpy.power(x, k) for k in range(1, degree + 1)], axis=1) | |
x = numpy.array(x) | |
# Check if we already have generated the alpha and beta coefficients. | |
# If not, we enter "training" mode. | |
training = False | |
alpha = _state.get("alpha") | |
norms2 = _state.get("norms2") | |
if alpha is None: | |
training = True | |
alpha = {} | |
norms2 = {} | |
# Build polynomials iteratively using the monic three-term recurrence relation | |
# Note that alpha and beta are fixed if not in "training" mode. | |
P = numpy.empty((x.shape[0], degree + 1)) | |
P[:, 0] = 1 | |
def get_alpha(k): | |
if training and k not in alpha: | |
alpha[k] = numpy.sum(x * P[:, k] ** 2) / numpy.sum(P[:, k] ** 2) | |
return alpha[k] | |
def get_norm(k): | |
if training and k not in norms2: | |
norms2[k] = numpy.sum(P[:, k] ** 2) | |
return norms2[k] | |
def get_beta(k): | |
return get_norm(k) / get_norm(k - 1) | |
for i in range(1, degree + 1): | |
P[:, i] = (x - get_alpha(i - 1)) * P[:, i - 1] | |
if i >= 2: | |
P[:, i] -= get_beta(i - 1) * P[:, i - 2] | |
# Renormalize so we provide an orthonormal basis. | |
P /= numpy.array([numpy.sqrt(get_norm(k)) for k in range(0, degree + 1)]) | |
if training: | |
_state["alpha"] = alpha | |
_state["norms2"] = norms2 | |
# Return basis dropping the first (constant) column | |
return FactorValues( | |
P[:, 1:], column_names=tuple(str(i) for i in range(1, degree + 1)) | |
) | |
def identity(data): | |
return data | |
import numpy | |
import scipy.sparse as spsparse | |
from formulaic.utils.stateful_transforms import stateful_transform | |
@stateful_transform | |
def scale(data, center=True, scale=True, ddof=1, _state=None): | |
data = numpy.array(data) | |
if "ddof" not in _state: | |
_state["ddof"] = ddof | |
else: | |
ddof = _state["ddof"] | |
# Handle centering | |
if "center" not in _state: | |
if isinstance(center, bool) and center: | |
_state["center"] = numpy.mean(data, axis=0) | |
elif not isinstance(center, bool): | |
_state["center"] = numpy.array(center) | |
else: | |
_state["center"] = None | |
if _state["center"] is not None: | |
data = data - _state["center"] | |
# Handle scaling | |
if "scale" not in _state: | |
if isinstance(scale, bool) and scale: | |
_state["scale"] = numpy.sqrt( | |
numpy.sum(data**2, axis=0) / (data.shape[0] - ddof) | |
) | |
elif not isinstance(scale, bool): | |
_state["scale"] = numpy.array(scale) | |
else: | |
_state["scale"] = None | |
if _state["scale"] is not None: | |
data = data / _state["scale"] | |
return data | |
@scale.register(spsparse.spmatrix) | |
def _(data, *args, **kwargs): | |
assert data.shape[1] == 1 | |
return scale(data.toarray()[:, 0], *args, **kwargs) | |
@stateful_transform | |
def center(data, _state=None): | |
return scale(data, scale=False, _state=_state) | |
# Top-level error and warning classes | |
class FormulaicError(Exception): | |
pass | |
class FormulaicWarning(Warning): | |
pass | |
# Formula parsing errors | |
class FormulaInvalidError(FormulaicError): | |
""" | |
Provided formula specification is not a valid format. | |
""" | |
class FormulaParsingError(FormulaicError): | |
""" | |
An error occured during the parsing of a formula specification. | |
""" | |
class FormulaSyntaxError(FormulaParsingError): | |
""" | |
Could not tokenize the nominated formula specification. | |
""" | |
# Formula materializer meta-errors | |
class FormulaMaterializerInvalidError(FormulaicError): | |
pass | |
class FormulaMaterializerNotFoundError(FormulaicError): | |
pass | |
# Data materialization errors and warnings | |
class FormulaMaterializationError(FormulaicError): | |
pass | |
class FactorEncodingError(FormulaMaterializationError): | |
pass | |
class FactorEvaluationError(FormulaMaterializationError): | |
pass | |
class DataMismatchWarning(FormulaicWarning): | |
pass | |
from enum import Enum | |
class NAAction(Enum): | |
DROP = "drop" | |
RAISE = "raise" | |
IGNORE = "ignore" | |
from .scoped_factor import ScopedFactor | |
class ScopedTerm: | |
__slots__ = ("factors", "scale") | |
def __init__(self, factors, scale=None): | |
self.factors = tuple(sorted(factors)) | |
self.scale = scale | |
def __hash__(self): | |
return hash(self.factors) | |
def __eq__(self, other): | |
if isinstance(other, ScopedTerm): | |
return self.factors == other.factors | |
return NotImplemented | |
def __repr__(self): | |
factor_repr = ( | |
":".join(f.__repr__() for f in sorted(self.factors)) | |
if self.factors | |
else "1" | |
) | |
if self.scale is not None and self.scale != 1: | |
return f"{self.scale}*{factor_repr}" | |
return factor_repr | |
def copy(self, *, without_values=False): | |
factors = self.factors | |
if without_values: | |
factors = [ | |
ScopedFactor( | |
factor=factor.factor.replace(values=None), | |
reduced=factor.reduced, | |
) | |
for factor in factors | |
] | |
return ScopedTerm(factors, scale=self.scale) | |
from __future__ import annotations | |
from dataclasses import dataclass, replace | |
from typing import Any, Optional | |
from formulaic.parser.types import Factor | |
from .factor_values import FactorValues, FactorValuesMetadata | |
@dataclass | |
class EvaluatedFactor: | |
""" | |
A container for the evaluated state of a `Factor` object in a given context. | |
This class acts as the glue between an abstract `Factor` specification and | |
the realisation of that factor in a specific data context. | |
Attributes: | |
factor: The `Factor` instance for which values have been computed. | |
values: The evaluated values for the factor. | |
""" | |
factor: Optional[Factor] = None | |
values: Optional[FactorValues[Any]] = None | |
@property | |
def expr(self) -> str: | |
""" | |
The expression of the evaluated factor. | |
""" | |
return self.factor.expr | |
@property | |
def metadata(self) -> FactorValuesMetadata: | |
""" | |
The metadata associated with the evaluated values. | |
""" | |
return self.values.__formulaic_metadata__ | |
def __repr__(self) -> str: | |
return repr(self.factor) | |
def __eq__(self, other) -> bool: | |
if isinstance(other, EvaluatedFactor): | |
return self.factor == other.factor | |
return NotImplemented | |
def __lt__(self, other) -> bool: | |
if isinstance(other, EvaluatedFactor): | |
return self.factor < other.factor | |
return NotImplemented | |
def replace(self, **changes) -> EvaluatedFactor: | |
""" | |
Return a copy of this `EvaluatedFactor` instance with the nominated | |
attributes mutated. | |
""" | |
return replace(self, **changes) | |
class ScopedFactor: | |
def __init__(self, factor, reduced=False): | |
self.factor = factor | |
self.reduced = reduced | |
def __repr__(self): | |
return repr(self.factor) + ("-" if self.reduced else "") | |
def __hash__(self): | |
return hash(repr(self)) | |
def __eq__(self, other): | |
if isinstance(other, ScopedFactor): | |
return self.factor == other.factor and self.reduced == other.reduced | |
return NotImplemented | |
def __lt__(self, other): | |
if isinstance(other, ScopedFactor): | |
if self.factor == other.factor: | |
return self.reduced > other.reduced | |
return self.factor < other.factor | |
return NotImplemented | |
from .enums import NAAction | |
from .evaluated_factor import EvaluatedFactor | |
from .factor_values import FactorValues | |
from .scoped_factor import ScopedFactor | |
from .scoped_term import ScopedTerm | |
__all__ = [ | |
"EvaluatedFactor", | |
"FactorValues", | |
"NAAction", | |
"ScopedFactor", | |
"ScopedTerm", | |
] | |
from __future__ import annotations | |
import copy | |
from dataclasses import dataclass, replace | |
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar, Union | |
import wrapt | |
from formulaic.parser.types import Factor | |
from formulaic.utils.sentinels import MISSING | |
T = TypeVar("T") | |
@dataclass | |
class FactorValuesMetadata: | |
""" | |
Metadata about evaluated factor values. | |
This metadata is used to inform materializers about how to treat these | |
values. | |
Attributes: | |
kind: The kind of the evaluated values. | |
spans_intercept: Whether the values span the intercept or not. | |
drop_field: If the values do span the intercept, and we want to reduce | |
the rank, which field should be dropped. | |
format: The format to use when exploding factors into multiple columns | |
(e.g. when encoding categories via dummy-encoding). | |
encoded: Whether the values should be treated as pre-encoded. | |
encoder: An optional callable with signature | |
`(values: Any, reduced_rank: bool, drop_rows: List[int], encoder_state: Dict[str, Any], spec: ModelSpec)` | |
that outputs properly encoded values suitable for the current | |
materializer. Note that this should only be used in cases where | |
direct evaluation would yield different results in reduced vs. | |
non-reduced rank scenarios. | |
""" | |
kind: Factor.Kind = Factor.Kind.UNKNOWN | |
column_names: Optional[Tuple[str]] = None | |
spans_intercept: bool = False | |
drop_field: Optional[str] = None | |
format: str = "{name}[{field}]" | |
encoded: bool = False | |
encoder: Optional[Callable[[Any, bool, List[int], Dict[str, Any]], Any]] = None | |
def replace(self, **kwargs) -> FactorValuesMetadata: | |
""" | |
Return a copy of this `FactorValuesMetadata` instance with the nominated | |
attributes replaced. | |
""" | |
if not kwargs: | |
return self | |
return replace(self, **kwargs) | |
class FactorValues(Generic[T], wrapt.ObjectProxy): | |
""" | |
A convenience wrapper that surfaces a `FactorValuesMetadata` instance at | |
`<object>.__formulaic_metadata__`. This wrapper can otherwise wrap any | |
object and behaves just like that object. | |
""" | |
def __init__( | |
self, | |
values: Any, | |
*, | |
metadata: FactorValuesMetadata = MISSING, | |
kind: Union[str, Factor.Kind] = MISSING, | |
column_names: Tuple[str] = MISSING, | |
spans_intercept: bool = MISSING, | |
drop_field: Optional[str] = MISSING, | |
format: str = MISSING, # pylint: disable=redefined-builtin | |
encoded: bool = MISSING, | |
encoder: Optional[ | |
Callable[[Any, bool, List[int], Dict[str, Any]], Any] | |
] = MISSING, | |
): | |
metadata_constructor = FactorValuesMetadata | |
metadata_kwargs = dict( | |
kind=Factor.Kind(kind) if kind is not MISSING else kind, | |
column_names=column_names, | |
spans_intercept=spans_intercept, | |
drop_field=drop_field, | |
format=format, | |
encoded=encoded, | |
encoder=encoder, | |
) | |
for key in set(metadata_kwargs): | |
if metadata_kwargs[key] is MISSING: | |
metadata_kwargs.pop(key) | |
if hasattr(values, "__formulaic_metadata__"): | |
metadata_constructor = values.__formulaic_metadata__.replace | |
if isinstance(values, FactorValues): | |
values = values.__wrapped__ | |
if metadata: | |
metadata_constructor = metadata.replace | |
wrapt.ObjectProxy.__init__(self, values) | |
self._self_metadata = metadata_constructor(**metadata_kwargs) | |
@property | |
def __formulaic_metadata__(self) -> FactorValuesMetadata: | |
return self._self_metadata | |
def __repr__(self) -> str: | |
return self.__wrapped__.__repr__() # pragma: no cover | |
# Handle copying behaviour | |
def __copy__(self): | |
return type(self)(copy.copy(self.__wrapped__), metadata=self._self_metadata) | |
def __deepcopy__(self, memo=None): | |
return type(self)( | |
copy.deepcopy(self.__wrapped__, memo), | |
metadata=copy.deepcopy(self._self_metadata), | |
) | |
import functools | |
import itertools | |
from collections import OrderedDict | |
import numpy | |
import pandas | |
import scipy.sparse as spsparse | |
from interface_meta import override | |
from formulaic.utils.cast import as_columns | |
from .base import FormulaMaterializer | |
from .types import NAAction | |
class PandasMaterializer(FormulaMaterializer): | |
REGISTER_NAME = "pandas" | |
REGISTER_INPUTS = ("pandas.core.frame.DataFrame",) | |
REGISTER_OUTPUTS = ("pandas", "numpy", "sparse") | |
@override | |
def _is_categorical(self, values): | |
if isinstance(values, (pandas.Series, pandas.Categorical)): | |
return values.dtype == object or isinstance( | |
values.dtype, pandas.CategoricalDtype | |
) | |
return super()._is_categorical(values) | |
@override | |
def _check_for_nulls(self, name, values, na_action, drop_rows): | |
if na_action is NAAction.IGNORE: | |
return | |
if isinstance( | |
values, dict | |
): # pragma: no cover; no formulaic transforms return dictionaries any more | |
for key, vs in values.items(): | |
self._check_for_nulls(f"{name}[{key}]", vs, na_action, drop_rows) | |
elif na_action is NAAction.RAISE: | |
if isinstance(values, pandas.Series) and values.isnull().values.any(): | |
raise ValueError(f"`{name}` contains null values after evaluation.") | |
elif na_action is NAAction.DROP: | |
if isinstance(values, pandas.Series): | |
drop_rows.update(numpy.flatnonzero(values.isnull().values)) | |
else: | |
raise ValueError( | |
f"Do not know how to interpret `na_action` = {repr(na_action)}." | |
) # pragma: no cover; this is currently impossible to reach | |
@override | |
def _encode_constant(self, value, metadata, encoder_state, spec, drop_rows): | |
if spec.output == "sparse": | |
return spsparse.csc_matrix( | |
numpy.array([value] * self.nrows).reshape( | |
(self.nrows - len(drop_rows), 1) | |
) | |
) | |
series = value * numpy.ones(self.nrows - len(drop_rows)) | |
return series | |
@override | |
def _encode_numerical(self, values, metadata, encoder_state, spec, drop_rows): | |
if drop_rows: | |
values = values.drop(index=values.index[drop_rows]) | |
if spec.output == "sparse": | |
return spsparse.csc_matrix(numpy.array(values).reshape((self.nrows, 1))) | |
return values | |
@override | |
def _encode_categorical( | |
self, values, metadata, encoder_state, spec, drop_rows, reduced_rank=False | |
): | |
# Even though we could reduce rank here, we do not, so that the same | |
# encoding can be cached for both reduced and unreduced rank. The | |
# rank will be reduced in the _encode_evaled_factor method. | |
from formulaic.transforms import encode_contrasts | |
if drop_rows: | |
values = values.drop(index=values.index[drop_rows]) | |
return as_columns( | |
encode_contrasts( | |
values, | |
reduced_rank=False, | |
_metadata=metadata, | |
_state=encoder_state, | |
_spec=spec, | |
) | |
) | |
@override | |
def _get_columns_for_term(self, factors, spec, scale=1): | |
out = OrderedDict() | |
# Pre-multiply factors with only one set of values (improves performance) | |
solo_factors = {} | |
indices = [] | |
for i, factor in enumerate(factors): | |
if len(factor) == 1: | |
solo_factors.update(factor) | |
indices.append(i) | |
if solo_factors: | |
for index in reversed(indices): | |
factors.pop(index) | |
if spec.output == "sparse": | |
factors.append( | |
{ | |
":".join(solo_factors): functools.reduce( | |
spsparse.csc_matrix.multiply, solo_factors.values() | |
) | |
} | |
) | |
else: | |
factors.append( | |
{ | |
":".join(solo_factors): functools.reduce( | |
numpy.multiply, | |
(numpy.asanyarray(p) for p in solo_factors.values()), | |
) | |
} | |
) | |
for product in itertools.product(*(factor.items() for factor in factors)): | |
if spec.output == "sparse": | |
out[":".join(p[0] for p in product)] = scale * functools.reduce( | |
spsparse.csc_matrix.multiply, (p[1] for p in product) | |
) | |
else: | |
out[":".join(p[0] for p in product)] = scale * functools.reduce( | |
numpy.multiply, | |
(numpy.array(p[1]) for p in product), | |
) | |
return out | |
@override | |
def _combine_columns(self, cols, spec, drop_rows): | |
# If we are outputing a pandas DataFrame, explicitly override index | |
# in case transforms/etc have lost track of it. | |
if spec.output == "pandas": | |
pandas_index = self.data_context.index | |
if drop_rows: | |
pandas_index = pandas_index.drop(self.data_context.index[drop_rows]) | |
# Special case no columns to empty csc_matrix, array, or DataFrame | |
if not cols: | |
values = numpy.empty((self.data.shape[0], 0)) | |
if spec.output == "sparse": | |
return spsparse.csc_matrix(values) | |
if spec.output == "numpy": | |
return values | |
return pandas.DataFrame(index=pandas_index) | |
# Otherwise, concatenate columns into model matrix | |
if spec.output == "sparse": | |
return spsparse.hstack([col[1] for col in cols]) | |
if spec.output == "numpy": | |
return numpy.stack([col[1] for col in cols], axis=1) | |
return pandas.DataFrame( | |
{col[0]: col[1] for col in cols}, | |
index=pandas_index, | |
copy=False, | |
) | |
from interface_meta import override | |
import pandas | |
from .pandas import PandasMaterializer | |
class ArrowMaterializer(PandasMaterializer): | |
REGISTER_NAME = "arrow" | |
REGISTER_INPUTS = ("pyarrow.lib.Table",) | |
@override | |
def _init(self): | |
self.__data_context = LazyArrowTableProxy(self.data) | |
@override | |
@property | |
def data_context(self): | |
return self.__data_context | |
class LazyArrowTableProxy: | |
def __init__(self, table): | |
self.table = table | |
self.column_names = set(self.table.column_names) | |
self._cache = {} | |
self.index = pandas.RangeIndex(len(table)) | |
def __contains__(self, value): | |
return value in self.column_names | |
def __getitem__(self, key): | |
if key not in self.column_names: | |
raise KeyError(key) | |
if key not in self._cache: | |
self._cache[key] = self.table.column(key).to_pandas() | |
return self._cache[key] | |
from .arrow import ArrowMaterializer | |
from .base import FormulaMaterializer | |
from .pandas import PandasMaterializer | |
from .types import FactorValues, NAAction | |
__all__ = [ | |
"ArrowMaterializer", | |
"FormulaMaterializer", | |
"PandasMaterializer", | |
# Useful types | |
"NAAction", | |
"FactorValues", | |
] | |
from __future__ import annotations | |
import functools | |
import inspect | |
import itertools | |
import operator | |
from abc import abstractmethod | |
from collections import defaultdict, OrderedDict, namedtuple | |
from typing import ( | |
Any, | |
Dict, | |
Generator, | |
List, | |
Iterable, | |
Set, | |
Tuple, | |
Union, | |
TYPE_CHECKING, | |
) | |
from interface_meta import InterfaceMeta, inherit_docs | |
from formulaic.errors import ( | |
FactorEncodingError, | |
FactorEvaluationError, | |
FormulaMaterializationError, | |
FormulaMaterializerInvalidError, | |
FormulaMaterializerNotFoundError, | |
) | |
from formulaic.materializers.types.factor_values import FactorValuesMetadata | |
from formulaic.model_matrix import ModelMatrices, ModelMatrix | |
from formulaic.parser.types import Factor, Term | |
from formulaic.transforms import TRANSFORMS | |
from formulaic.utils.cast import as_columns | |
from formulaic.utils.layered_mapping import LayeredMapping | |
from formulaic.utils.stateful_transforms import stateful_eval | |
from .types import EvaluatedFactor, FactorValues, ScopedFactor, ScopedTerm | |
if TYPE_CHECKING: # pragma: no cover | |
from formulaic import FormulaSpec, ModelSpec, ModelSpecs | |
EncodedTermStructure = namedtuple( | |
"EncodedTermStructure", ("term", "scoped_terms", "columns") | |
) | |
class FormulaMaterializerMeta(InterfaceMeta): | |
INTERFACE_RAISE_ON_VIOLATION = True | |
REGISTERED_NAMES = {} | |
REGISTERED_INPUTS = defaultdict(list) | |
def __register_implementation__(cls): | |
if "REGISTER_NAME" in cls.__dict__ and cls.REGISTER_NAME: | |
cls.REGISTERED_NAMES[cls.REGISTER_NAME] = cls | |
if "REGISTER_INPUTS" in cls.__dict__: | |
for input_type in cls.REGISTER_INPUTS: | |
cls.REGISTERED_INPUTS[input_type] = sorted( | |
cls.REGISTERED_INPUTS[input_type] + [cls], | |
key=lambda x: x.REGISTER_PRECEDENCE, | |
reverse=True, | |
) | |
def for_materializer(cls, materializer): | |
if isinstance(materializer, str): | |
if materializer not in cls.REGISTERED_NAMES: | |
raise FormulaMaterializerNotFoundError(materializer) | |
return cls.REGISTERED_NAMES[materializer] | |
if isinstance(materializer, FormulaMaterializer): | |
return type(materializer) | |
if not inspect.isclass(materializer) or not issubclass( | |
materializer, FormulaMaterializer | |
): | |
raise FormulaMaterializerInvalidError( | |
"Materializers must be subclasses of `formulaic.materializers.FormulaMaterializer`." | |
) | |
return materializer | |
def for_data(cls, data, output=None): | |
datacls = data.__class__ | |
input_type = f"{datacls.__module__}.{datacls.__qualname__}" | |
if input_type not in cls.REGISTERED_INPUTS: | |
raise FormulaMaterializerNotFoundError( | |
f"No materializer has been registered for input type {repr(input_type)}. Available input types are: {set(cls.REGISTER_INPUTS)}." | |
) | |
if output is None: | |
return cls.REGISTERED_INPUTS[input_type][0] | |
for materializer in cls.REGISTERED_INPUTS[input_type]: | |
if output in materializer.REGISTER_OUTPUTS: | |
return materializer | |
output_types = set( | |
*itertools.chain( | |
materializer.REGISTER_OUTPUTS | |
for materializer in cls.REGISTERED_INPUTS[input_type] | |
) | |
) | |
raise FormulaMaterializerNotFoundError( | |
f"No materializer has been registered for input type {repr(input_type)} that supports output type {repr(output)}. Available output types for {repr(input_type)} are: {output_types}." | |
) | |
class FormulaMaterializer(metaclass=FormulaMaterializerMeta): | |
REGISTER_NAME = None | |
REGISTER_INPUTS = set() | |
REGISTER_OUTPUTS = set() | |
REGISTER_PRECEDENCE = 100 | |
# Public API | |
@inherit_docs(method="_init") | |
def __init__(self, data, context=None, **params): | |
self.data = data | |
self.context = context or {} | |
self.params = params | |
self._init() | |
self.layered_context = LayeredMapping( | |
self.data_context, self.context, TRANSFORMS | |
) | |
self.factor_cache = {} | |
self.encoded_cache = {} | |
def _init(self): | |
pass # pragma: no cover | |
@property | |
def data_context(self): | |
return self.data | |
@property | |
def nrows(self): | |
return len(self.data) | |
def get_model_matrix( | |
self, | |
spec: Union[FormulaSpec, ModelMatrix, ModelMatrices, ModelSpec, ModelSpecs], | |
**spec_overrides, | |
): | |
from formulaic import ModelSpec | |
# Prepare ModelSpec(s) | |
spec: Union[ModelSpec, ModelSpecs] = ModelSpec.from_spec(spec, **spec_overrides) | |
should_simplify = isinstance(spec, ModelSpec) | |
model_specs: ModelSpecs = self._prepare_model_specs(spec) | |
# Step 0: Pool all factors and transform state, ensuring consistency | |
# during factor evaluation (esp. which rows get dropped). | |
( | |
factors, | |
factor_evaluation_model_spec, | |
) = self._prepare_factor_evaluation_model_spec(model_specs) | |
# Step 1: Evaluate all factors and cache the results, keeping track of | |
# which rows need dropping (if `self.config.na_action == 'drop'`). | |
drop_rows = set() | |
for factor in factors: | |
self._evaluate_factor(factor, factor_evaluation_model_spec, drop_rows) | |
drop_rows = sorted(drop_rows) | |
# Step 2: Update the structured model specs with the information from | |
# the shared transform state pool. | |
model_specs._map( | |
lambda ms: ms.transform_state.update( | |
{ | |
factor.expr: factor_evaluation_model_spec.transform_state[ | |
factor.expr | |
] | |
for term in ms.formula | |
for factor in term.factors | |
if factor.expr in factor_evaluation_model_spec.transform_state | |
} | |
) | |
) | |
# Step 3: Build the model matrices using the shared factor cache, and | |
# by recursing over the structured model matrices. | |
model_matrices = model_specs._map( | |
lambda model_spec: self._build_model_matrix( | |
model_spec, drop_rows=drop_rows | |
), | |
as_type=ModelMatrices, | |
) | |
if should_simplify: | |
return model_matrices._simplify() | |
return model_matrices | |
def _build_model_matrix(self, spec: ModelSpec, drop_rows): | |
# Step 1: Determine strategy to maintain structural full-rankness of output matrix | |
scoped_terms_for_terms = self._get_scoped_terms( | |
spec.formula, ensure_full_rank=spec.ensure_full_rank | |
) | |
# Step 2: Generate the columns which will be collated into the full matrix | |
cols = [] | |
for term, scoped_terms in scoped_terms_for_terms: | |
scoped_cols = OrderedDict() | |
for scoped_term in scoped_terms: | |
if not scoped_term.factors: | |
scoped_cols[ | |
"Intercept" | |
] = scoped_term.scale * self._encode_constant( | |
1, None, {}, spec, drop_rows | |
) | |
else: | |
scoped_cols.update( | |
self._get_columns_for_term( | |
[ | |
self._encode_evaled_factor( | |
scoped_factor.factor, | |
spec, | |
drop_rows, | |
reduced_rank=scoped_factor.reduced, | |
) | |
for scoped_factor in sorted(scoped_term.factors) | |
], | |
spec=spec, | |
scale=scoped_term.scale, | |
) | |
) | |
cols.append((term, scoped_terms, scoped_cols)) | |
# Step 3: Populate remaining model spec fields | |
if spec.structure: | |
cols = self._enforce_structure(cols, spec, drop_rows) | |
else: | |
spec = spec.update( | |
structure=[ | |
EncodedTermStructure( | |
term, | |
list(st.copy(without_values=True) for st in scoped_terms), | |
list(scoped_cols), | |
) | |
for term, scoped_terms, scoped_cols in cols | |
], | |
) | |
# Step 4: Collate factors into one ModelMatrix | |
return ModelMatrix( | |
self._combine_columns( | |
[ | |
(name, values) | |
for term, scoped_terms, scoped_cols in cols | |
for name, values in scoped_cols.items() | |
], | |
spec=spec, | |
drop_rows=drop_rows, | |
), | |
spec=spec, | |
) | |
# Methods related to input preparation | |
def _prepare_model_specs(self, spec: Union[ModelSpec, ModelSpecs]) -> ModelSpecs: | |
from formulaic.model_spec import ModelSpecs | |
if not isinstance(spec, ModelSpecs): | |
spec = ModelSpecs(spec) | |
def prepare_model_spec(model_spec: ModelSpec): | |
overrides = { | |
"materializer": self.REGISTER_NAME, | |
"materializer_params": self.params, | |
} | |
if model_spec.output is None: | |
overrides["output"] = self.REGISTER_OUTPUTS[0] | |
elif model_spec.output not in self.REGISTER_OUTPUTS: | |
raise FormulaMaterializationError( | |
f"Nominated output {repr(model_spec.output)} is invalid. Available output types are: {set(self.REGISTER_OUTPUTS)}." | |
) | |
return model_spec.update(**overrides) | |
return spec._map(prepare_model_spec, as_type=ModelSpecs) | |
def _prepare_factor_evaluation_model_spec(self, model_specs: ModelSpecs): | |
from formulaic.model_spec import ModelSpec | |
output = set() | |
na_action = set() | |
ensure_full_rank = set() | |
factors = set() | |
transform_state = {} | |
def update_pooled_spec(model_spec: ModelSpec): | |
output.add(model_spec.output) | |
na_action.add(model_spec.na_action) | |
ensure_full_rank.add(model_spec.ensure_full_rank) | |
factors.update( | |
itertools.chain(*(term.factors for term in model_spec.formula)) | |
) | |
transform_state.update( | |
model_spec.transform_state | |
) # TODO: Check for consistency? | |
model_specs._map(update_pooled_spec) | |
if len(output) != 1 or len(na_action) != 1 or len(ensure_full_rank) != 1: | |
raise RuntimeError( | |
"Provided `ModelSpec` instances are not consistent." | |
) # pragma: no cover; will only occur if users manually construct a structured model spec. | |
return factors, ModelSpec( | |
formula=[], | |
ensure_full_rank=next(iter(ensure_full_rank)), | |
na_action=next(iter(na_action)), | |
output=next(iter(output)), | |
transform_state=transform_state, | |
) | |
# Methods related to ensuring out matrices are structurally full-rank | |
def _get_scoped_terms(self, terms, ensure_full_rank=True): | |
""" | |
Generate the terms to be used in the model matrix. | |
This method first evaluates each factor in the context of the data | |
(and environment), and then determines the correct "scope" (full vs. | |
reduced rank) for each term. If `ensure_full_rank` is `True`, then the | |
resulting terms when combined is guaranteed to be structurally full-rank. | |
Args: | |
terms (list<Term>): A list of term arguments (usually from a formula) | |
object. | |
ensure_full_rank (bool): Whether evaluated terms should be scoped | |
to ensure that their combination will result in a full-rank | |
matrix. | |
transform_state (dict): The state of any stateful transforms | |
(will be populated if empty). | |
Returns: | |
list<ScopedTerm>: A list of appropriately scoped terms. | |
""" | |
spanned = set() | |
for term in terms: | |
evaled_factors = [self.factor_cache[factor.expr] for factor in term.factors] | |
if ensure_full_rank: | |
term_span = self._get_scoped_terms_spanned_by_evaled_factors( | |
evaled_factors | |
).difference(spanned) | |
scoped_terms = self._simplify_scoped_terms(term_span) | |
spanned.update(term_span) | |
else: | |
scoped_terms = [ | |
ScopedTerm( | |
factors=( | |
ScopedFactor(evaled_factor, reduced=False) | |
for evaled_factor in evaled_factors | |
if evaled_factor.metadata.kind is not Factor.Kind.CONSTANT | |
), | |
scale=functools.reduce( | |
operator.mul, | |
[ | |
evaled_factor.values | |
for evaled_factor in evaled_factors | |
if evaled_factor.metadata.kind.value | |
is Factor.Kind.CONSTANT | |
], | |
1, | |
), | |
) | |
] | |
yield term, scoped_terms | |
@classmethod | |
def _get_scoped_terms_spanned_by_evaled_factors( | |
cls, evaled_factors: Iterable[EvaluatedFactor] | |
) -> Set[ScopedTerm]: | |
""" | |
Return the set of ScopedTerm instances which span the set of | |
evaluated factors. | |
Args: | |
evaled_factors: The evaluated factors for which to generated scoped | |
terms. | |
Returns: | |
The scoped terms for the nominated `evaled_factors`. | |
""" | |
scale = 1 | |
factors = [] | |
for factor in evaled_factors: | |
if factor.metadata.kind is Factor.Kind.CONSTANT: | |
scale *= factor.values | |
elif factor.metadata.spans_intercept: | |
factors.append((1, ScopedFactor(factor, reduced=True))) | |
else: | |
factors.append((ScopedFactor(factor),)) | |
return set( | |
ScopedTerm(factors=(p for p in prod if p != 1), scale=scale) | |
for prod in itertools.product(*factors) | |
) | |
@classmethod | |
def _simplify_scoped_terms(cls, scoped_terms): | |
""" | |
Return the minimal set of ScopedTerm instances that spans the same vectorspace. | |
""" | |
terms = [] | |
for scoped_term in sorted(scoped_terms, key=lambda x: len(x.factors)): | |
factors = set(scoped_term.factors) | |
combined = False | |
for co_scoped_term in terms: | |
cofactors = set(co_scoped_term.factors) | |
factors_diff = factors.difference(cofactors) | |
if len(factors) - 1 != len(cofactors) or len(factors_diff) != 1: | |
continue | |
factor_new = next(iter(factors_diff)) | |
if factor_new.reduced: | |
co_scoped_term.factors += ( | |
ScopedFactor(factor_new.factor, reduced=False), | |
) | |
terms = cls._simplify_scoped_terms(terms) | |
combined = True | |
break | |
if not combined: | |
terms.append(scoped_term.copy()) | |
return terms | |
# Methods related to looking-up, evaluating and encoding terms and factors | |
def _evaluate_factor( | |
self, factor: Factor, spec: ModelSpec, drop_rows: set | |
) -> EvaluatedFactor: | |
if factor.expr not in self.factor_cache: | |
try: | |
if factor.eval_method.value == "lookup": | |
value = self._lookup(factor.expr) | |
elif factor.eval_method.value == "python": | |
value = self._evaluate(factor.expr, factor.metadata, spec) | |
elif factor.eval_method.value == "literal": | |
value = FactorValues( | |
self._evaluate(factor.expr, factor.metadata, spec), | |
kind=Factor.Kind.CONSTANT, | |
) | |
else: | |
raise FactorEvaluationError( | |
f"The evaluation method `{factor.eval_method.value}` for factor `{factor}` is not understood." | |
) | |
except FactorEvaluationError: | |
raise | |
except Exception as e: | |
raise FactorEvaluationError( | |
f"Unable to evaluate factor `{factor}`. [{type(e).__name__}: {e}]" | |
) from e | |
if not isinstance(value, FactorValues): | |
value = FactorValues(value) | |
if value.__formulaic_metadata__.kind is Factor.Kind.UNKNOWN: | |
if self._is_categorical(value): | |
kind = Factor.Kind.CATEGORICAL | |
spans_intercept = True | |
else: | |
kind = Factor.Kind.NUMERICAL | |
spans_intercept = False | |
value = FactorValues(value, kind=kind, spans_intercept=spans_intercept) | |
if ( | |
factor.kind is not Factor.Kind.UNKNOWN | |
and factor.kind is not value.__formulaic_metadata__.kind | |
): | |
if factor.kind is Factor.Kind.CATEGORICAL: | |
value.__formulaic_metadata__.kind = factor.kind | |
else: | |
raise FactorEncodingError( | |
f"Factor `{factor}` is expecting values of kind '{factor.kind.value}', " | |
f"but they are actually of kind '{value.__formulaic_metadata__.kind.value}'." | |
) | |
if ( | |
factor.expr in spec.encoder_state | |
and value.__formulaic_metadata__.kind | |
is not spec.encoder_state[factor.expr][0] | |
): | |
raise FactorEncodingError( | |
f"The model specification expects factor `{factor}` to have values of kind " | |
f"`{spec.encoder_state[factor.expr][0]}`, but they are actually of kind " | |
f"`{value.__formulaic_metadata__.kind.value}`." | |
) | |
self._check_for_nulls(factor.expr, value, spec.na_action, drop_rows) | |
self.factor_cache[factor.expr] = EvaluatedFactor( | |
factor=factor, values=value | |
) | |
return self.factor_cache[factor.expr] | |
def _lookup(self, name): | |
return self.layered_context[name] | |
def _evaluate(self, expr, metadata, spec): | |
return stateful_eval( | |
expr, self.layered_context, {expr: metadata}, spec.transform_state, spec | |
) | |
def _is_categorical(self, values): | |
if hasattr(values, "__formulaic_metadata__"): | |
return values.__formulaic_metadata__.kind is Factor.Kind.CATEGORICAL | |
return False | |
def _check_for_nulls(self, name, values, na_action, drop_rows): | |
pass # pragma: no cover | |
def _encode_evaled_factor( | |
self, | |
factor: EvaluatedFactor, | |
spec: ModelSpec, | |
drop_rows: set, | |
reduced_rank: bool = False, | |
) -> Dict[str, Any]: | |
if not factor.metadata.encoded: | |
if factor.expr in self.encoded_cache: | |
encoded = self.encoded_cache[factor.expr] | |
elif (factor.expr, reduced_rank) in self.encoded_cache: | |
encoded = self.encoded_cache[(factor.expr, reduced_rank)] | |
else: | |
def map_dict(f): | |
""" | |
This decorator allows an encoding function to operator on | |
dictionaries (which should be mapped over). This allows | |
transforms to output multiple non-encoded columns and still | |
have everything work as expected. | |
""" | |
@functools.wraps(f) | |
def wrapped(values, metadata, state, *args, **kwargs): | |
if isinstance(values, dict): | |
encoded = {} | |
for k, v in values.items(): | |
if isinstance(k, str) and k.startswith("__"): | |
encoded[k] = v | |
else: | |
nested_state = state.get(k, {}) | |
encoded[k] = wrapped( | |
v, metadata, nested_state, *args, **kwargs | |
) | |
if nested_state: | |
state[k] = nested_state | |
if isinstance(values, FactorValues): | |
return FactorValues( | |
encoded, metadata=values.__formulaic_metadata__ | |
) | |
return encoded # pragma: no cover; nothing in formulaic uses this, but is here for generality. | |
return f(values, metadata, state, *args, **kwargs) | |
return wrapped | |
encoder_state = spec.encoder_state.get(factor.expr, [None, {}])[1] | |
if factor.metadata.encoder is not None: | |
encoded = as_columns( | |
factor.metadata.encoder( | |
factor.values, | |
reduced_rank=reduced_rank, | |
drop_rows=drop_rows, | |
encoder_state=encoder_state, | |
model_spec=spec, | |
) | |
) | |
else: | |
# If we need to unpack values into columns, we do this here. | |
# Otherwise, we pass through the original values. | |
factor_values = FactorValues( | |
self._extract_columns_for_encoding(factor), | |
metadata=factor.metadata, | |
) | |
if factor.metadata.kind is Factor.Kind.CATEGORICAL: | |
encoded = map_dict(self._encode_categorical)( | |
factor_values, | |
factor.metadata, | |
encoder_state, | |
spec, | |
drop_rows, | |
reduced_rank=reduced_rank, | |
) | |
elif factor.metadata.kind is Factor.Kind.NUMERICAL: | |
encoded = map_dict(self._encode_numerical)( | |
factor_values, | |
factor.metadata, | |
encoder_state, | |
spec, | |
drop_rows, | |
) | |
elif factor.metadata.kind is Factor.Kind.CONSTANT: | |
encoded = map_dict(self._encode_constant)( | |
factor_values, | |
factor.metadata, | |
encoder_state, | |
spec, | |
drop_rows, | |
) | |
else: | |
raise FactorEncodingError( | |
factor | |
) # pragma: no cover; it is not currently possible to reach this sentinel | |
spec.encoder_state[factor.expr] = (factor.metadata.kind, encoder_state) | |
# Only encode once for encodings where we can just drop a field | |
# later on below. | |
if isinstance(encoded, dict) and factor.metadata.drop_field: | |
cache_key = factor.expr | |
else: | |
cache_key = (factor.expr, reduced_rank) | |
self.encoded_cache[cache_key] = encoded | |
else: | |
encoded = as_columns( | |
factor.values | |
) # pragma: no cover; we don't use this in formulaic yet. | |
encoded = FactorValues( | |
encoded, | |
metadata=getattr(encoded, "__formulaic_metadata__", factor.metadata), | |
encoded=True, | |
) | |
# Encoded factors will now all be dicts | |
if ( | |
isinstance(encoded, dict) | |
and encoded.__formulaic_metadata__.spans_intercept | |
and reduced_rank | |
): | |
encoded = FactorValues( | |
encoded.copy(), metadata=encoded.__formulaic_metadata__ | |
) | |
del encoded[encoded.__formulaic_metadata__.drop_field] | |
return self._flatten_encoded_evaled_factor(factor.expr, encoded) | |
def _extract_columns_for_encoding( | |
self, factor: EvaluatedFactor | |
) -> Union[Any, Dict[str, Any]]: | |
""" | |
If incoming factor has values that need to be unpacked into columns | |
(e.g. a two-dimensions numpy array), do that expansion here. Otherwise, | |
return the current factor values. | |
""" | |
return as_columns(factor.values) | |
def _flatten_encoded_evaled_factor( | |
self, name: str, values: FactorValues[dict] | |
) -> Dict[str, Any]: | |
if not isinstance(values, dict): | |
return {name: values} | |
# Some nested dictionaries may not be a `FactorValues[dict]` instance, | |
# in which case we impute the default formatter in `FactorValues.format`. | |
if hasattr(values, "__formulaic_metadata__"): | |
name_format = values.__formulaic_metadata__.format | |
else: | |
name_format = FactorValuesMetadata.format | |
flattened = {} | |
for subfield, value in values.items(): | |
if isinstance(subfield, str) and subfield.startswith("__"): | |
continue | |
subname = name_format.format(name=name, field=subfield) | |
if isinstance(value, dict): | |
flattened.update(self._flatten_encoded_evaled_factor(subname, value)) | |
else: | |
flattened[subname] = value | |
return flattened | |
@abstractmethod | |
def _encode_constant(self, value, metadata, encoder_state, spec, drop_rows): | |
pass # pragma: no cover | |
@abstractmethod | |
def _encode_categorical( | |
self, values, metadata, encoder_state, spec, drop_rows, reduced_rank=False | |
): | |
pass # pragma: no cover | |
@abstractmethod | |
def _encode_numerical(self, values, metadata, encoder_state, spec, drop_rows): | |
pass # pragma: no cover | |
# Methods related to ModelMatrix output | |
def _enforce_structure( | |
self, | |
cols: List[Tuple[Term, List[ScopedTerm], Dict[str, Any]]], | |
spec, | |
drop_rows: set, | |
) -> Generator[Tuple[Term, List[ScopedTerm], Dict[str, Any]]]: | |
# TODO: Verify that imputation strategies are intuitive and make sense. | |
assert len(cols) == len(spec.structure) | |
for i, col_spec in enumerate(cols): | |
scoped_cols = col_spec[2] | |
target_cols = spec.structure[i][2] | |
if len(scoped_cols) > len(target_cols): | |
raise FactorEncodingError( | |
f"Term `{col_spec[0]}` has generated too many columns compared to specification: generated {list(scoped_cols)}, expecting {target_cols}." | |
) | |
if len(scoped_cols) < len(target_cols): | |
if len(scoped_cols) == 0: | |
col = self._encode_constant(0, None, None, spec, drop_rows) | |
elif len(scoped_cols) == 1: | |
col = tuple(scoped_cols.values())[0] | |
else: | |
raise FactorEncodingError( | |
f"Term `{col_spec[0]}` has generated insufficient columns compared to specification: generated {list(scoped_cols)}, expecting {target_cols}." | |
) | |
scoped_cols = {name: col for name in target_cols} | |
elif set(scoped_cols) != set(target_cols): | |
raise FactorEncodingError( | |
f"Term `{col_spec[0]}` has generated columns that are inconsistent with specification: generated {list(scoped_cols)}, expecting {target_cols}." | |
) | |
yield col_spec[0], col_spec[1], { | |
col: scoped_cols[col] for col in target_cols | |
} | |
def _get_columns_for_term(self, factors, spec, scale=1): | |
""" | |
Assemble the columns for a model matrix given factors and a scale. | |
This performs the row-wise Kronecker product of the factors. | |
Args: | |
factors | |
scale | |
Returns: | |
dict | |
""" | |
out = OrderedDict() | |
for product in itertools.product(*(factor.items() for factor in factors)): | |
out[":".join(p[0] for p in product)] = scale * functools.reduce( | |
operator.mul, (p[1] for p in product) | |
) | |
return out | |
@abstractmethod | |
def _combine_columns(self, cols, spec, drop_rows): | |
pass # pragma: no cover | |
''' | |
inliner_packages = { | |
"formulaic.model_spec": [ | |
0, 2786, 18313, 1680704490], | |
"formulaic.model_matrix": [ | |
0, 18313, 21383, 1680704490], | |
"formulaic.formula": [ | |
0, 21383, 30338, 1680704490], | |
"formulaic": [ | |
1, 30338, 31068, 1680704490], | |
"formulaic.utils.cast": [ | |
0, 31068, 32979, 1680704490], | |
"formulaic.utils.layered_mapping": [ | |
0, 32979, 35895, 1680704490], | |
"formulaic.utils.stateful_transforms": [ | |
0, 35895, 45370, 1680704490], | |
"formulaic.utils": [ | |
1, 45370, 45370, 1680704490], | |
"formulaic.utils.calculus": [ | |
0, 45370, 49059, 1680704490], | |
"formulaic.utils.context": [ | |
0, 49059, 50930, 1680704490], | |
"formulaic.utils.sentinels": [ | |
0, 50930, 51358, 1680704490], | |
"formulaic.utils.iterators": [ | |
0, 51358, 52375, 1680704490], | |
"formulaic.utils.sparse": [ | |
0, 52375, 53973, 1680704490], | |
"formulaic.utils.constraints": [ | |
0, 53973, 71006, 1680704490], | |
"formulaic.parser.types.formula_parser": [ | |
0, 71006, 73839, 1680704490], | |
"formulaic.parser.types.token": [ | |
0, 73839, 81401, 1680704490], | |
"formulaic.parser.types.term": [ | |
0, 81401, 82879, 1680704490], | |
"formulaic.parser.types.operator_resolver": [ | |
0, 82879, 86589, 1680704490], | |
"formulaic.parser.types.operator": [ | |
0, 86589, 90603, 1680704490], | |
"formulaic.parser.types": [ | |
1, 90603, 91013, 1680704490], | |
"formulaic.parser.types.structured": [ | |
0, 91013, 108139, 1680704490], | |
"formulaic.parser.types.factor": [ | |
0, 108139, 111151, 1680704490], | |
"formulaic.parser.types.ast_node": [ | |
0, 111151, 114323, 1680704490], | |
"formulaic.parser": [ | |
1, 114323, 114463, 1680704490], | |
"formulaic.parser.algos": [ | |
1, 114463, 114587, 1680704490], | |
"formulaic.parser.algos.tokens_to_ast": [ | |
0, 114587, 119813, 1680704490], | |
"formulaic.parser.algos.tokenize": [ | |
0, 119813, 126298, 1680704490], | |
"formulaic.parser.parser": [ | |
0, 126298, 135852, 1680704490], | |
"formulaic.parser.utils": [ | |
0, 135852, 144413, 1680704490], | |
"formulaic.sugar": [ | |
0, 144413, 146655, 1680704490], | |
"formulaic.transforms.contrasts": [ | |
0, 146655, 171974, 1680704490], | |
"formulaic.transforms.basis_spline": [ | |
0, 171974, 180136, 1680704490], | |
"formulaic.transforms": [ | |
1, 180136, 180896, 1680704490], | |
"formulaic.transforms.poly": [ | |
0, 180896, 185491, 1680704490], | |
"formulaic.transforms.identity": [ | |
0, 185491, 185527, 1680704490], | |
"formulaic.transforms.scale": [ | |
0, 185527, 186944, 1680704490], | |
"formulaic.errors": [ | |
0, 186944, 187939, 1680704490], | |
"formulaic.materializers.types.enums": [ | |
0, 187939, 188045, 1680704490], | |
"formulaic.materializers.types.scoped_term": [ | |
0, 188045, 189164, 1680704490], | |
"formulaic.materializers.types.evaluated_factor": [ | |
0, 189164, 190839, 1680704490], | |
"formulaic.materializers.types.scoped_factor": [ | |
0, 190839, 191544, 1680704490], | |
"formulaic.materializers.types": [ | |
1, 191544, 191846, 1680704490], | |
"formulaic.materializers.types.factor_values": [ | |
0, 191846, 196067, 1680704490], | |
"formulaic.materializers.pandas": [ | |
0, 196067, 202107, 1680704490], | |
"formulaic.materializers.arrow": [ | |
0, 202107, 203045, 1680704490], | |
"formulaic.materializers": [ | |
1, 203045, 203349, 1680704490], | |
"formulaic.materializers.base": [ | |
0, 203349, 232736, 1680704490] | |
} | |
def prepare_package(): | |
# Loader's module name changes with each major version to be able to have | |
# different loaders working at the same time. | |
module_name = PINLINER_MODULE_NAME + '_' + loader_version.split('.')[0] | |
# If the loader code is not already loaded we create a specific module for | |
# it. We need to do it this way so that the functions in there are not | |
# compiled with a reference to this module's global dictionary in | |
# __globals__. | |
module = sys.modules.get(module_name) | |
if not module: | |
module = types.ModuleType(module_name) | |
module.__package__ = '' | |
module.__file__ = module_name + '.py' | |
exec(inliner_importer_code, module.__dict__) | |
sys.modules[module_name] = module | |
# We cannot use __file__ directly because on the second run __file__ will | |
# be the compiled file (.pyc) and that's not the file we want to read. | |
filename = os.path.splitext(__file__)[0] + '.py' | |
# Add our own finder and loader for this specific package if it's not | |
# already there. | |
# This must be done before we initialize the package, as it may import | |
# packages and modules contained in the package itself. | |
for finder in sys.meta_path: | |
if (isinstance(finder, module.InlinerImporter) and | |
finder.data == inliner_packages): | |
importer = finder | |
else: | |
# If we haven't forced the setting of the uncaught exception handler | |
# we replace it only if it hasn't been replace yet, this is because | |
# CPython default handler does not use traceback or even linecache, so | |
# it never calls get_source method to get the code, but for example | |
# iPython does, so we don't need to replace the handler. | |
if FORCE_EXC_HOOK is None: | |
set_excepthook = sys.__excepthook__ == sys.excepthook | |
else: | |
set_excepthook = FORCE_EXC_HOOK | |
importer = module.InlinerImporter(inliner_packages, filename, | |
set_excepthook) | |
sys.meta_path.append(importer) | |
# If this is a bundle (multiple packages) without default then don't import | |
# any package automatically. | |
if not PINLINED_DEFAULT_PACKAGE: | |
return | |
__, start, end, ts = inliner_packages[PINLINED_DEFAULT_PACKAGE] | |
with open(filename) as datafile: | |
datafile.seek(start) | |
code = datafile.read(end - start) | |
# We need everything to be local variables before we clear the global dict | |
def_package = PINLINED_DEFAULT_PACKAGE | |
name = __name__ | |
filename = def_package + '/__init__.py' | |
compiled_code = compile(code, filename, 'exec') | |
# Prepare globals to execute __init__ code | |
globals().clear() | |
# If we've been called directly we cannot set __path__ | |
if name != '__main__': | |
globals()['__path__'] = [def_package] | |
else: | |
def_package = None | |
globals().update(__file__=filename, | |
__package__=def_package, | |
__name__=name, | |
__loader__=importer) | |
exec(compiled_code, globals()) | |
# Prepare loader's module and populate this namespace only with package's | |
# __init__ | |
prepare_package() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment