robokudo.descriptors

Submodules

Classes

CollectionReaderAnnotator

Sensor data collection and CAS initialization.

CameraConfigRegistry

Registry for camera config classes.

KinectCameraInterface

Interface for Kinect-style RGB-D cameras using ROS.

StorageReaderInterface

A camera interface for reading data from MongoDB storage.

OpenCVCameraWithoutDepthInterface

An OpenCV camera without depth information.

RGBDFileReaderInterface

Specialized file reader interface for RGB-D camera data.

CrDescriptorFactory

Factory class for creating CollectionReader descriptors.

Package Contents

class robokudo.descriptors.CollectionReaderAnnotator(descriptor: CollectionReaderAnnotator, name: str = 'CollectionReader')

Bases: robokudo.annotators.core.BaseAnnotator

Sensor data collection and CAS initialization.

This annotator:

  • Reads sensor data from camera interfaces

  • Initializes new CAS instances

  • Manages multiple collection readers

  • Monitors data availability

  • Preserves existing queries

Note

Uses separate camera interfaces for different sensor types.

class Descriptor(camera_config: robokudo.descriptors.camera_configs.base_camera_config.BaseCameraConfig, camera_interface: robokudo.io.camera_interface.CameraInterface)

Bases: robokudo.annotators.core.BaseAnnotator.Descriptor

Configuration descriptor for collection reader.

parameters
collection_readers: typing_extensions.List[CollectionReaderAnnotator]
start_timer: typing_extensions.Optional[float] = None
iterations_since_last_data: int = 0
iterations_since_last_data_warn_threshold: int = 40
initialise() → None

Initialize the collection reader.

Called when:

  • First tick is received

  • Status changes from non-running

Clears feedback messages for all children.

add_collection_reader(descriptor: CollectionReaderAnnotator) → None

Add another collection reader descriptor.

Parameters:

descriptor – Configuration descriptor for additional reader

update() → py_trees.common.Status

Process sensor data and update CAS.

The method:

  • Checks for new data from all readers

  • Creates new CAS when data available

  • Preserves existing queries

  • Updates feedback messages

  • Monitors data availability

Returns:

SUCCESS if data processed, RUNNING if waiting
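The polling behaviour described for update() can be sketched roughly as follows. This is a minimal illustration, not the RoboKudo implementation: Status is a stand-in for py_trees.common.Status, FakeReader and PollingSketch are hypothetical names, and waiting until all readers report data is an assumption made for the example. Only the two counters mirror attributes documented above.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for py_trees.common.Status so the sketch runs without py_trees."""
    SUCCESS = "SUCCESS"
    RUNNING = "RUNNING"


class FakeReader:
    """Hypothetical reader that reports data according to a scripted sequence."""

    def __init__(self, availability):
        self._availability = list(availability)

    def has_new_data(self):
        return self._availability.pop(0) if self._availability else False


class PollingSketch:
    """Hypothetical annotator-like object that polls its readers on each tick."""

    def __init__(self, readers, warn_threshold=40):
        self.readers = readers
        self.iterations_since_last_data = 0
        self.iterations_since_last_data_warn_threshold = warn_threshold
        self.warnings = []

    def update(self):
        # Assumption: data counts as available only when every reader has some.
        # The list is built first so that every reader is polled on each tick.
        if not all([reader.has_new_data() for reader in self.readers]):
            self.iterations_since_last_data += 1
            if (self.iterations_since_last_data
                    > self.iterations_since_last_data_warn_threshold):
                self.warnings.append(
                    f"No data for {self.iterations_since_last_data} iterations"
                )
            return Status.RUNNING
        # Data arrived: reset the counter and report success.
        self.iterations_since_last_data = 0
        return Status.SUCCESS
```

In a behavior tree, returning RUNNING keeps the node ticking until data arrives, which is why the counter is reset only on SUCCESS.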

terminate(new_status: py_trees.common.Status) → None

Handle behavior termination.

Parameters:

new_status – New status (SUCCESS, FAILURE or INVALID)

class robokudo.descriptors.CameraConfigRegistry

Registry for camera config classes.

_registry: typing_extensions.Dict[str, typing_extensions.Type[robokudo.descriptors.camera_configs.base_camera_config.BaseCameraConfig]]

Map of camera config registry name to their class.

classmethod register_all(package_name: str = 'robokudo.descriptors.camera_configs') → None

Register all camera configs of the given package to the camera config registry.

Parameters:

package_name – The name of the package to register camera configs from.

Raises:
  • ImportError – If the given package cannot be imported.

  • ValueError – If there is a duplicate registry name with conflicting classes.

classmethod create_config(config_type: str, **kwargs: typing_extensions.Any) → typing_extensions.Any

Create a camera config instance based on the given config type.

Parameters:

config_type – The type of the camera config to create.

Raises:
  • ValueError – If the given config type is not registered.

  • TypeError – If the keyword arguments are invalid for the camera config class.
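The error behaviour documented for register_all and create_config follows a common registry pattern, which could look roughly like the sketch below. RegistrySketch, its register method, and ExampleCameraConfig are hypothetical names for illustration only; how RoboKudo discovers config classes inside a package is not shown here.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass
class ExampleCameraConfig:
    """Hypothetical config class; not part of robokudo."""
    topic: str = "/camera/color"
    loop: bool = False


class RegistrySketch:
    """Hypothetical name-to-class registry mirroring the documented errors."""

    _registry: Dict[str, Type[Any]] = {}

    @classmethod
    def register(cls, name: str, config_cls: Type[Any]) -> None:
        existing = cls._registry.get(name)
        # Re-registering the same class is harmless; a different class
        # under an existing name is a conflict.
        if existing is not None and existing is not config_cls:
            raise ValueError(f"Duplicate registry name with conflicting classes: {name}")
        cls._registry[name] = config_cls

    @classmethod
    def create_config(cls, config_type: str, **kwargs: Any) -> Any:
        if config_type not in cls._registry:
            raise ValueError(f"Unknown camera config type: {config_type}")
        # Invalid keyword arguments surface as TypeError from the constructor.
        return cls._registry[config_type](**kwargs)


RegistrySketch.register("example", ExampleCameraConfig)
config = RegistrySketch.create_config("example", loop=True)
```

Letting the constructor raise TypeError means the registry does not need to know the accepted keyword arguments of each config class.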

class robokudo.descriptors.KinectCameraInterface(camera_config: typing_extensions.Any)

Bases: ROSCameraInterface

Interface for Kinect-style RGB-D cameras using ROS.

This class implements a camera interface for RGB-D cameras that publish color and depth images through ROS topics. It supports both raw and compressed image formats.

color_subscriber: message_filters.Subscriber

Color image subscriber

depth_subscriber: message_filters.Subscriber

Depth image subscriber

cam_info_subscriber: message_filters.Subscriber

Camera info subscriber

color: typing_extensions.Optional[numpy.typing.NDArray] = None

Latest color image

depth: typing_extensions.Optional[numpy.typing.NDArray] = None

Latest depth image

cam_info: typing_extensions.Optional[sensor_msgs.msg.CameraInfo] = None

Latest camera info message

cam_intrinsic: typing_extensions.Optional[open3d.camera.PinholeCameraIntrinsic] = None

Open3D camera intrinsics

color2depth_ratio: typing_extensions.Optional[typing_extensions.Tuple[float, float]] = None

Ratio between color and depth image sizes

timestamp: typing_extensions.Optional[float] = None

Latest message timestamp

lock: threading.Lock

Thread synchronization lock

compressed_depth_configured() → bool

Check if compressed depth images are configured.

Returns:

True if compressed depth is configured, False otherwise

compressed_color_configured() → bool

Check if compressed color images are configured.

Returns:

True if compressed color is configured, False otherwise

get_node() → rclpy.node.Node

blackhole_callback(data: typing_extensions.Any) → None

This callback is a dummy that receives data from a workaround subscription used to handle problems with the ApproximateTimeSynchronizer.

Parameters:

data – Dummy data

callback(color_data: typing_extensions.Union[sensor_msgs.msg.Image, sensor_msgs.msg.CompressedImage], depth_data: typing_extensions.Optional[typing_extensions.Union[sensor_msgs.msg.Image, sensor_msgs.msg.CompressedImage]] = None, cam_info: typing_extensions.Optional[sensor_msgs.msg.CameraInfo] = None) → None

Process synchronized camera data.

This callback handles incoming color, depth, and camera info messages. It converts the data to OpenCV format and stores it for later use.

TODO: Make this generic. Handle the encoding and order properly for both standard and compressed images. This might also depend on the fix for image_transport_plugins being published as a package. A starting point can be found at the bottom of this method.

Parameters:
  • color_data – Color image message

  • depth_data – Depth image message

  • cam_info – Camera calibration message

set_data(cas: robokudo.cas.CAS) → None

This method reads in the data, converts it if needed, and puts it into the CAS. If you are running a CameraInterface that receives data via callback methods, keep the callbacks short and do the main conversion work here.

Parameters:

cas – The CAS in which the data should be placed
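The advice to keep callbacks short and convert in set_data can be illustrated with a small buffering sketch. BufferedInterfaceSketch is a hypothetical class, a plain dict stands in for robokudo.cas.CAS, and turning bytes into a list stands in for the real image conversion; only the lock attribute mirrors the interface documented above.

```python
import threading
from typing import Any, Dict, Optional


class BufferedInterfaceSketch:
    """Hypothetical interface: the callback stores, set_data converts."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self._latest_raw: Optional[bytes] = None

    def callback(self, raw: bytes) -> None:
        # Keep the callback short: store a reference and return.
        with self.lock:
            self._latest_raw = raw

    def has_new_data(self) -> bool:
        with self.lock:
            return self._latest_raw is not None

    def set_data(self, cas: Dict[str, Any]) -> None:
        # Take the buffered message while holding the lock, then release it
        # before doing the (potentially slow) conversion.
        with self.lock:
            raw, self._latest_raw = self._latest_raw, None
        if raw is None:
            return
        # Stand-in for the expensive conversion to an image array.
        cas["color"] = list(raw)


interface = BufferedInterfaceSketch()
interface.callback(b"\x01\x02")
cas: Dict[str, Any] = {}
interface.set_data(cas)
```

Releasing the lock before converting keeps the subscriber thread from blocking on a long conversion.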

class robokudo.descriptors.StorageReaderInterface(camera_config: typing_extensions.Any)

Bases: robokudo.io.camera_interface.CameraInterface

A camera interface for reading data from MongoDB storage.

This interface reads sensor data and annotations that were previously stored using the StorageWriter annotator. It handles data deserialization and restoration of the Common Analysis Structure (CAS) views.

storage: robokudo.io.storage.Storage

MongoDB storage interface

reader: robokudo.io.storage.Storage.ListReader

List-based reader for MongoDB data

has_new_data() → bool

Check if more data is available to read.

Handles looping behavior based on camera configuration and maintains cursor position in the data sequence.

Returns:

True if more data is available, False otherwise
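The interplay of cursor position and looping can be sketched as follows. LoopingCursorSketch, its loop flag, and next_frame are hypothetical; the actual reader works on MongoDB data and takes its looping setting from the camera configuration, neither of which is modelled here.

```python
class LoopingCursorSketch:
    """Hypothetical frame source with a cursor and an optional loop flag."""

    def __init__(self, frames, loop=False):
        self.frames = list(frames)
        self.loop = loop
        self.cursor = 0

    def has_new_data(self):
        if self.cursor < len(self.frames):
            return True
        # End of sequence: rewind when looping is enabled and frames exist.
        if self.loop and self.frames:
            self.cursor = 0
            return True
        return False

    def next_frame(self):
        frame = self.frames[self.cursor]
        self.cursor += 1
        return frame


source = LoopingCursorSketch(["a", "b"], loop=True)
seen = []
for _ in range(3):
    if source.has_new_data():
        seen.append(source.next_frame())
```

With loop=True the third read wraps around to the first frame; with loop=False, has_new_data() returns False once both frames have been consumed.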

set_data(cas: robokudo.cas.CAS) → None

Update the Common Analysis Structure with data from storage.

This method:

  • Retrieves the next frame from storage

  • Restores views and annotations

  • Updates camera intrinsics

  • Sets depth availability flag

Parameters:

cas – Common Analysis Structure to update

class robokudo.descriptors.OpenCVCameraWithoutDepthInterface(camera_config: typing_extensions.Any)

Bases: robokudo.io.camera_interface.CameraInterface

An OpenCV camera without depth information. It provides an OpenCV-based interface for RGB-only cameras and video sources.

This class handles various types of RGB input sources through OpenCV:

  • Live camera feeds (USB webcams, IP cameras)

  • Video file playback

  • Single image or image sequence loading

Supported features include:

  • Automatic input type detection

  • Configurable frame looping

  • Image normalization

  • Static camera calibration

video_capture: cv2.VideoCapture

OpenCV video capture object

device_driver_flag: int

OpenCV-specific driver flags

stream_type: int = 0

Type of input (video/camera/image)

_loop_counter: int

Number of remaining playback loops. Only applies to video/image files.

_backup_color: typing_extensions.Optional[numpy.typing.NDArray] = None

Backup of image for single-image file mode

has_new_data() → bool

Check if new frame data is available.

For video/camera streams, attempts to grab the next frame. For single images, checks if backup image is available.

Returns:

True if new data is available, False otherwise

set_data(cas: robokudo.cas.CAS) → None

Update the Common Analysis Structure with latest frame data.

This method:

  • Retrieves the next frame from video/camera or backup

  • Handles looping behavior for videos and images

  • Applies optional contrast/brightness normalization

  • Updates the CAS with frame and camera data

Parameters:

cas – Common Analysis Structure to update

class robokudo.descriptors.RGBDFileReaderInterface(camera_config: typing_extensions.Any)

Bases: FileReaderInterface

Specialized file reader interface for RGB-D camera data.

This class extends FileReaderInterface to handle the specific case of reading RGB-D camera data, including color images, depth images, and camera calibration information.

Inherits all instance variables from FileReaderInterface.

set_data(cas: robokudo.cas.CAS) → None

Set the next RGB-D data frame into the CAS.

This method:

  • Reads the next color and depth images

  • Applies any necessary fixes (e.g., Kinect height fix)

  • Sets camera calibration and transformation data

  • Updates the CAS with all loaded data

Parameters:

cas – Common Analysis Structure to update

class robokudo.descriptors.CrDescriptorFactory

Factory class for creating CollectionReader descriptors.

_camera_interface_types

Mapping of camera names to their corresponding camera interface types.

static create_descriptor(camera: str, **kwargs: typing_extensions.Any) → robokudo.annotators.collection_reader.CollectionReaderAnnotator.Descriptor

Create a CollectionReader descriptor for the specified camera.

Parameters:
  • camera – The name of the camera to create a descriptor for.

  • kwargs – Additional keyword arguments to pass to the camera configuration.

Returns:

A CollectionReader descriptor for the specified camera.

Raises:
  • ValueError – If the given camera config name is not registered in the camera config registry.

  • TypeError – If the keyword arguments are invalid for the camera config class.
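A factory of this kind typically combines a name-to-config lookup with a name-to-interface mapping. The sketch below shows one way that could work; WebcamConfig, WebcamInterface, DescriptorSketch, and the two module-level dictionaries are all hypothetical stand-ins, and the real factory may wire these pieces together differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass
class WebcamConfig:
    """Hypothetical camera config; not part of robokudo."""
    device: int = 0


class WebcamInterface:
    """Hypothetical camera interface that is built from a config."""

    def __init__(self, camera_config: Any) -> None:
        self.camera_config = camera_config


@dataclass
class DescriptorSketch:
    """Stand-in for CollectionReaderAnnotator.Descriptor."""
    camera_config: Any
    camera_interface: Any


# Assumed lookup tables: camera name -> config class / interface class.
CONFIG_TYPES: Dict[str, Type[Any]] = {"webcam": WebcamConfig}
INTERFACE_TYPES: Dict[str, Type[Any]] = {"webcam": WebcamInterface}


def create_descriptor(camera: str, **kwargs: Any) -> DescriptorSketch:
    if camera not in CONFIG_TYPES:
        raise ValueError(f"Camera config '{camera}' is not registered")
    # Invalid keyword arguments raise TypeError in the config constructor.
    config = CONFIG_TYPES[camera](**kwargs)
    interface = INTERFACE_TYPES[camera](config)
    return DescriptorSketch(camera_config=config, camera_interface=interface)


descriptor = create_descriptor("webcam", device=2)
```

The caller only names the camera and passes overrides; the factory chooses the matching config and interface classes.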