robokudo.io.storage

MongoDB storage interface for RoboKudo.

This module provides the core interface between RoboKudo and MongoDB for storing and retrieving sensor data and annotations. It supports:

  • Serialization of complex data types

  • Conversion between ROS and MongoDB formats

  • Efficient binary storage of numpy arrays

  • Configurable view storage and restoration

  • Thread-safe database operations

The module handles:

  • RGB-D camera data

  • Camera calibration information

  • ROS messages and transforms

  • Open3D data structures

  • Custom RoboKudo types

Classes

Storage

Main interface between RoboKudo and MongoDB.

Functions

recursive_convert(value)

Recursively converts non-BSON types into Python-native types.

ros_message_to_dict(msg)

Recursively converts a ROS2 message into a dictionary using introspection.

dict_to_ros_message(message_type, data_dict)

Recursively converts a dictionary into a ROS2 message of the specified type.

Module Contents

robokudo.io.storage.recursive_convert(value)

Recursively converts non-BSON types into Python-native types. In particular, converts array.array and numpy.ndarray objects into lists.

robokudo.io.storage.ros_message_to_dict(msg)

Recursively converts a ROS2 message into a dictionary using introspection. Ensures that any array.array or numpy.ndarray objects are converted to lists.

robokudo.io.storage.dict_to_ros_message(message_type, data_dict)

Recursively converts a dictionary into a ROS2 message of the specified type. This example assumes that the dictionary values are basic types.

class robokudo.io.storage.Storage(db_name)

Main interface between RoboKudo and MongoDB.

This class holds the main interface code between RoboKudo and the MongoDB database. It stores sensor data and CAS views by converting specialized types (NumPy arrays, ROS messages, etc.) into a BSON‑encodable dictionary format.

Variables:
  • db_name – Name of the MongoDB database

  • client – MongoDB client connection

  • db – MongoDB database instance

BLACKLISTED_TYPES
is_blacklisted()

Check if an object is of a blacklisted type.

Parameters:

obj (Any) – Object to check

Returns:

True if object type is blacklisted, False otherwise

Return type:

bool

static instantiate_mongo_client()

Create a MongoDB client instance.

Uses environment variables RK_MONGO_HOST and RK_MONGO_PORT if set, otherwise defaults to localhost:27017.

Returns:

MongoDB client instance

Return type:

pymongo.MongoClient

class Reader(db_name)

Deprecated cursor-based MongoDB reader.

Deprecated since version Use: ListReader instead for better pymongo compatibility

db_reader
reset_cursor()

Reset the cursor to the start of the collection.

collection_has_frames()

Check if collection has any frames.

Returns:

True if frames exist, False otherwise

Return type:

bool

cursor_has_frames()

Check if cursor has more frames.

Returns:

True if more frames exist, False otherwise

Return type:

bool

get_next_frame() dict | None

Get the next frame from the cursor.

Returns:

Next frame data or None if no more frames

Return type:

dict or None

class ListReader(db_name)

List-based MongoDB reader.

This class reads all matching records into a list for iteration, providing better compatibility across pymongo versions and simpler cursor management.

Variables:
  • db_reader – MongoDB database instance

  • index – Current position in data list

  • data – List of loaded documents

db_reader
index = None
data = []
reset_cursor()

Reset the reader state.

Clears and reloads all documents from the database.

cursor_has_frames()

Check if more frames are available.

Returns:

True if more frames exist, False otherwise

Return type:

bool

get_next_frame() dict | None

Get the next frame from the data list.

Returns:

Next frame data or None if no more frames

Return type:

dict or None

db_name
client
db
drop_database()

Drop the entire database.

static nd_array_to_numpy_binary(arr: numpy.ndarray) bytes

Convert numpy array to binary format for storage.

Parameters:

arr (numpy.ndarray) – Numpy array to convert

Returns:

Binary representation of array

Return type:

bytes

static numpy_binary_to_nd_array(binary: bytes) numpy.ndarray

Convert binary data back to numpy array.

Parameters:

bin (bytes) – Binary data to convert

Returns:

Reconstructed numpy array

Return type:

numpy.ndarray

static ros_cam_info_to_mongo(cam_info: sensor_msgs.msg.CameraInfo) dict

Convert ROS camera info to MongoDB format.

Convert a ROS2 CameraInfo message into a dictionary using our custom introspection, ensuring that all non-BSON types (numpy.ndarray, array.array) are converted.

Parameters:

cam_info (sensor_msgs.msg.CameraInfo) – ROS camera info message

Returns:

Dictionary representation of camera info

Return type:

dict

static ros_cam_info_from_mongo(mongo_cam_info: dict) sensor_msgs.msg.CameraInfo

Convert a dictionary from MongoDB back into a ROS2 CameraInfo message.

Parameters:

mongo_cam_info (dict) – Dictionary representation of camera info

Returns:

ROS camera info message

Return type:

sensor_msgs.msg.CameraInfo

static camera_intrinsic_to_mongo(camera_intrinsic: open3d.camera.PinholeCameraIntrinsic) Dict[str, int | float]
static camera_intrinsic_from_mongo(camera_intrinsic_dict: Dict[str, int | float]) open3d.camera.PinholeCameraIntrinsic
static rk_stamped_transform_to_mongo(stamped_transform: robokudo.types.tf.StampedTransform) dict

Convert RoboKudo stamped transform to MongoDB format.

Parameters:

stamped_transform (robokudo.types.tf.StampedTransform) – RoboKudo stamped transform

Returns:

Dictionary representation of transform

Return type:

dict

static rk_stamped_transform_from_mongo(stamped_transform_dict: dict) robokudo.types.tf.StampedTransform

Convert MongoDB transform back to RoboKudo format.

Parameters:

stamped_transform_dict (dict) – Dictionary representation of transform

Returns:

RoboKudo stamped transform

Return type:

robokudo.types.tf.StampedTransform

view_configuration
store_views_in_mongo(cas_dict: Dict) None

Store CAS views in MongoDB.

Store the views present in cas.views. Please note that this method will change cas. So make a deepcopy if you want to leave your true CAS untouched!

Parameters:

cas_dict (dict) – CAS as a dictionary to persist

load_views_from_mongo_in_cas(cas_document: Dict) None

Load views from MongoDB into a CAS document.

Retrieves and converts each view referenced in the CAS document from its MongoDB representation back to its original format.

Parameters:

cas_document (dict) – CAS document to update with loaded views

load_annotations_from_mongo_in_cas(cas_document: Dict, cas: robokudo.cas.CAS) None

Load annotations from MongoDB into a CAS.

Restore the annotations from the database and insert them into the CAS. If no (pickled) annotations are available, cas will be untouched.

Parameters:
  • cas_document (dict) – A dict representing a frame of a CAS in the database

  • cas (robokudo.cas.CAS) – The ‘robokudo.cas.CAS’ instance where the annotations shall be inserted to

generate_dict_from_real_cas(cas: robokudo.cas.CAS) Dict

Convert a CAS instance to a MongoDB-compatible dictionary.

Generate dict that we can put in mongo from CAS

Parameters:

cas (robokudo.cas.CAS) – Input CAS that should be used to create a dict-representation of it.

Returns:

A dict with references to parts of the input CAS.

Return type:

dict

store_cas_dict(cas_dict: Dict)

Store a CAS dictionary in MongoDB.

Stores the views and creates a CAS document in MongoDB that references them.

Parameters:

cas_dict (dict) – Dictionary representation of a CAS

Returns:

MongoDB ObjectId of the stored CAS document

Return type:

bson.objectid.ObjectId