robokudo.io.cas_view_codecs =========================== .. py:module:: robokudo.io.cas_view_codecs .. autoapi-nested-parse:: Codec registry and serializers for CAS view persistence. Classes ------- .. autoapisummary:: robokudo.io.cas_view_codecs.ViewPayload robokudo.io.cas_view_codecs.ViewCodec robokudo.io.cas_view_codecs.JsonLikeCodec robokudo.io.cas_view_codecs.TupleCodec robokudo.io.cas_view_codecs.BytesCodec robokudo.io.cas_view_codecs.NumpyCodec robokudo.io.cas_view_codecs.RosMessageCodec robokudo.io.cas_view_codecs.Open3DPointCloudCodec robokudo.io.cas_view_codecs.Open3DPinholeCameraIntrinsicCodec robokudo.io.cas_view_codecs.StampedTransformCodec robokudo.io.cas_view_codecs.HomogeneousTransformationMatrixCodec robokudo.io.cas_view_codecs.KrroodCodec robokudo.io.cas_view_codecs.CASViewCodecRegistry Functions --------- .. autoapisummary:: robokudo.io.cas_view_codecs._recursive_convert robokudo.io.cas_view_codecs._ros_message_to_dict robokudo.io.cas_view_codecs._dict_to_ros_message robokudo.io.cas_view_codecs._is_json_primitive robokudo.io.cas_view_codecs._is_json_like robokudo.io.cas_view_codecs._full_type_name robokudo.io.cas_view_codecs._load_type robokudo.io.cas_view_codecs._is_open3d_pinhole_camera_intrinsic Module Contents --------------- .. py:function:: _recursive_convert(value: typing_extensions.Any) -> typing_extensions.Any Recursively convert non-JSON-friendly array-like values into plain containers. .. py:function:: _ros_message_to_dict(msg: typing_extensions.Any) -> typing_extensions.Dict[str, typing_extensions.Any] Convert a ROS message object into plain dictionary data. .. py:function:: _dict_to_ros_message(message_type: type[typing_extensions.Any], data_dict: typing_extensions.Dict[str, typing_extensions.Any]) -> typing_extensions.Any Populate a ROS message instance from plain dictionary data. .. py:function:: _is_json_primitive(value: typing_extensions.Any) -> bool Return ``True`` when a value is a JSON primitive. .. py:function:: _is_json_like(value: typing_extensions.Any) -> bool Return ``True`` when a value can be represented as plain JSON data. .. py:function:: _full_type_name(value: typing_extensions.Any) -> str Return the fully qualified type name for a value. .. py:function:: _load_type(type_name: str) -> type[typing_extensions.Any] Load a Python type from its fully qualified name. .. py:function:: _is_open3d_pinhole_camera_intrinsic(value: typing_extensions.Any) -> bool Check whether a value is an Open3D pinhole camera intrinsic object. .. py:class:: ViewPayload Persistence envelope for a single CAS view. .. py:attribute:: serializer_id :type: str .. py:attribute:: payload :type: typing_extensions.Any .. py:attribute:: type_name :type: str .. py:attribute:: metadata :type: typing_extensions.Dict[str, typing_extensions.Any] .. py:method:: to_document(view_name: str) -> typing_extensions.Dict[str, typing_extensions.Any] Build a storage document for one encoded view. .. py:method:: from_document(document: typing_extensions.Dict[str, typing_extensions.Any]) -> ViewPayload :staticmethod: Create a payload object from a storage document. .. py:class:: ViewCodec Interface for encoding and decoding CAS view values. .. py:attribute:: serializer_id :type: str .. py:method:: can_encode(value: typing_extensions.Any) -> bool :abstractmethod: Return ``True`` when this codec can encode ``value``. .. py:method:: encode(value: typing_extensions.Any) -> ViewPayload :abstractmethod: Encode a runtime value into a :class:`ViewPayload`. .. py:method:: decode(payload: ViewPayload) -> typing_extensions.Any :abstractmethod: Decode a persisted payload back into a runtime value. .. py:class:: JsonLikeCodec Bases: :py:obj:`ViewCodec` Codec for plain JSON-like Python values. .. py:attribute:: serializer_id :type: str :value: 'json_like_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is already JSON-compatible. .. py:method:: encode(value: typing_extensions.Any) -> ViewPayload Wrap a JSON-like value without further transformation. .. py:method:: decode(payload: ViewPayload) -> typing_extensions.Any Return the raw payload value. .. py:class:: TupleCodec Bases: :py:obj:`ViewCodec` Codec for tuple values. .. py:attribute:: serializer_id :type: str :value: 'tuple_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is a tuple. .. py:method:: encode(value: tuple[typing_extensions.Any, Ellipsis]) -> ViewPayload Encode a tuple as a list payload. .. py:method:: decode(payload: ViewPayload) -> tuple[typing_extensions.Any, Ellipsis] Decode list payload data back to a tuple. .. py:class:: BytesCodec Bases: :py:obj:`ViewCodec` Codec for raw bytes data. .. py:attribute:: serializer_id :type: str :value: 'bytes_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is ``bytes`` or ``bytearray``. .. py:method:: encode(value: bytes | bytearray) -> ViewPayload Encode bytes as a base64 text payload. .. py:method:: decode(payload: ViewPayload) -> bytes Decode base64 text payload into raw bytes. .. py:class:: NumpyCodec Bases: :py:obj:`ViewCodec` Codec for NumPy arrays. .. py:attribute:: serializer_id :type: str :value: 'numpy_npy_base64_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is a NumPy array. .. py:method:: encode(value: numpy.ndarray) -> ViewPayload Encode a NumPy array as base64 encoded ``.npy`` data. .. py:method:: decode(payload: ViewPayload) -> numpy.ndarray Decode base64 encoded ``.npy`` payload data. .. py:class:: RosMessageCodec Bases: :py:obj:`ViewCodec` Codec for ROS message instances. .. py:attribute:: serializer_id :type: str :value: 'ros_message_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value looks like a ROS message. .. py:method:: encode(value: typing_extensions.Any) -> ViewPayload Encode a ROS message to dictionary payload data. .. py:method:: decode(payload: ViewPayload) -> typing_extensions.Any Decode dictionary payload data to a ROS message object. .. py:class:: Open3DPointCloudCodec Bases: :py:obj:`ViewCodec` Codec for Open3D point cloud values. .. py:attribute:: serializer_id :type: str :value: 'open3d_pointcloud_pcd_base64_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is an Open3D point cloud. .. py:method:: encode(value: typing_extensions.Any) -> ViewPayload Encode a point cloud as base64-encoded PCD data. .. py:method:: decode(payload: ViewPayload) -> typing_extensions.Any Decode base64-encoded PCD data to an Open3D point cloud. .. py:class:: Open3DPinholeCameraIntrinsicCodec Bases: :py:obj:`ViewCodec` Codec for Open3D pinhole camera intrinsic values. .. py:attribute:: serializer_id :type: str :value: 'open3d_pinhole_camera_intrinsic_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is an Open3D pinhole camera intrinsic. .. py:method:: encode(value: typing_extensions.Any) -> ViewPayload Encode an Open3D pinhole intrinsic into JSON-compatible payload data. .. py:method:: decode(payload: ViewPayload) -> typing_extensions.Any Decode payload data to an Open3D pinhole intrinsic object. .. py:class:: StampedTransformCodec Bases: :py:obj:`ViewCodec` Codec for RoboKudo ``StampedTransform`` values. .. py:attribute:: serializer_id :type: str :value: 'robokudo_stamped_transform_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is a ``StampedTransform``. .. py:method:: encode(value: robokudo.types.tf.StampedTransform) -> ViewPayload Encode a ``StampedTransform`` into plain dictionary data. .. py:method:: decode(payload: ViewPayload) -> robokudo.types.tf.StampedTransform Decode plain dictionary data to a ``StampedTransform``. .. py:class:: HomogeneousTransformationMatrixCodec Bases: :py:obj:`ViewCodec` Codec for SemDT homogeneous transformation matrices. .. py:attribute:: serializer_id :type: str :value: 'semdt_homogeneous_transform_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Check whether the value is a homogeneous transformation matrix. .. py:method:: encode(value: semantic_digital_twin.spatial_types.HomogeneousTransformationMatrix) -> ViewPayload Encode a transformation matrix with its native JSON representation. .. py:method:: decode(payload: ViewPayload) -> semantic_digital_twin.spatial_types.HomogeneousTransformationMatrix Decode matrix payload data using the active world entity tracker. .. py:class:: KrroodCodec Bases: :py:obj:`ViewCodec` Codec that delegates serialization to KRROOD. .. py:attribute:: serializer_id :type: str :value: 'krrood_json_v1' .. py:method:: can_encode(value: typing_extensions.Any) -> bool Probe whether KRROOD can serialize the given value. .. py:method:: encode(value: typing_extensions.Any) -> ViewPayload Encode a value using KRROOD JSON serialization. .. py:method:: decode(payload: ViewPayload) -> typing_extensions.Any Decode a value using KRROOD JSON deserialization. .. py:class:: CASViewCodecRegistry Registry that routes CAS views to matching codecs. .. py:attribute:: codecs :type: typing_extensions.List[ViewCodec] :value: [] .. py:method:: __post_init__() -> None Initialize a default codec order when no codecs are provided. .. py:method:: _find_encoder(value: typing_extensions.Any) -> typing_extensions.Optional[ViewCodec] Return the first registered codec that can encode ``value``. .. py:method:: _find_decoder(serializer_id: str) -> ViewCodec Return the codec registered for a serializer identifier. .. py:method:: encode_view(view_name: str, value: typing_extensions.Any, strict: bool = True) -> typing_extensions.Optional[typing_extensions.Dict[str, typing_extensions.Any]] Encode a single CAS view into a storage document. .. py:method:: decode_view(document: typing_extensions.Dict[str, typing_extensions.Any]) -> tuple[str, typing_extensions.Any] Decode a single storage document into one CAS view. .. py:method:: encode_views(views: typing_extensions.Dict[str, typing_extensions.Any], strict: bool = True) -> typing_extensions.List[typing_extensions.Dict[str, typing_extensions.Any]] Encode multiple CAS views into storage documents. .. py:method:: decode_views(documents: typing_extensions.Iterable[typing_extensions.Dict[str, typing_extensions.Any]]) -> typing_extensions.Dict[str, typing_extensions.Any] Decode storage documents into CAS view values.