robokudo.io.storage
===================

.. py:module:: robokudo.io.storage

.. autoapi-nested-parse::

   MongoDB storage interface for RoboKudo.

   This module provides the core interface between RoboKudo and MongoDB for
   storing and retrieving sensor data and annotations. It supports:

   * Serialization of complex data types
   * Conversion between ROS and MongoDB formats
   * Efficient binary storage of numpy arrays
   * Configurable view storage and restoration
   * Thread-safe database operations

   The module handles:

   * RGB-D camera data
   * Camera calibration information
   * ROS messages and transforms
   * Open3D data structures
   * Custom RoboKudo types


Classes
-------

.. autoapisummary::

   robokudo.io.storage.Storage


Functions
---------

.. autoapisummary::

   robokudo.io.storage.recursive_convert
   robokudo.io.storage.ros_message_to_dict
   robokudo.io.storage.dict_to_ros_message


Module Contents
---------------

.. py:function:: recursive_convert(value)

   Recursively converts non-BSON types into Python-native types.
   In particular, converts array.array and numpy.ndarray objects into lists.


.. py:function:: ros_message_to_dict(msg)

   Recursively converts a ROS2 message into a dictionary using introspection.
   Ensures that any array.array or numpy.ndarray objects are converted to lists.


.. py:function:: dict_to_ros_message(message_type, data_dict)

   Recursively converts a dictionary into a ROS2 message of the specified type.
   Assumes that the dictionary values are basic types.


.. py:class:: Storage(db_name)

   Main interface between RoboKudo and MongoDB.

   This class holds the main interface code between RoboKudo and the MongoDB
   database. It stores sensor data and CAS views by converting specialized
   types (NumPy arrays, ROS messages, etc.) into a BSON-encodable dictionary
   format.

   :ivar db_name: Name of the MongoDB database
   :type db_name: str
   :ivar client: MongoDB client connection
   :type client: pymongo.MongoClient
   :ivar db: MongoDB database instance
   :type db: pymongo.database.Database


   .. py:attribute:: BLACKLISTED_TYPES


   .. py:method:: is_blacklisted()

      Check if an object is of a blacklisted type.

      :param obj: Object to check
      :type obj: Any
      :return: True if object type is blacklisted, False otherwise
      :rtype: bool


   .. py:method:: instantiate_mongo_client()
      :staticmethod:

      Create a MongoDB client instance.

      Uses environment variables RK_MONGO_HOST and RK_MONGO_PORT if set,
      otherwise defaults to localhost:27017.

      :return: MongoDB client instance
      :rtype: pymongo.MongoClient


   .. py:class:: Reader(db_name)

      Deprecated cursor-based MongoDB reader.

      .. deprecated:: Use ListReader instead for better pymongo compatibility


      .. py:attribute:: db_reader


      .. py:method:: reset_cursor()

         Reset the cursor to the start of the collection.


      .. py:method:: collection_has_frames()

         Check if collection has any frames.

         :return: True if frames exist, False otherwise
         :rtype: bool


      .. py:method:: cursor_has_frames()

         Check if cursor has more frames.

         :return: True if more frames exist, False otherwise
         :rtype: bool


      .. py:method:: get_next_frame() -> Optional[dict]

         Get the next frame from the cursor.

         :return: Next frame data or None if no more frames
         :rtype: dict or None


   .. py:class:: ListReader(db_name)

      List-based MongoDB reader.

      This class reads all matching records into a list for iteration,
      providing better compatibility across pymongo versions and simpler
      cursor management.

      :ivar db_reader: MongoDB database instance
      :type db_reader: pymongo.database.Database
      :ivar index: Current position in data list
      :type index: int or None
      :ivar data: List of loaded documents
      :type data: list


      .. py:attribute:: db_reader


      .. py:attribute:: index
         :value: None


      .. py:attribute:: data
         :value: []


      .. py:method:: reset_cursor()

         Reset the reader state.

         Clears and reloads all documents from the database.


      .. py:method:: cursor_has_frames()

         Check if more frames are available.

         :return: True if more frames exist, False otherwise
         :rtype: bool


      .. py:method:: get_next_frame() -> Optional[dict]

         Get the next frame from the data list.

         :return: Next frame data or None if no more frames
         :rtype: dict or None


   .. py:attribute:: db_name


   .. py:attribute:: client


   .. py:attribute:: db


   .. py:method:: drop_database()

      Drop the entire database.


   .. py:method:: nd_array_to_numpy_binary(arr: numpy.ndarray) -> bytes
      :staticmethod:

      Convert numpy array to binary format for storage.

      :param arr: Numpy array to convert
      :type arr: numpy.ndarray
      :return: Binary representation of array
      :rtype: bytes


   .. py:method:: numpy_binary_to_nd_array(binary: bytes) -> numpy.ndarray
      :staticmethod:

      Convert binary data back to numpy array.

      :param binary: Binary data to convert
      :type binary: bytes
      :return: Reconstructed numpy array
      :rtype: numpy.ndarray


   .. py:method:: ros_cam_info_to_mongo(cam_info: sensor_msgs.msg.CameraInfo) -> dict
      :staticmethod:

      Convert ROS camera info to MongoDB format.

      Converts a ROS2 CameraInfo message into a dictionary using custom
      introspection, ensuring that all non-BSON types (numpy.ndarray,
      array.array) are converted.

      :param cam_info: ROS camera info message
      :type cam_info: sensor_msgs.msg.CameraInfo
      :return: Dictionary representation of camera info
      :rtype: dict


   .. py:method:: ros_cam_info_from_mongo(mongo_cam_info: dict) -> sensor_msgs.msg.CameraInfo
      :staticmethod:

      Convert a dictionary from MongoDB back into a ROS2 CameraInfo message.

      :param mongo_cam_info: Dictionary representation of camera info
      :type mongo_cam_info: dict
      :return: ROS camera info message
      :rtype: sensor_msgs.msg.CameraInfo


   .. py:method:: camera_intrinsic_to_mongo(camera_intrinsic: open3d.camera.PinholeCameraIntrinsic) -> Dict[str, Union[int, float]]
      :staticmethod:


   .. py:method:: camera_intrinsic_from_mongo(camera_intrinsic_dict: Dict[str, Union[int, float]]) -> open3d.camera.PinholeCameraIntrinsic
      :staticmethod:


   .. py:method:: rk_stamped_transform_to_mongo(stamped_transform: robokudo.types.tf.StampedTransform) -> dict
      :staticmethod:

      Convert RoboKudo stamped transform to MongoDB format.

      :param stamped_transform: RoboKudo stamped transform
      :type stamped_transform: robokudo.types.tf.StampedTransform
      :return: Dictionary representation of transform
      :rtype: dict


   .. py:method:: rk_stamped_transform_from_mongo(stamped_transform_dict: dict) -> robokudo.types.tf.StampedTransform
      :staticmethod:

      Convert MongoDB transform back to RoboKudo format.

      :param stamped_transform_dict: Dictionary representation of transform
      :type stamped_transform_dict: dict
      :return: RoboKudo stamped transform
      :rtype: robokudo.types.tf.StampedTransform


   .. py:attribute:: view_configuration


   .. py:method:: store_views_in_mongo(cas_dict: Dict) -> None

      Store CAS views in MongoDB.

      Stores the views present in ``cas.views``. Note that this method
      modifies the CAS data passed in, so make a deep copy first if the
      original CAS must stay untouched.

      :param cas_dict: CAS as a dictionary to persist
      :type cas_dict: dict


   .. py:method:: load_views_from_mongo_in_cas(cas_document: Dict) -> None

      Load views from MongoDB into a CAS document.

      Retrieves and converts each view referenced in the CAS document from
      its MongoDB representation back to its original format.

      :param cas_document: CAS document to update with loaded views
      :type cas_document: dict


   .. py:method:: load_annotations_from_mongo_in_cas(cas_document: Dict, cas: robokudo.cas.CAS) -> None

      Load annotations from MongoDB into a CAS.

      Restores the annotations from the database and inserts them into the
      CAS. If no (pickled) annotations are available, the CAS is left
      untouched.

      :param cas_document: A dict representing a frame of a CAS in the database
      :type cas_document: dict
      :param cas: The ``robokudo.cas.CAS`` instance into which the annotations are inserted
      :type cas: robokudo.cas.CAS


   .. py:method:: generate_dict_from_real_cas(cas: robokudo.cas.CAS) -> Dict

      Convert a CAS instance to a MongoDB-compatible dictionary.

      Generates a dictionary from the CAS that can be stored in MongoDB.

      :param cas: Input CAS from which the dict representation is created
      :type cas: robokudo.cas.CAS
      :return: A dict with references to parts of the input CAS
      :rtype: dict


   .. py:method:: store_cas_dict(cas_dict: Dict)

      Store a CAS dictionary in MongoDB.

      Stores the views and creates a CAS document in MongoDB that references
      them.

      :param cas_dict: Dictionary representation of a CAS
      :type cas_dict: dict
      :return: MongoDB ObjectId of the stored CAS document
      :rtype: bson.objectid.ObjectId
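
The fallback described for ``instantiate_mongo_client`` (use ``RK_MONGO_HOST`` and ``RK_MONGO_PORT`` if set, otherwise ``localhost:27017``) can be sketched as follows. This is a minimal illustration only: ``resolve_mongo_address`` is a hypothetical helper, not part of RoboKudo, and the actual method may resolve or validate the values differently.

```python
import os


def resolve_mongo_address(env=None):
    """Hypothetical helper: return (host, port) for the MongoDB connection.

    Reads RK_MONGO_HOST and RK_MONGO_PORT from the given mapping (the process
    environment by default) and falls back to localhost:27017.
    """
    env = os.environ if env is None else env
    host = env.get("RK_MONGO_HOST", "localhost")
    # Environment variables are strings, so the port is converted to int.
    port = int(env.get("RK_MONGO_PORT", "27017"))
    return host, port


# With neither variable set, the documented default applies.
default_address = resolve_mongo_address({})

# With both variables set, they take precedence over the default.
custom_address = resolve_mongo_address(
    {"RK_MONGO_HOST": "db.example", "RK_MONGO_PORT": "27018"}
)
```

The resulting pair could then be handed to ``pymongo.MongoClient(host, port)``, which accepts a host name and an integer port.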
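
The conversion that ``recursive_convert`` is documented to perform (turning ``array.array`` and ``numpy.ndarray`` objects into lists so the result is BSON-encodable) might look roughly like the sketch below. ``to_bson_friendly`` is a hypothetical stand-in written for illustration, assuming that containers are walked recursively; it is not the RoboKudo implementation.

```python
import array


def to_bson_friendly(value):
    """Hypothetical stand-in for recursive_convert, for illustration only."""
    # array.array and numpy.ndarray both expose tolist(), which yields
    # plain Python lists of native numbers.
    if callable(getattr(value, "tolist", None)):
        return value.tolist()
    # Dictionaries are rebuilt with every value converted.
    if isinstance(value, dict):
        return {key: to_bson_friendly(item) for key, item in value.items()}
    # Lists and tuples become lists of converted items.
    if isinstance(value, (list, tuple)):
        return [to_bson_friendly(item) for item in value]
    # Everything else (str, int, float, None, ...) is passed through.
    return value


document = {
    "k": array.array("d", [1.0, 2.0]),
    "nested": [array.array("i", [3])],
    "name": "cam",
}
converted = to_bson_friendly(document)
```

After the call, ``converted`` contains only dictionaries, lists, and scalar values, while ``document`` itself is left unchanged because new containers are built.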