Source code for numpydantic.testing.interfaces

from datetime import datetime, timezone
from pathlib import Path

import cv2
import dask.array as da
import h5py
import numpy as np
import zarr
from pydantic import BaseModel

from numpydantic.interface import (
    DaskInterface,
    H5ArrayPath,
    H5Interface,
    NumpyInterface,
    VideoInterface,
    ZarrArrayPath,
    ZarrInterface,
)
from numpydantic.testing.helpers import InterfaceCase
from numpydantic.types import DtypeType, NDArrayType


[docs] class NumpyCase(InterfaceCase): """In-memory numpy array""" interface = NumpyInterface
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> np.ndarray: if array is not None: return np.array(array, dtype=dtype) elif issubclass(dtype, BaseModel): return np.full(shape=shape, fill_value=dtype(x=1)) elif len(shape) == 0: return dtype() else: return np.zeros(shape=shape, dtype=dtype)
class _HDF5MetaCase(InterfaceCase): """Base case for hdf5 cases""" interface = H5Interface @classmethod def skip(cls, shape: tuple[int, ...], dtype: DtypeType) -> bool: return issubclass(dtype, BaseModel)
[docs] class HDF5Case(_HDF5MetaCase): """HDF5 Array"""
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> H5ArrayPath | None: if cls.skip(shape, dtype): # pragma: no cover return None hdf5_file = path / "h5f.h5" array_path = "/" + "_".join([str(s) for s in shape]) + "__" + dtype.__name__ generator = np.random.default_rng() if array is not None: data = np.array(array, dtype=dtype) elif dtype is str: data = generator.random(shape).astype(bytes) elif dtype is np.str_: data = generator.random(shape).astype("S32") elif dtype is datetime: data = np.empty(shape, dtype="S32") data.fill(datetime.now(timezone.utc).isoformat().encode("utf-8")) else: data = generator.random(shape).astype(dtype) h5path = H5ArrayPath(hdf5_file, array_path) with h5py.File(hdf5_file, "w") as h5f: _ = h5f.create_dataset(array_path, data=data) return h5path
[docs] class HDF5CompoundCase(_HDF5MetaCase): """HDF5 Array with a fake compound dtype"""
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> H5ArrayPath | None: if cls.skip(shape, dtype): # pragma: no cover return None hdf5_file = path / "h5f.h5" array_path = "/" + "_".join([str(s) for s in shape]) + "__" + dtype.__name__ if array is not None: data = np.array(array, dtype=dtype) elif dtype in (str, np.str_): dt = np.dtype([("data", np.dtype("S10")), ("extra", "i8")]) data = np.array([("hey", 0)] * int(np.prod(shape)), dtype=dt).reshape(shape) elif dtype is datetime: dt = np.dtype([("data", np.dtype("S32")), ("extra", "i8")]) data = np.array( [(datetime.now(timezone.utc).isoformat().encode("utf-8"), 0)] * int(np.prod(shape)), dtype=dt, ).reshape(shape) else: dt = np.dtype([("data", dtype), ("extra", "i8")]) data = np.zeros(shape, dtype=dt) h5path = H5ArrayPath(hdf5_file, array_path, "data") with h5py.File(hdf5_file, "w") as h5f: _ = h5f.create_dataset(array_path, data=data) return h5path
[docs] class DaskCase(InterfaceCase): """In-memory dask array""" interface = DaskInterface
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> da.Array: if array is not None: return da.array(array, dtype=dtype) if issubclass(dtype, BaseModel): return da.full(shape=shape, fill_value=dtype(x=1), chunks=-1) else: return da.zeros(shape=shape, dtype=dtype, chunks=10)
class _ZarrMetaCase(InterfaceCase): """Shared classmethods for zarr cases""" interface = ZarrInterface @classmethod def skip(cls, shape: tuple[int, ...], dtype: DtypeType) -> bool: return issubclass(dtype, BaseModel) or dtype is str
[docs] class ZarrCase(_ZarrMetaCase): """In-memory zarr array"""
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> zarr.Array | None: if array is not None: return zarr.array(array, dtype=dtype, chunks=-1) else: return zarr.zeros(shape=shape, dtype=dtype)
[docs] class ZarrDirCase(_ZarrMetaCase): """On-disk zarr array"""
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> zarr.Array | None: store = zarr.DirectoryStore(str(path / "array.zarr")) if array is not None: return zarr.array(array, dtype=dtype, store=store, chunks=-1) else: return zarr.zeros(shape=shape, dtype=dtype, store=store)
[docs] class ZarrZipCase(_ZarrMetaCase): """Zarr zip store"""
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> zarr.Array | None: store = zarr.ZipStore(str(path / "array.zarr"), mode="w") if array is not None: return zarr.array(array, dtype=dtype, store=store, chunks=-1) else: return zarr.zeros(shape=shape, dtype=dtype, store=store)
[docs] class ZarrNestedCase(_ZarrMetaCase): """Nested zarr array"""
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10), dtype: DtypeType = float, path: Path | None = None, array: NDArrayType | None = None, ) -> ZarrArrayPath: file = str(path / "nested.zarr") root = zarr.open(file, mode="w") subpath = "a/b/c" if array is not None: _ = root.array(subpath, array, dtype=dtype) else: _ = root.zeros(subpath, shape=shape, dtype=dtype) return ZarrArrayPath(file=file, path=subpath)
[docs] class VideoCase(InterfaceCase): """AVI video""" interface = VideoInterface
[docs] @classmethod def make_array( cls, shape: tuple[int, ...] = (10, 10, 10, 3), dtype: DtypeType = np.uint8, path: Path | None = None, array: NDArrayType | None = None, ) -> Path | None: if cls.skip(shape, dtype): # pragma: no cover return None if array is not None: array = np.array(array, dtype=np.uint8) shape = array.shape is_color = len(shape) == 4 frames = shape[0] frame_shape = shape[1:] video_path = path / "test.avi" writer = cv2.VideoWriter( str(video_path), cv2.VideoWriter_fourcc(*"RGBA"), # raw video for testing purposes 30, (frame_shape[1], frame_shape[0]), is_color, ) for i in range(frames): if array is not None: frame = array[i] else: # make fresh array every time bc opencv eats them frame = np.full(frame_shape, fill_value=i, dtype=np.uint8) writer.write(frame) writer.release() return video_path
[docs] @classmethod def skip(cls, shape: tuple[int, ...], dtype: DtypeType) -> bool: """ We really can only handle 4 dimensional cases in 8-bit rn lol .. todo:: Fix shape/writing for grayscale videos """ if len(shape) != 4: return True # if len(shape) < 3 or len(shape) > 4: # return True if dtype not in (int, np.uint8): return True # if we have a color video (ie. shape == 4, needs to be RGB) if len(shape) == 4 and shape[3] != 3: return True