robokudo.io.cas_view_codecs
Codec registry and serializers for CAS view persistence.
Classes
ViewPayload | Persistence envelope for a single CAS view.
ViewCodec | Interface for encoding and decoding CAS view values.
JsonLikeCodec | Codec for plain JSON-like Python values.
TupleCodec | Codec for tuple values.
BytesCodec | Codec for raw bytes data.
NumpyCodec | Codec for NumPy arrays.
RosMessageCodec | Codec for ROS message instances.
Open3DPointCloudCodec | Codec for Open3D point cloud values.
Open3DPinholeCameraIntrinsicCodec | Codec for Open3D pinhole camera intrinsic values.
StampedTransformCodec | Codec for RoboKudo StampedTransform values.
HomogeneousTransformationMatrixCodec | Codec for SemDT homogeneous transformation matrices.
KrroodCodec | Codec that delegates serialization to KRROOD.
CASViewCodecRegistry | Registry that routes CAS views to matching codecs.
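Functions
_recursive_convert | Recursively convert non-JSON-friendly array-like values into plain containers.
_ros_message_to_dict | Convert a ROS message object into plain dictionary data.
_dict_to_ros_message | Populate a ROS message instance from plain dictionary data.
_is_json_primitive | Return True when a value is a JSON primitive.
_is_json_like | Return True when a value can be represented as plain JSON data.
_full_type_name | Return the fully qualified type name for a value.
_load_type | Load a Python type from its fully qualified name.
_is_open3d_pinhole_camera_intrinsic | Check whether a value is an Open3D pinhole camera intrinsic object.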
Module Contents
- robokudo.io.cas_view_codecs._recursive_convert(value: typing_extensions.Any) → typing_extensions.Any
Recursively convert non-JSON-friendly array-like values into plain containers.
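A minimal sketch of this kind of recursive conversion, not the module's actual implementation. It assumes that array-like values expose a `tolist()` method, as NumPy arrays and the standard library's `array.array` do; the name `recursive_convert_sketch` is hypothetical.

```python
from array import array
from typing import Any


def recursive_convert_sketch(value: Any) -> Any:
    """Hypothetical stand-in for _recursive_convert; the real logic may differ."""
    # Assumption: array-like values expose tolist() (numpy.ndarray, array.array).
    if hasattr(value, "tolist"):
        return recursive_convert_sketch(value.tolist())
    if isinstance(value, dict):
        return {key: recursive_convert_sketch(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        # Tuples become lists so that the result holds only plain containers.
        return [recursive_convert_sketch(item) for item in value]
    return value


nested = {"ids": array("i", [1, 2, 3]), "pose": (0.0, [array("d", [1.5])])}
converted = recursive_convert_sketch(nested)
```

Here `array.array` stands in for a NumPy array so that the sketch runs with the standard library alone.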
- robokudo.io.cas_view_codecs._ros_message_to_dict(msg: typing_extensions.Any) → typing_extensions.Dict[str, typing_extensions.Any]
Convert a ROS message object into plain dictionary data.
- robokudo.io.cas_view_codecs._dict_to_ros_message(message_type: type[typing_extensions.Any], data_dict: typing_extensions.Dict[str, typing_extensions.Any]) → typing_extensions.Any
Populate a ROS message instance from plain dictionary data.
- robokudo.io.cas_view_codecs._is_json_primitive(value: typing_extensions.Any) → bool
Return True when a value is a JSON primitive.
- robokudo.io.cas_view_codecs._is_json_like(value: typing_extensions.Any) → bool
Return True when a value can be represented as plain JSON data.
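The two JSON checks can be pictured roughly as follows. This is a sketch under stated assumptions, not the module's code: it assumes that tuples, sets and bytes are rejected (plausible given the dedicated tuple and bytes codecs, but not confirmed here) and that dictionary keys must be strings. Both function names are hypothetical.

```python
from typing import Any


def is_json_primitive_sketch(value: Any) -> bool:
    """True for None, bool, int, float and str, the JSON scalar types."""
    return value is None or isinstance(value, (bool, int, float, str))


def is_json_like_sketch(value: Any) -> bool:
    """True when value consists only of JSON primitives, lists and str-keyed dicts."""
    if is_json_primitive_sketch(value):
        return True
    if isinstance(value, list):
        return all(is_json_like_sketch(item) for item in value)
    if isinstance(value, dict):
        return all(
            isinstance(key, str) and is_json_like_sketch(item)
            for key, item in value.items()
        )
    # Assumption: tuples, sets, bytes and custom objects are left to other codecs.
    return False
```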
- robokudo.io.cas_view_codecs._full_type_name(value: typing_extensions.Any) → str
Return the fully qualified type name for a value.
- robokudo.io.cas_view_codecs._load_type(type_name: str) → type[typing_extensions.Any]
Load a Python type from its fully qualified name.
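Taken together, these two helpers let a persisted type name be resolved back to a class at load time. The following sketch shows one common way to do this with `importlib`; the function names are hypothetical and the real helpers may format or resolve names differently.

```python
import importlib
from fractions import Fraction
from typing import Any


def full_type_name_sketch(value: Any) -> str:
    """Build a 'module.QualifiedName' string for the type of value."""
    value_type = type(value)
    return f"{value_type.__module__}.{value_type.__qualname__}"


def load_type_sketch(type_name: str) -> type:
    """Import the module part and fetch the class by its name."""
    # Limitation of this sketch: nested classes (dotted __qualname__) are not handled.
    module_name, _, attribute = type_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


name = full_type_name_sketch(Fraction(1, 3))
loaded = load_type_sketch(name)
```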
- robokudo.io.cas_view_codecs._is_open3d_pinhole_camera_intrinsic(value: typing_extensions.Any) → bool
Check whether a value is an Open3D pinhole camera intrinsic object.
- class robokudo.io.cas_view_codecs.ViewPayload
Persistence envelope for a single CAS view.
- serializer_id: str
- payload: typing_extensions.Any
- type_name: str
- metadata: typing_extensions.Dict[str, typing_extensions.Any]
- to_document(view_name: str) → typing_extensions.Dict[str, typing_extensions.Any]
Build a storage document for one encoded view.
- static from_document(document: typing_extensions.Dict[str, typing_extensions.Any]) → ViewPayload
Create a payload object from a storage document.
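To illustrate the envelope round trip, here is a hedged sketch of a payload class with the four fields listed above. The document layout is an assumption: the key names (including `view_name`) simply mirror the field and parameter names and may not match what the module actually stores. `ViewPayloadSketch` is a hypothetical name.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ViewPayloadSketch:
    serializer_id: str
    payload: Any
    type_name: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_document(self, view_name: str) -> Dict[str, Any]:
        # Assumption: document keys mirror the field names plus "view_name".
        return {
            "view_name": view_name,
            "serializer_id": self.serializer_id,
            "type_name": self.type_name,
            "metadata": dict(self.metadata),
            "payload": self.payload,
        }

    @staticmethod
    def from_document(document: Dict[str, Any]) -> "ViewPayloadSketch":
        return ViewPayloadSketch(
            serializer_id=document["serializer_id"],
            payload=document["payload"],
            type_name=document["type_name"],
            metadata=dict(document.get("metadata", {})),
        )


original = ViewPayloadSketch("json_like_v1", {"label": "cup"}, "builtins.dict")
document = original.to_document("object_hypothesis")
restored = ViewPayloadSketch.from_document(document)
```

The view name travels in the document rather than in the payload object, which matches the signatures above: `to_document` takes `view_name`, while `from_document` returns only the payload.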
- class robokudo.io.cas_view_codecs.ViewCodec
Interface for encoding and decoding CAS view values.
- serializer_id: str
- abstract can_encode(value: typing_extensions.Any) → bool
Return True when this codec can encode value.
- abstract encode(value: typing_extensions.Any) → ViewPayload
Encode a runtime value into a ViewPayload.
- abstract decode(payload: ViewPayload) → typing_extensions.Any
Decode a persisted payload back into a runtime value.
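The three-method contract can be sketched with an abstract base class and one toy implementation. Everything named here is hypothetical: `PayloadSketch` stands in for `ViewPayload`, `CodecSketch` for `ViewCodec`, and `ComplexCodecSketch` is an invented codec that is not part of the module.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, NamedTuple


class PayloadSketch(NamedTuple):
    """Hypothetical stand-in for ViewPayload."""
    serializer_id: str
    payload: Any
    type_name: str
    metadata: Dict[str, Any]


class CodecSketch(ABC):
    """Hypothetical stand-in for the ViewCodec interface."""
    serializer_id: str

    @abstractmethod
    def can_encode(self, value: Any) -> bool: ...

    @abstractmethod
    def encode(self, value: Any) -> PayloadSketch: ...

    @abstractmethod
    def decode(self, payload: PayloadSketch) -> Any: ...


class ComplexCodecSketch(CodecSketch):
    """Invented example codec for complex numbers."""
    serializer_id = "complex_sketch_v1"

    def can_encode(self, value: Any) -> bool:
        return isinstance(value, complex)

    def encode(self, value: complex) -> PayloadSketch:
        # Store the value as a JSON-friendly [real, imag] pair.
        return PayloadSketch(
            self.serializer_id, [value.real, value.imag], "builtins.complex", {}
        )

    def decode(self, payload: PayloadSketch) -> complex:
        real, imag = payload.payload
        return complex(real, imag)


codec = ComplexCodecSketch()
encoded = codec.encode(3 + 4j)
decoded = codec.decode(encoded)
```

The `serializer_id` written by `encode` is what allows a reader to pick the matching `decode` later, so each codec's identifier should stay stable once data has been persisted.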
- class robokudo.io.cas_view_codecs.JsonLikeCodec
Bases: ViewCodec
Codec for plain JSON-like Python values.
- serializer_id: str = 'json_like_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is already JSON-compatible.
- encode(value: typing_extensions.Any) → ViewPayload
Wrap a JSON-like value without further transformation.
- decode(payload: ViewPayload) → typing_extensions.Any
Return the raw payload value.
- class robokudo.io.cas_view_codecs.TupleCodec
Bases: ViewCodec
Codec for tuple values.
- serializer_id: str = 'tuple_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is a tuple.
- encode(value: tuple[typing_extensions.Any, Ellipsis]) → ViewPayload
Encode a tuple as a list payload.
- decode(payload: ViewPayload) → tuple[typing_extensions.Any, Ellipsis]
Decode list payload data back to a tuple.
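A dedicated tuple codec is useful because plain JSON has no tuple type. The sketch below shows the problem and the tagged round trip; the two-key document layout is a simplifying assumption, and only flat tuples of JSON primitives are covered.

```python
import json

value = ("left", 0.25, 3)

# Without a tag, a tuple silently comes back from JSON as a list.
plain_round_trip = json.loads(json.dumps(value))

# With a serializer tag, the reader knows to rebuild a tuple.
# Assumption: this document layout is illustrative only.
document = {"serializer_id": "tuple_v1", "payload": list(value)}
stored = json.loads(json.dumps(document))
if stored["serializer_id"] == "tuple_v1":
    restored = tuple(stored["payload"])
else:
    restored = stored["payload"]
```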
- class robokudo.io.cas_view_codecs.BytesCodec
Bases: ViewCodec
Codec for raw bytes data.
- serializer_id: str = 'bytes_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is bytes or bytearray.
- encode(value: bytes | bytearray) → ViewPayload
Encode bytes as a base64 text payload.
- decode(payload: ViewPayload) → bytes
Decode base64 text payload into raw bytes.
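The base64 step itself can be sketched with the standard library. The helper names are hypothetical, and the real codec wraps the text in a `ViewPayload` rather than returning it directly.

```python
import base64
from typing import Union


def encode_bytes_sketch(value: Union[bytes, bytearray]) -> str:
    """Turn raw bytes into ASCII text that is safe to embed in JSON."""
    return base64.b64encode(bytes(value)).decode("ascii")


def decode_bytes_sketch(text: str) -> bytes:
    """Recover the raw bytes from the base64 text."""
    return base64.b64decode(text.encode("ascii"))


raw = bytearray(b"\x00\xffdepth")
text = encode_bytes_sketch(raw)
restored = decode_bytes_sketch(text)
```

As the signatures above indicate, decoding returns `bytes`, so a `bytearray` input does not come back as a `bytearray`.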
- class robokudo.io.cas_view_codecs.NumpyCodec
Bases: ViewCodec
Codec for NumPy arrays.
- serializer_id: str = 'numpy_npy_base64_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is a NumPy array.
- encode(value: numpy.ndarray) → ViewPayload
Encode a NumPy array as base64-encoded .npy data.
- decode(payload: ViewPayload) → numpy.ndarray
Decode base64-encoded .npy payload data.
- class robokudo.io.cas_view_codecs.RosMessageCodec
Bases: ViewCodec
Codec for ROS message instances.
- serializer_id: str = 'ros_message_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value looks like a ROS message.
- encode(value: typing_extensions.Any) → ViewPayload
Encode a ROS message to dictionary payload data.
- decode(payload: ViewPayload) → typing_extensions.Any
Decode dictionary payload data to a ROS message object.
- class robokudo.io.cas_view_codecs.Open3DPointCloudCodec
Bases: ViewCodec
Codec for Open3D point cloud values.
- serializer_id: str = 'open3d_pointcloud_pcd_base64_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is an Open3D point cloud.
- encode(value: typing_extensions.Any) → ViewPayload
Encode a point cloud as base64-encoded PCD data.
- decode(payload: ViewPayload) → typing_extensions.Any
Decode base64-encoded PCD data to an Open3D point cloud.
- class robokudo.io.cas_view_codecs.Open3DPinholeCameraIntrinsicCodec
Bases: ViewCodec
Codec for Open3D pinhole camera intrinsic values.
- serializer_id: str = 'open3d_pinhole_camera_intrinsic_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is an Open3D pinhole camera intrinsic.
- encode(value: typing_extensions.Any) → ViewPayload
Encode an Open3D pinhole intrinsic into JSON-compatible payload data.
- decode(payload: ViewPayload) → typing_extensions.Any
Decode payload data to an Open3D pinhole intrinsic object.
- class robokudo.io.cas_view_codecs.StampedTransformCodec
Bases: ViewCodec
Codec for RoboKudo StampedTransform values.
- serializer_id: str = 'robokudo_stamped_transform_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is a StampedTransform.
- encode(value: robokudo.types.tf.StampedTransform) → ViewPayload
Encode a StampedTransform into plain dictionary data.
- decode(payload: ViewPayload) → robokudo.types.tf.StampedTransform
Decode plain dictionary data to a StampedTransform.
- class robokudo.io.cas_view_codecs.HomogeneousTransformationMatrixCodec
Bases: ViewCodec
Codec for SemDT homogeneous transformation matrices.
- serializer_id: str = 'semdt_homogeneous_transform_v1'
- can_encode(value: typing_extensions.Any) → bool
Check whether the value is a homogeneous transformation matrix.
- encode(value: semantic_digital_twin.spatial_types.HomogeneousTransformationMatrix) → ViewPayload
Encode a transformation matrix with its native JSON representation.
- decode(payload: ViewPayload) → semantic_digital_twin.spatial_types.HomogeneousTransformationMatrix
Decode matrix payload data using the active world entity tracker.
- class robokudo.io.cas_view_codecs.KrroodCodec
Bases: ViewCodec
Codec that delegates serialization to KRROOD.
- serializer_id: str = 'krrood_json_v1'
- can_encode(value: typing_extensions.Any) → bool
Probe whether KRROOD can serialize the given value.
- encode(value: typing_extensions.Any) → ViewPayload
Encode a value using KRROOD JSON serialization.
- decode(payload: ViewPayload) → typing_extensions.Any
Decode a value using KRROOD JSON deserialization.
- class robokudo.io.cas_view_codecs.CASViewCodecRegistry
Registry that routes CAS views to matching codecs.
- __post_init__() → None
Initialize a default codec order when no codecs are provided.
- _find_encoder(value: typing_extensions.Any) → typing_extensions.Optional[ViewCodec]
Return the first registered codec that can encode value.
- _find_decoder(serializer_id: str) → ViewCodec
Return the codec registered for a serializer identifier.
- encode_view(view_name: str, value: typing_extensions.Any, strict: bool = True) → typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.Any]]
Encode a single CAS view into a storage document.
- decode_view(document: typing_extensions.Dict[str, typing_extensions.Any]) → tuple[str, typing_extensions.Any]
Decode a single storage document into one CAS view.
- encode_views(views: typing_extensions.Dict[str, typing_extensions.Any], strict: bool = True) → typing_extensions.List[typing_extensions.Dict[str, typing_extensions.Any]]
Encode multiple CAS views into storage documents.
- decode_views(documents: typing_extensions.Iterable[typing_extensions.Dict[str, typing_extensions.Any]]) → typing_extensions.Dict[str, typing_extensions.Any]
Decode storage documents into CAS view values.
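The routing described above (first matching codec on encode, lookup by serializer identifier on decode) can be sketched as follows. This is an illustration under several assumptions, not the registry's implementation: the codec classes, the document keys, the exception types, and the meaning of `strict` (raise when no codec matches, otherwise skip the view) are all guesses made for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Tuple


class PlainCodecSketch:
    """Hypothetical codec for values that are stored unchanged."""
    serializer_id = "plain_sketch"

    def can_encode(self, value: Any) -> bool:
        return value is None or isinstance(value, (bool, int, float, str, list, dict))

    def encode(self, value: Any) -> Any:
        return value

    def decode(self, payload: Any) -> Any:
        return payload


class TupleCodecSketch:
    """Hypothetical codec that stores tuples as lists."""
    serializer_id = "tuple_sketch"

    def can_encode(self, value: Any) -> bool:
        return isinstance(value, tuple)

    def encode(self, value: tuple) -> list:
        return list(value)

    def decode(self, payload: list) -> tuple:
        return tuple(payload)


@dataclass
class RegistrySketch:
    codecs: List[Any] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Fall back to a default codec order when none is provided.
        if not self.codecs:
            self.codecs = [PlainCodecSketch(), TupleCodecSketch()]

    def _find_encoder(self, value: Any) -> Optional[Any]:
        # The first codec that accepts the value wins, so order matters.
        return next((c for c in self.codecs if c.can_encode(value)), None)

    def _find_decoder(self, serializer_id: str) -> Any:
        for codec in self.codecs:
            if codec.serializer_id == serializer_id:
                return codec
        raise KeyError(f"no codec registered for {serializer_id!r}")  # assumed error type

    def encode_view(self, view_name: str, value: Any, strict: bool = True) -> Optional[Dict[str, Any]]:
        codec = self._find_encoder(value)
        if codec is None:
            # Assumption: strict mode raises, lenient mode skips the view.
            if strict:
                raise TypeError(f"no codec can encode view {view_name!r}")
            return None
        return {
            "view_name": view_name,
            "serializer_id": codec.serializer_id,
            "payload": codec.encode(value),
        }

    def decode_view(self, document: Dict[str, Any]) -> Tuple[str, Any]:
        codec = self._find_decoder(document["serializer_id"])
        return document["view_name"], codec.decode(document["payload"])

    def encode_views(self, views: Dict[str, Any], strict: bool = True) -> List[Dict[str, Any]]:
        encoded = (self.encode_view(name, value, strict) for name, value in views.items())
        return [document for document in encoded if document is not None]

    def decode_views(self, documents: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
        return dict(self.decode_view(document) for document in documents)


registry = RegistrySketch()
views = {"label": "cup", "size": (640, 480), "mask": {1, 2}}
documents = registry.encode_views(views, strict=False)  # the set has no codec here
decoded = registry.decode_views(documents)
```

Because the first matching codec is chosen, more specific codecs generally need to be registered ahead of broad, probing ones for the specific encoding to take effect.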