robokudo.descriptors¶
Submodules¶
Classes¶
Sensor data collection and CAS initialization. |
|
Registry for camera config classes. |
|
Interface for Kinect-style RGB-D cameras using ROS. |
|
A camera interface for reading data from MongoDB storage. |
|
An openCV camera without depth information. |
|
Specialized file reader interface for RGB-D camera data. |
|
Factory class for creating CollectionReader descriptors. |
Package Contents¶
- class robokudo.descriptors.CollectionReaderAnnotator(descriptor: CollectionReaderAnnotator, name: str = 'CollectionReader')¶
-
Bases:
robokudo.annotators.core.BaseAnnotatorSensor data collection and CAS initialization.
This annotator:
Reads sensor data from camera interfaces
Initializes new CAS instances
Manages multiple collection readers
Monitors data availability
Preserves existing queries
Note
Uses separate camera interfaces for different sensor types.
- class Descriptor(camera_config: robokudo.descriptors.camera_configs.base_camera_config.BaseCameraConfig, camera_interface: robokudo.io.camera_interface.CameraInterface)¶
-
Bases:
robokudo.annotators.core.BaseAnnotator.DescriptorConfiguration descriptor for collection reader.
- parameters¶
- collection_readers: typing_extensions.List[CollectionReaderAnnotator]¶
- start_timer: typing_extensions.Optional[float] = None¶
- iterations_since_last_data: int = 0¶
- iterations_since_last_data_warn_threshold: int = 40¶
- initialise() None¶
-
Initialize the collection reader.
Called when:
First tick is received
Status changes from non-running
Clears feedback messages for all children.
- add_collection_reader(descriptor: CollectionReaderAnnotator) None¶
-
Add another collection reader descriptor.
- Parameters:
-
descriptor – Configuration descriptor for additional reader
- update() py_trees.common.Status¶
-
Process sensor data and update CAS.
The method:
Checks for new data from all readers
Creates new CAS when data available
Preserves existing queries
Updates feedback messages
Monitors data availability
- Returns:
-
SUCCESS if data processed, RUNNING if waiting
- terminate(new_status: py_trees.common.Status) None¶
-
Handle behavior termination.
- Parameters:
-
new_status – New status (SUCCESS, FAILURE or INVALID)
- class robokudo.descriptors.CameraConfigRegistry¶
-
Registry for camera config classes.
- _registry: typing_extensions.Dict[str, typing_extensions.Type[robokudo.descriptors.camera_configs.base_camera_config.BaseCameraConfig]]¶
-
Map of camera config registry name to their class.
- classmethod register_all(package_name: str = 'robokudo.descriptors.camera_configs') None¶
-
Register all camera configs of the given package to the camera config registry.
- Parameters:
-
package_name – The name of the package to register camera configs from.
- Raises:
-
ImportError – If the given package cannot be imported.
ValueError – If there is a duplicate registry name with conflicting classes.
- classmethod create_config(config_type: str, **kwargs: typing_extensions.Any) typing_extensions.Any¶
-
Create a camera config instance based on the given config type.
- Parameters:
-
config_type – The type of the camera config to create.
- Raises:
-
ValueError – If the given config type is not registered.
TypeError – If the keyword arguments are invalid for the camera config class.
- class robokudo.descriptors.KinectCameraInterface(camera_config: typing_extensions.Any)¶
-
Bases:
ROSCameraInterfaceInterface for Kinect-style RGB-D cameras using ROS.
This class implements a camera interface for RGB-D cameras that publish color and depth images through ROS topics. It supports both raw and compressed image formats.
- color_subscriber: message_filters.Subscriber¶
-
Color image subscriber
- depth_subscriber: message_filters.Subscriber¶
-
Depth image subscriber
- cam_info_subscriber: message_filters.Subscriber¶
-
Camera info subscriber
- color: typing_extensions.Optional[numpy.typing.NDArray] = None¶
-
Latest color image
- depth: typing_extensions.Optional[numpy.typing.NDArray] = None¶
-
Latest depth image
- cam_info: typing_extensions.Optional[sensor_msgs.msg.CameraInfo] = None¶
-
Latest camera info message
- cam_intrinsic: typing_extensions.Optional[open3d.camera.PinholeCameraIntrinsic] = None¶
-
Open3D camera intrinsics
- color2depth_ratio: typing_extensions.Optional[typing_extensions.Tuple[float, float]] = None¶
-
Ratio between color and depth image sizes
- timestamp: typing_extensions.Optional[float] = None¶
-
Latest message timestamp
- lock: threading.Lock¶
-
Thread synchronization lock
- compressed_depth_configured() bool¶
-
Check if compressed depth images are configured.
- Returns:
-
True if compressed depth is configured, False otherwise
- compressed_color_configured() bool¶
-
Check if compressed color images are configured.
- Returns:
-
True if compressed color is configured, False otherwise
- get_node() rclpy.node.Node¶
- blackhole_callback(data: typing_extensions.Any) None¶
-
This callback is just a dummy to receive data coming from a workaround subscription to handle problems with the ApproximateTimeSynchronizer
- Parameters:
-
data – Dummy data
- callback(color_data: typing_extensions.Union[sensor_msgs.msg.Image, sensor_msgs.msg.CompressedImage], depth_data: typing_extensions.Optional[typing_extensions.Union[sensor_msgs.msg.Image, sensor_msgs.msg.CompressedImage]] = None, cam_info: typing_extensions.Optional[sensor_msgs.msg.CameraInfo] = None) None¶
-
Process synchronized camera data.
This callback handles incoming color, depth, and camera info messages. It converts the data to OpenCV format and stores it for later use.
TODO make this generic. handle the encoding and order properly. For standard and compressed images. this might also depend on the fix of image_transport_plugins being published as a package. Startpoint can be found at the bottom of this method.
- Parameters:
-
color_data – Color image message
depth_data – Depth image message
cam_info – Camera calibration message
- set_data(cas: robokudo.cas.CAS) None¶
-
This method is supposed to read in, convert (if needed) and put the data into the CAS. If you are running a CameraInterface which is getting data via callback methods, please make sure to keep callbacks light and do the main conversion work here! Callbacks should be short.
- Parameters:
-
cas – The CAS where the data should be placed in
- class robokudo.descriptors.StorageReaderInterface(camera_config: typing_extensions.Any)¶
-
Bases:
robokudo.io.camera_interface.CameraInterfaceA camera interface for reading data from MongoDB storage.
This interface reads sensor data and annotations that were previously stored using the StorageWriter annotator. It handles data deserialization and restoration of the Common Analysis Structure (CAS) views.
- storage: robokudo.io.storage.Storage¶
-
MongoDB storage interface
- reader: robokudo.io.storage.Storage.ListReader¶
-
List-based reader for MongoDB data
- has_new_data() bool¶
-
Check if more data is available to read.
Handles looping behavior based on camera configuration and maintains cursor position in the data sequence.
- Returns:
-
True if more data is available, False otherwise
- set_data(cas: robokudo.cas.CAS) None¶
-
Update the Common Analysis Structure with data from storage.
This method: * Retrieves the next frame from storage * Restores views and annotations * Updates camera intrinsics * Sets depth availability flag
- Parameters:
-
cas – Common Analysis Structure to update
- class robokudo.descriptors.OpenCVCameraWithoutDepthInterface(camera_config: typing_extensions.Any)¶
-
Bases:
robokudo.io.camera_interface.CameraInterfaceAn openCV camera without depth information. An OpenCV-based interface for RGB-only cameras and video sources.
This class handles various types of RGB input sources through OpenCV: * Live camera feeds (USB webcams, IP cameras) * Video file playback * Single image or image sequence loading
Supports features like: * Automatic input type detection * Configurable frame looping * Image normalization * Static camera calibration
- video_capture: cv2.VideoCapture¶
-
OpenCV video capture object
- device_driver_flag: int¶
-
OpenCV-specific driver flags
- stream_type: int = 0¶
-
Type of input (video/camera/image)
- _loop_counter: int¶
-
Number of remaining playback loops. Only works on video/image files.
- _backup_color: typing_extensions.Optional[numpy.typing.NDArray] = None¶
-
Backup of image for single-image file mode
- has_new_data() bool¶
-
Check if new frame data is available.
For video/camera streams, attempts to grab the next frame. For single images, checks if backup image is available.
- Returns:
-
True if new data is available, False otherwise
- set_data(cas: robokudo.cas.CAS) None¶
-
Update the Common Analysis Structure with latest frame data.
This method: * Retrieves the next frame from video/camera or backup * Handles looping behavior for videos and images * Applies optional contrast/brightness normalization * Updates the CAS with frame and camera data
- Parameters:
-
cas – Common Analysis Structure to update
- class robokudo.descriptors.RGBDFileReaderInterface(camera_config: typing_extensions.Any)¶
-
Bases:
FileReaderInterfaceSpecialized file reader interface for RGB-D camera data.
This class extends FileReaderInterface to handle the specific case of reading RGB-D camera data, including color images, depth images, and camera calibration information.
Inherits all instance variables from FileReaderInterface.
- set_data(cas: robokudo.cas.CAS) None¶
-
Set the next RGB-D data frame into the CAS.
This method: * Reads the next color and depth images * Applies any necessary fixes (e.g., Kinect height fix) * Sets camera calibration and transformation data * Updates the CAS with all loaded data
- Parameters:
-
cas – Common Analysis Structure to update
- class robokudo.descriptors.CrDescriptorFactory¶
-
Factory class for creating CollectionReader descriptors.
- _camera_interface_types¶
-
Mapping of camera names to their corresponding camera interface types.
- static create_descriptor(camera: str, **kwargs: typing_extensions.Any) robokudo.annotators.collection_reader.CollectionReaderAnnotator.Descriptor¶
-
Create a CollectionReader descriptor for the specified camera.
- Parameters:
-
camera – The name of the camera to create a descriptor for.
kwargs – Additional keyword arguments to pass to the camera configuration.
- Returns:
-
A CollectionReader descriptor for the specified camera.
- Raises:
-
ValueError – If the given camera config name is not registered in the camera config registry.
TypeError – If the keyword arguments are invalid for the camera config class.