"""
Interface to support treating videos like arrays using OpenCV
"""
from pathlib import Path
from typing import Any, Literal, TypeAlias
import numpy as np
from pydantic_core.core_schema import SerializationInfo
from numpydantic.interface import JsonDict
from numpydantic.interface.interface import Interface
try:
import cv2
from cv2 import VideoCapture
_CaptureUnion: TypeAlias = VideoCapture | None
except ImportError: # pragma: no cover
cv2 = None
VideoCapture = None
_CaptureUnion: TypeAlias = None
VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv")
class VideoJsonDict(JsonDict):
    """
    Json-able roundtrip representation of a video file

    Rather than embedding pixel data, a video is represented by a type tag
    and the location of the file it was loaded from.
    """

    # discriminator tag; always the literal string "video"
    type: Literal["video"]
    # path of the video file, stored as a string
    file: str
class VideoProxy:
    """
    Passthrough proxy class to interact with videos as arrays

    The proxy wraps an OpenCV ``VideoCapture`` and exposes array-like access:
    ``shape``, ``dtype``, ``len()`` and indexing by frame number or slice,
    optionally followed by indices into the frame itself
    (eg. ``proxy[0:10, :, :, 0]``).

    When created from a ``path`` the capture is opened lazily on first use and
    can be reopened after :meth:`.close`. When created from an already-opened
    ``VideoCapture`` no path is known, so the capture can not be reopened.

    Public attributes that are not defined on the proxy are forwarded to the
    underlying ``VideoCapture`` object.
    """

    def __init__(self, path: Path | None = None, video: _CaptureUnion = None):
        """
        Args:
            path: Location of a video file. Resolved to an absolute path.
            video: An already-opened ``VideoCapture`` object.

        Raises:
            ValueError: if neither ``path`` nor ``video`` is given.
        """
        if path is None and video is None:  # pragma: no cover
            raise ValueError(
                "Need to either supply a path or an opened VideoCapture object"
            )

        if path is not None:
            path = Path(path).resolve()

        self.path = path

        # lazily populated caches, filled in by the properties below
        self._video: VideoCapture | None = video
        self._n_frames: int | None = None
        self._dtype: np.dtype | None = None
        self._shape: tuple[int, ...] | None = None
        self._sample_frame: np.ndarray | None = None

    @classmethod
    def proxy_for(cls) -> type["VideoInterface"]:
        """Declare this class as a proxy for the VideoInterface"""
        return VideoInterface

    @property
    def video(self) -> VideoCapture:
        """
        Opened video capture object, opening it from ``path`` if needed

        Raises:
            RuntimeError: if there is no open capture and no path to open.
            FileNotFoundError: if ``path`` does not exist.
        """
        if self._video is None:
            if self.path is None:  # pragma: no cover
                raise RuntimeError(
                    "Instantiated with a VideoCapture object that has been closed, "
                    "and it cant be reopened since source path cant be gotten "
                    "from VideoCapture objects"
                )
            if not self.path.exists():
                raise FileNotFoundError(f"Video file {self.path} does not exist!")

            self._video = VideoCapture(str(self.path))
        return self._video

    def close(self) -> None:
        """Close the opened VideoCapture object"""
        if self._video is not None:
            self._video.release()
            self._video = None

    @property
    def sample_frame(self) -> np.ndarray:
        """
        A stored frame from the video to use when calculating shape and dtype

        The frame is read once and cached. The read position of the capture is
        put back where it was found, even if reading fails.

        Raises:
            RuntimeError: if no frame could be read.
        """
        if self._sample_frame is None:
            capture = self.video
            position = int(capture.get(cv2.CAP_PROP_POS_FRAMES))
            # step back one frame (but not before the start) and read from there
            capture.set(cv2.CAP_PROP_POS_FRAMES, max(0, position - 1))
            try:
                status, frame = capture.read()
            finally:
                capture.set(cv2.CAP_PROP_POS_FRAMES, position)
            if not status:  # pragma: no cover
                raise RuntimeError("Could not read frame from video")
            self._sample_frame = frame
        return self._sample_frame

    @property
    def shape(self) -> tuple[int, ...]:
        """
        Shape of video like
        ``(n_frames, height, width, channels)``

        Note that this order flips the order of height and width from typical
        resolution specifications: eg. 1080p video is typically 1920x1080, but here
        it would be 1080x1920. This follows opencv's ordering, which matches
        expectations when eg. an image is read and plotted with matplotlib: the
        first index is the position in the 0th dimension - the height, or "y"
        axis - and the second is the width/x.
        """
        if self._shape is None:
            self._shape = (self.n_frames, *self.sample_frame.shape)
        return self._shape

    @property
    def dtype(self) -> np.dtype:
        """Numpy dtype (from ``sample_frame`` )"""
        return self.sample_frame.dtype

    @property
    def n_frames(self) -> int:
        """
        Try to get number of frames using opencv metadata, and manually count
        if not
        """
        if self._n_frames is None:
            n_frames = self.video.get(cv2.CAP_PROP_FRAME_COUNT)
            # a non-positive count means the metadata is missing or unusable
            if n_frames <= 0:  # pragma: no cover
                # have to count manually for some containers with bad metadata
                # not testing for now, will wait until we encounter such a
                # video in the wild where this doesn't work.
                current_frame = self.video.get(cv2.CAP_PROP_POS_FRAMES)
                self.video.set(cv2.CAP_PROP_POS_FRAMES, 0)
                n_frames = 0
                while True:
                    status, _ = self.video.read()
                    if not status:
                        break
                    n_frames += 1
                # put the read position back where we found it
                self.video.set(cv2.CAP_PROP_POS_FRAMES, current_frame)
            self._n_frames = int(n_frames)
        return self._n_frames

    def _get_frame(self, frame: int) -> np.ndarray:
        """
        Seek to and decode a single frame

        Args:
            frame: Frame number. Negative values count back from the end.

        Raises:
            ValueError: if the frame could not be read.
        """
        index = int(frame)
        if index < 0:
            index += self.n_frames
        if index < 0:
            raise ValueError(f"Could not get frame {frame}")

        self.video.set(cv2.CAP_PROP_POS_FRAMES, index)
        # keep the decoded image in its own name so the error message below
        # still reports the frame number that was asked for
        status, image = self.video.read()
        if not status:  # pragma: no cover
            raise ValueError(f"Could not get frame {frame}")
        return image

    def _complete_slice(self, slice_: slice) -> slice:
        """
        Get a fully-built slice that can be passed to range

        Missing values are filled in and negative or out-of-range bounds are
        resolved against ``n_frames``, for positive and negative steps alike.
        The result is meant to be unpacked into ``range`` , not used to index
        a sequence: eg. ``[::-1]`` becomes ``slice(n_frames - 1, -1, -1)``
        """
        return slice(*slice_.indices(self.n_frames))

    def _get_frames(self, slice_: slice) -> np.ndarray:
        """Decode the frames selected by ``slice_`` and stack them on axis 0"""
        bounds = self._complete_slice(slice_)
        frames = [
            self._get_frame(i)
            for i in range(bounds.start, bounds.stop, bounds.step)
        ]
        if not frames:
            # np.stack refuses an empty list, so build the empty result by hand
            return np.empty((0, *self.sample_frame.shape), dtype=self.dtype)
        return np.stack(frames)

    def __array__(self, dtype: Any = None, copy: bool | None = None) -> np.ndarray:
        """
        Whole video as a numpy array

        Args:
            dtype: Optional dtype to convert the result to.
            copy: Accepted for compatibility with the numpy ``__array__``
                protocol. Ignored, since the frames are always decoded into
                a new array.
        """
        array = self[:]
        if dtype is not None:
            array = array.astype(dtype, copy=False)
        return array

    def __getitem__(self, item: int | slice | tuple) -> np.ndarray:
        """
        Get frames, or parts of frames, from the video

        Args:
            item: A frame number, a slice of frame numbers, or a tuple whose
                first element is one of those and whose remaining elements
                index into the frame(s).

        Raises:
            ValueError: if the frame index is neither an integer nor a slice,
                or if a frame could not be read.
        """
        # the first index selects frames and needs to be handled specially,
        # any further indices are applied to the decoded frame(s)
        if isinstance(item, tuple):
            first, rest = item[0], item[1:]
        else:
            first, rest = item, ()

        if isinstance(first, (int, np.integer)):
            # single frame
            selected = self._get_frame(int(first))
            return selected[rest] if rest else selected

        if isinstance(first, slice):
            selected = self._get_frames(first)
            # keep the whole frame axis, then apply the remaining indices
            return selected[(slice(None), *rest)] if rest else selected

        raise ValueError(f"indices must be an int or a slice! got {item}")

    def __setitem__(self, key: int | slice, value: int | float | np.ndarray):
        """Videos are read-only: always raises ``NotImplementedError``"""
        raise NotImplementedError("Setting pixel values on videos is not supported!")

    def __getattr__(self, item: str):
        """
        Forward unknown public attributes to the ``VideoCapture`` object

        Only called when normal attribute lookup fails.
        """
        if item == "__name__":
            return "VideoProxy"
        if item.startswith("_"):
            # Never forward private or dunder names: if ``_video`` itself is
            # missing (eg. an instance made with ``__new__``), forwarding would
            # access ``self.video`` and recurse without end.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        return getattr(self.video, item)

    def __eq__(self, other: "VideoProxy") -> bool:
        """
        Check if this is a proxy to the same video file

        Raises:
            TypeError: if ``other`` is not a :class:`.VideoProxy`
        """
        if not isinstance(other, VideoProxy):
            raise TypeError("Can only compare equality of two VideoProxies")
        return self.path == other.path

    def __len__(self) -> int:
        """Number of frames in the video"""
        return self.n_frames
class VideoInterface(Interface):
    """
    OpenCV interface to treat videos as arrays.

    Accepts paths to video files (as ``str`` or ``Path`` ), opened
    ``VideoCapture`` objects and :class:`.VideoProxy` objects, and hands back a
    :class:`.VideoProxy` .
    """

    name = "video"
    input_types = (str, Path, VideoCapture, VideoProxy)
    return_type = VideoProxy
    json_model = VideoJsonDict

    @classmethod
    def enabled(cls) -> bool:
        """Check if opencv-python is available in the environment"""
        return cv2 is not None

    @classmethod
    def check(cls, array: Any) -> bool:
        """
        Check if array is a string or Path with a supported video extension,
        or an opened VideoCapture object

        A :class:`.VideoProxy` is accepted as-is, and a dict is judged by the
        value of its ``"file"`` key.
        """
        # ``VideoCapture`` is ``None`` when opencv is not installed, and
        # ``isinstance`` would raise on it, so test for that first
        if VideoCapture is not None and isinstance(array, VideoCapture):
            return True
        if isinstance(array, VideoProxy):
            return True

        if isinstance(array, dict):
            array = array.get("file", "")

        if isinstance(array, str):
            array = Path(array)

        return isinstance(array, Path) and array.suffix.lower() in VIDEO_EXTENSIONS

    def before_validation(self, array: Any) -> VideoProxy:
        """
        Get a :class:`.VideoProxy` object for this video

        Args:
            array: Any of the inputs accepted by :meth:`.check`
        """
        # same guard as in ``check`` : ``VideoCapture`` is ``None`` without opencv
        if VideoCapture is not None and isinstance(array, VideoCapture):
            return VideoProxy(video=array)
        if isinstance(array, VideoProxy):
            return array
        if isinstance(array, dict) and "file" in array:
            # ``check`` accepts dicts that carry a "file" entry, so unwrap it
            # here rather than handing the dict itself to ``Path``
            array = array["file"]
        return VideoProxy(path=array)

    @classmethod
    def to_json(
        cls, array: VideoProxy, info: SerializationInfo
    ) -> list | VideoJsonDict:
        """
        Return a json-representation of a video

        With ``info.round_trip`` set, only a reference to the video file is
        returned. Otherwise the whole video is read and returned as nested
        lists.
        """
        if info.round_trip:
            # NOTE(review): a proxy made from a VideoCapture has ``path=None`` ,
            # so ``file`` becomes the string "None" - confirm whether this
            # should raise instead.
            return VideoJsonDict(type=cls.name, file=str(array.path))
        return np.array(array).tolist()