robokudo.io.cas_view_codecs

Codec registry and serializers for CAS view persistence.

Classes

ViewPayload

Persistence envelope for a single CAS view.

ViewCodec

Interface for encoding and decoding CAS view values.

JsonLikeCodec

Codec for plain JSON-like Python values.

TupleCodec

Codec for tuple values.

BytesCodec

Codec for raw bytes data.

NumpyCodec

Codec for NumPy arrays.

RosMessageCodec

Codec for ROS message instances.

Open3DPointCloudCodec

Codec for Open3D point cloud values.

Open3DPinholeCameraIntrinsicCodec

Codec for Open3D pinhole camera intrinsic values.

StampedTransformCodec

Codec for RoboKudo StampedTransform values.

HomogeneousTransformationMatrixCodec

Codec for SemDT homogeneous transformation matrices.

KrroodCodec

Codec that delegates serialization to KRROOD.

CASViewCodecRegistry

Registry that routes CAS views to matching codecs.

Functions

_recursive_convert(→ typing_extensions.Any)

Recursively convert non-JSON-friendly array-like values into plain containers.

_ros_message_to_dict(→ typing_extensions.Dict[str, ...)

Convert a ROS message object into plain dictionary data.

_dict_to_ros_message(→ typing_extensions.Any)

Populate a ROS message instance from plain dictionary data.

_is_json_primitive(→ bool)

Return True when a value is a JSON primitive.

_is_json_like(→ bool)

Return True when a value can be represented as plain JSON data.

_full_type_name(→ str)

Return the fully qualified type name for a value.

_load_type(→ type[typing_extensions.Any])

Load a Python type from its fully qualified name.

_is_open3d_pinhole_camera_intrinsic(→ bool)

Check whether a value is an Open3D pinhole camera intrinsic object.

Module Contents

robokudo.io.cas_view_codecs._recursive_convert(value: typing_extensions.Any) typing_extensions.Any

Recursively convert non-JSON-friendly array-like values into plain containers.

robokudo.io.cas_view_codecs._ros_message_to_dict(msg: typing_extensions.Any) typing_extensions.Dict[str, typing_extensions.Any]

Convert a ROS message object into plain dictionary data.

robokudo.io.cas_view_codecs._dict_to_ros_message(message_type: type[typing_extensions.Any], data_dict: typing_extensions.Dict[str, typing_extensions.Any]) typing_extensions.Any

Populate a ROS message instance from plain dictionary data.

robokudo.io.cas_view_codecs._is_json_primitive(value: typing_extensions.Any) bool

Return True when a value is a JSON primitive.

robokudo.io.cas_view_codecs._is_json_like(value: typing_extensions.Any) bool

Return True when a value can be represented as plain JSON data.

robokudo.io.cas_view_codecs._full_type_name(value: typing_extensions.Any) str

Return the fully qualified type name for a value.

robokudo.io.cas_view_codecs._load_type(type_name: str) type[typing_extensions.Any]

Load a Python type from its fully qualified name.

robokudo.io.cas_view_codecs._is_open3d_pinhole_camera_intrinsic(value: typing_extensions.Any) bool

Check whether a value is an Open3D pinhole camera intrinsic object.

class robokudo.io.cas_view_codecs.ViewPayload

Persistence envelope for a single CAS view.

serializer_id: str
payload: typing_extensions.Any
type_name: str
metadata: typing_extensions.Dict[str, typing_extensions.Any]
to_document(view_name: str) typing_extensions.Dict[str, typing_extensions.Any]

Build a storage document for one encoded view.

static from_document(document: typing_extensions.Dict[str, typing_extensions.Any]) ViewPayload

Create a payload object from a storage document.

class robokudo.io.cas_view_codecs.ViewCodec

Interface for encoding and decoding CAS view values.

serializer_id: str
abstract can_encode(value: typing_extensions.Any) bool

Return True when this codec can encode value.

abstract encode(value: typing_extensions.Any) ViewPayload

Encode a runtime value into a ViewPayload.

abstract decode(payload: ViewPayload) typing_extensions.Any

Decode a persisted payload back into a runtime value.

class robokudo.io.cas_view_codecs.JsonLikeCodec

Bases: ViewCodec

Codec for plain JSON-like Python values.

serializer_id: str = 'json_like_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is already JSON-compatible.

encode(value: typing_extensions.Any) ViewPayload

Wrap a JSON-like value without further transformation.

decode(payload: ViewPayload) typing_extensions.Any

Return the raw payload value.

class robokudo.io.cas_view_codecs.TupleCodec

Bases: ViewCodec

Codec for tuple values.

serializer_id: str = 'tuple_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is a tuple.

encode(value: tuple[typing_extensions.Any, Ellipsis]) ViewPayload

Encode a tuple as a list payload.

decode(payload: ViewPayload) tuple[typing_extensions.Any, Ellipsis]

Decode list payload data back to a tuple.

class robokudo.io.cas_view_codecs.BytesCodec

Bases: ViewCodec

Codec for raw bytes data.

serializer_id: str = 'bytes_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is bytes or bytearray.

encode(value: bytes | bytearray) ViewPayload

Encode bytes as a base64 text payload.

decode(payload: ViewPayload) bytes

Decode base64 text payload into raw bytes.

class robokudo.io.cas_view_codecs.NumpyCodec

Bases: ViewCodec

Codec for NumPy arrays.

serializer_id: str = 'numpy_npy_base64_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is a NumPy array.

encode(value: numpy.ndarray) ViewPayload

Encode a NumPy array as base64 encoded .npy data.

decode(payload: ViewPayload) numpy.ndarray

Decode base64 encoded .npy payload data.

class robokudo.io.cas_view_codecs.RosMessageCodec

Bases: ViewCodec

Codec for ROS message instances.

serializer_id: str = 'ros_message_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value looks like a ROS message.

encode(value: typing_extensions.Any) ViewPayload

Encode a ROS message to dictionary payload data.

decode(payload: ViewPayload) typing_extensions.Any

Decode dictionary payload data to a ROS message object.

class robokudo.io.cas_view_codecs.Open3DPointCloudCodec

Bases: ViewCodec

Codec for Open3D point cloud values.

serializer_id: str = 'open3d_pointcloud_pcd_base64_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is an Open3D point cloud.

encode(value: typing_extensions.Any) ViewPayload

Encode a point cloud as base64-encoded PCD data.

decode(payload: ViewPayload) typing_extensions.Any

Decode base64-encoded PCD data to an Open3D point cloud.

class robokudo.io.cas_view_codecs.Open3DPinholeCameraIntrinsicCodec

Bases: ViewCodec

Codec for Open3D pinhole camera intrinsic values.

serializer_id: str = 'open3d_pinhole_camera_intrinsic_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is an Open3D pinhole camera intrinsic.

encode(value: typing_extensions.Any) ViewPayload

Encode an Open3D pinhole intrinsic into JSON-compatible payload data.

decode(payload: ViewPayload) typing_extensions.Any

Decode payload data to an Open3D pinhole intrinsic object.

class robokudo.io.cas_view_codecs.StampedTransformCodec

Bases: ViewCodec

Codec for RoboKudo StampedTransform values.

serializer_id: str = 'robokudo_stamped_transform_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is a StampedTransform.

encode(value: robokudo.types.tf.StampedTransform) ViewPayload

Encode a StampedTransform into plain dictionary data.

decode(payload: ViewPayload) robokudo.types.tf.StampedTransform

Decode plain dictionary data to a StampedTransform.

class robokudo.io.cas_view_codecs.HomogeneousTransformationMatrixCodec

Bases: ViewCodec

Codec for SemDT homogeneous transformation matrices.

serializer_id: str = 'semdt_homogeneous_transform_v1'
can_encode(value: typing_extensions.Any) bool

Check whether the value is a homogeneous transformation matrix.

encode(value: semantic_digital_twin.spatial_types.HomogeneousTransformationMatrix) ViewPayload

Encode a transformation matrix with its native JSON representation.

decode(payload: ViewPayload) semantic_digital_twin.spatial_types.HomogeneousTransformationMatrix

Decode matrix payload data using the active world entity tracker.

class robokudo.io.cas_view_codecs.KrroodCodec

Bases: ViewCodec

Codec that delegates serialization to KRROOD.

serializer_id: str = 'krrood_json_v1'
can_encode(value: typing_extensions.Any) bool

Probe whether KRROOD can serialize the given value.

encode(value: typing_extensions.Any) ViewPayload

Encode a value using KRROOD JSON serialization.

decode(payload: ViewPayload) typing_extensions.Any

Decode a value using KRROOD JSON deserialization.

class robokudo.io.cas_view_codecs.CASViewCodecRegistry

Registry that routes CAS views to matching codecs.

codecs: typing_extensions.List[ViewCodec] = []
__post_init__() None

Initialize a default codec order when no codecs are provided.

_find_encoder(value: typing_extensions.Any) typing_extensions.Optional[ViewCodec]

Return the first registered codec that can encode value.

_find_decoder(serializer_id: str) ViewCodec

Return the codec registered for a serializer identifier.

encode_view(view_name: str, value: typing_extensions.Any, strict: bool = True) typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.Any]]

Encode a single CAS view into a storage document.

decode_view(document: typing_extensions.Dict[str, typing_extensions.Any]) tuple[str, typing_extensions.Any]

Decode a single storage document into one CAS view.

encode_views(views: typing_extensions.Dict[str, typing_extensions.Any], strict: bool = True) typing_extensions.List[typing_extensions.Dict[str, typing_extensions.Any]]

Encode multiple CAS views into storage documents.

decode_views(documents: typing_extensions.Iterable[typing_extensions.Dict[str, typing_extensions.Any]]) typing_extensions.Dict[str, typing_extensions.Any]

Decode storage documents into CAS view values.