robokudo.annotators.collection_reader

Sensor data collection and CAS initialization.

This module provides an annotator for:

  • Reading sensor data from multiple interfaces

  • Initializing the CAS with sensor data

  • Managing multiple collection readers

  • Handling data synchronization

The module uses:

  • Camera interface abstraction

  • Pipeline CAS management

  • Data availability monitoring

  • Query preservation

Note

This is typically the first annotator in a pipeline.

Classes

CollectionReaderAnnotator

Sensor data collection and CAS initialization.

Module Contents

class robokudo.annotators.collection_reader.CollectionReaderAnnotator(descriptor: CollectionReaderAnnotator, name: str = 'CollectionReader')

Bases: robokudo.annotators.core.BaseAnnotator

Sensor data collection and CAS initialization.

This annotator:

  • Reads sensor data from camera interfaces

  • Initializes new CAS instances

  • Manages multiple collection readers

  • Monitors data availability

  • Preserves existing queries

Note

Uses separate camera interfaces for different sensor types.

class Descriptor(camera_config: robokudo.descriptors.camera_configs.base_camera_config.BaseCameraConfig, camera_interface: robokudo.io.camera_interface.CameraInterface)

Bases: robokudo.annotators.core.BaseAnnotator.Descriptor

Configuration descriptor for collection reader.

parameters
collection_readers: typing_extensions.List[CollectionReaderAnnotator]
start_timer: typing_extensions.Optional[float] = None
iterations_since_last_data: int = 0
iterations_since_last_data_warn_threshold: int = 40
initialise() None

Initialize the collection reader.

Called when:

  • First tick is received

  • Status changes from non-running

Clears feedback messages for all children.

add_collection_reader(descriptor: CollectionReaderAnnotator) None

Add another collection reader descriptor.

Parameters:

descriptor – Configuration descriptor for additional reader

update() py_trees.common.Status

Process sensor data and update CAS.

The method:

  • Checks for new data from all readers

  • Creates new CAS when data available

  • Preserves existing queries

  • Updates feedback messages

  • Monitors data availability

Returns:

SUCCESS if data processed, RUNNING if waiting

terminate(new_status: py_trees.common.Status) None

Handle behavior termination.

Parameters:

new_status – New status (SUCCESS, FAILURE or INVALID)