"""
Helper functions for use with :class:`~numpydantic.NDArray` - see the note in
:mod:`~numpydantic.ndarray` for why these are separated.
"""
import hashlib
import json
from typing import Any, Callable, Optional, Union
import nptyping.structure
import numpy as np
from nptyping import Shape
from pydantic import SerializationInfo
from pydantic_core import CoreSchema, core_schema
from pydantic_core.core_schema import ListSchema, ValidationInfo
from numpydantic import dtype as dt
from numpydantic.interface import Interface
from numpydantic.maps import np_to_python
from numpydantic.types import DtypeType, NDArrayType, ShapeType
# Annotation used for the ``_handler`` arguments in this module: a callable that
# takes any object and returns a pydantic core schema.
# NOTE(review): the functions below also call ``_handler.generate_schema(...)``,
# which a bare ``Callable`` does not describe - confirm the intended handler protocol.
_handler_type = Callable[[Any], core_schema.CoreSchema]
# Python types that get no dedicated element schema: ``_lol_dtype`` falls back to
# ``core_schema.any_schema()`` when a dtype maps to one of these.
_UNSUPPORTED_TYPES = (complex,)
def _numeric_dtype(dtype: DtypeType, _handler: _handler_type) -> CoreSchema:
    """
    Build a numeric element schema bounded by the representable range of ``dtype``.

    Args:
        dtype ( DtypeType ): The numeric type to describe. ``np.number`` is
            replaced by the builtin ``float`` before anything else happens.
        _handler: The pydantic schema generation handler, used for every type
            that is neither a numpy floating nor a numpy integer type.

    Returns:
        A float schema limited to ``np.finfo`` min/max for numpy floating types,
        an int schema limited to ``np.iinfo`` min/max for numpy integer types,
        otherwise whatever ``_handler.generate_schema`` returns.
    """
    # the abstract ``np.number`` carries no limits of its own - use python float
    resolved = float if dtype in (np.number,) else dtype

    if issubclass(resolved, np.floating):
        limits = np.finfo(resolved)
        return core_schema.float_schema(le=float(limits.max), ge=float(limits.min))

    if issubclass(resolved, np.integer):
        limits = np.iinfo(resolved)
        return core_schema.int_schema(le=int(limits.max), ge=int(limits.min))

    # not a sized numpy numeric type: let pydantic build the schema
    return _handler.generate_schema(resolved)
def _lol_dtype(dtype: DtypeType, _handler: _handler_type) -> CoreSchema:
    """
    Get the innermost dtype schema to use in the generated pydantic schema.

    Args:
        dtype ( DtypeType ): A single dtype, or a tuple of dtypes. The tuples
            ``dt.Float``, ``dt.Integer`` and ``dt.Complex`` are collapsed into a
            single generic schema; any other tuple becomes a union of the schemas
            of its members.
        _handler: The pydantic schema generation handler, used for python types
            that have no special handling here.

    Returns:
        :class:`pydantic_core.CoreSchema` describing a single array element.

    Raises:
        NotImplementedError: If ``dtype`` is an nptyping structured dtype.
        ValueError: If ``dtype`` is not a key of ``maps.np_to_python`` and is not
            one of its values either.
    """
    if isinstance(dtype, nptyping.structure.StructureMeta):  # pragma: no cover
        raise NotImplementedError("Structured dtypes are currently unsupported")

    if isinstance(dtype, tuple):
        # if it's a meta-type that refers to a generic float/int, just make that
        if dtype == dt.Float:
            return core_schema.float_schema()
        if dtype == dt.Integer:
            return core_schema.int_schema()
        if dtype == dt.Complex:
            return core_schema.any_schema()

        # Otherwise make a union of the member dtypes recursively.
        # Deduplicate with ``dict.fromkeys`` instead of ``set``: type objects hash
        # by identity, so set iteration order can change between interpreter
        # runs, which would reorder the union (and change anything derived from
        # it, like a hash of the schema). ``dict`` keeps declaration order.
        members = list(dict.fromkeys(dtype))
        return core_schema.union_schema([_lol_dtype(t, _handler) for t in members])

    try:
        python_type = np_to_python[dtype]
    except KeyError as e:  # pragma: no cover
        # this should pretty much only happen in downstream/3rd-party interfaces
        # that use interface-specific types. those need to provide mappings back
        # to base python types (making this more streamlined is TODO)
        if dtype not in np_to_python.values():
            raise ValueError(
                "dtype given in model does not have a corresponding python base "
                "type - add one to the `maps.np_to_python` dict"
            ) from e
        # it's already a python type
        python_type = dtype

    if python_type in _UNSUPPORTED_TYPES:
        # TODO: warn and log here
        return core_schema.any_schema()
    if python_type in (float, int):
        # pass the original dtype so numpy-specific min/max limits are kept
        return _numeric_dtype(dtype, _handler)
    return _handler.generate_schema(python_type)
[docs]
def list_of_lists_schema(shape: Shape, array_type: CoreSchema) -> ListSchema:
    """
    Make a pydantic JSON schema for an array as a list of lists.

    For each item in the shape, create a list schema. In the innermost schema
    insert the passed ``array_type`` schema.

    This function is typically called from :func:`.make_json_schema`

    Args:
        shape (:class:`.Shape` ): Shape determines the depth and max/min elements
            for each layer of list schema
        array_type ( :class:`pydantic_core.CoreSchema` ): The pre-rendered pydantic
            core schema to use in the innermost list entry

    Returns:
        The outermost list schema, i.e. the one that checks the first dimension.

    Raises:
        ValueError: If a dimension is neither an integer, ``*`` nor ``...``
            (e.g. a shape variable).
    """
    # Every comma separated entry of the shape expression is either "<size>" or
    # "<size> <label>". Split on arbitrary whitespace so that the space following
    # a ", " separator is not mistaken for an extra token - otherwise the labels
    # of every dimension after the first would be silently dropped.
    labels = []
    for part in shape.__args__[0].split(","):
        tokens = part.split()
        labels.append(tokens[1] if len(tokens) == 2 else None)

    # Construct a list of list schema
    # go in reverse order - construct list schemas such that
    # the final schema is the one that checks the first dimension
    list_schema = None
    for arg, label in zip(reversed(shape.prepared_args), reversed(labels)):
        # which handler to use? for the first we use the actual type
        # handler, everywhere else we use the prior list handler
        inner_schema = array_type if list_schema is None else list_schema

        # make a label annotation, if we have one
        metadata = {"name": label} if label is not None else None

        # make the current level list schema, accounting for shape
        if arg == "*":
            list_schema = core_schema.list_schema(inner_schema, metadata=metadata)
        elif arg == "...":
            list_schema = _unbounded_shape(inner_schema, metadata=metadata)
        else:
            try:
                length = int(arg)
            except ValueError as e:
                raise ValueError(
                    "Array shapes must be integers, wildcards, or ellipses. "
                    "Shape variables (for declaring that one dimension must be the "
                    "same size as another) are not supported because it is "
                    "impossible to express dynamic minItems/maxItems in JSON Schema. "
                    "See: https://github.com/orgs/json-schema-org/discussions/730"
                ) from e
            # a fixed size dimension must have exactly ``length`` items
            list_schema = core_schema.list_schema(
                inner_schema, min_length=length, max_length=length, metadata=metadata
            )
    return list_schema
def _hash_schema(schema: CoreSchema) -> str:
    """
    Hash a pydantic core schema into a short, reproducible hex string.

    The schema is dumped to compact JSON with sorted keys, so key order does not
    influence the result, and digested with blake2b using an 8-byte digest.
    Collision resistance is not a concern here - the only requirement is that the
    same schema always yields the same hash.

    Args:
        schema ( :class:`pydantic_core.CoreSchema` ): The schema to hash; it has
            to be serializable by :func:`json.dumps`.

    Returns:
        str: The hex-encoded digest (16 hex characters).
    """
    canonical = json.dumps(schema, sort_keys=True, indent=None, separators=(",", ":"))
    digest = hashlib.blake2b(canonical.encode("utf-8"), digest_size=8)
    return digest.hexdigest()
def _unbounded_shape(
    inner_type: CoreSchema, metadata: Optional[dict] = None
) -> core_schema.DefinitionsSchema:
    """
    Make a recursive schema for lists nested to any depth around ``inner_type``.

    The recursion is expressed through a definition whose reference name is
    derived from a hash of ``inner_type``.

    Args:
        inner_type ( :class:`pydantic_core.CoreSchema` ): Schema of the innermost
            items.
        metadata (dict): Optional metadata attached to the outermost list schema.

    Returns:
        A definitions schema: a list of references to the recursive definition,
        together with that definition.
    """
    array_ref = f"any-shape-array-{_hash_schema(inner_type)}"

    # the recursive part: either yet another level of nesting, or an actual item
    nested_list = core_schema.list_schema(
        core_schema.definition_reference_schema(array_ref)
    )
    recursive_item = core_schema.union_schema([nested_list, inner_type], ref=array_ref)

    # the entry point: the outermost list, which is the one carrying the metadata
    outer_list = core_schema.list_schema(
        core_schema.definition_reference_schema(array_ref), metadata=metadata
    )
    return core_schema.definitions_schema(outer_list, [recursive_item])
[docs]
def make_json_schema(
    shape: ShapeType, dtype: DtypeType, _handler: _handler_type
) -> ListSchema:
    """
    Make a list of list JSON schema from a shape and a dtype.

    First resolves the dtype into a pydantic ``CoreSchema`` ,
    and then uses that with :func:`.list_of_lists_schema` .

    Args:
        shape ( ShapeType ): Specification of a shape, as a tuple or
            an nptyping ``Shape``
        dtype ( DtypeType ): A builtin type or numpy dtype
        _handler: The pydantic schema generation handler (see pydantic docs)

    Returns:
        :class:`pydantic_core.core_schema.ListSchema` for a concrete shape. When
        ``shape`` is ``Any`` the recursive schema built by ``_unbounded_shape``
        is returned instead.
    """
    element_schema = _lol_dtype(dtype, _handler)

    # no shape constraint at all: accept lists nested to any depth
    if shape is Any:
        return _unbounded_shape(element_schema)

    return list_of_lists_schema(shape, element_schema)
[docs]
def get_validate_interface(shape: ShapeType, dtype: DtypeType) -> Callable:
    """
    Create a validator function bound to ``shape`` and ``dtype``.

    The returned function validates a value with the :class:`.Interface` class
    that matches it, using that class' :meth:`.Interface.validate` method.

    Args:
        shape ( ShapeType ): Shape passed on to the matched interface
        dtype ( DtypeType ): Dtype passed on to the matched interface

    Returns:
        A callable taking ``(value, info)`` that returns the validated value.
    """

    def validate_interface(value: Any, info: "ValidationInfo") -> NDArrayType:
        # ``info`` is accepted as part of the signature but not used
        matched_cls = Interface.match(value)
        return matched_cls(shape, dtype).validate(value)

    return validate_interface
def _jsonize_array(value: Any, info: SerializationInfo) -> Union[list, dict]:
    """
    Render an array as JSON.

    The interface class is picked with ``Interface.match_output`` and its
    ``to_json`` is called with the value and the serialization info.
    """
    return Interface.match_output(value).to_json(value, info)