robokudo.pipeline

Pipeline implementation for RoboKudo.

This module provides the Pipeline class, which is a specialized version of py_trees’ Sequence behavior. A Pipeline manages its own Common Analysis Structure (CAS) and provides visualization capabilities for its annotators.

Key features:

  • Automatic CAS management for each pipeline iteration

  • Visualization support through annotator outputs

  • Helper methods for accessing annotators and CAS

  • Nested pipeline support with proper scoping

Classes

Pipeline

A RoboKudo pipeline composed of annotators with its own CAS.

Module Contents

class robokudo.pipeline.Pipeline(name='Sequence', children=None, *args, **kwargs)

Bases: py_trees.composites.Sequence

A RoboKudo pipeline composed of annotators with its own CAS.

A Pipeline is a specialized sequence behavior that manages a Common Analysis Structure (CAS) for its annotators. It provides visualization capabilities and helper methods for accessing annotators and CAS data.

Variables:
  • cas – The Common Analysis Structure for this pipeline

  • cas_start_timer – Timer for measuring pipeline execution time

  • rk_logger – Logger instance for this pipeline

cas = None
cas_start_timer = None
rk_logger = None
create_new_cas()

Create a new Common Analysis Structure (CAS) for this pipeline. Also resets the execution timer.

setup(timeout: float = None, node: rclpy.node.Node = None, visitor=None)

Set up the pipeline and its annotator outputs.

Creates output structures for all annotators in the pipeline and marks them for initial rendering.

Parameters:
  • timeout (float) – Setup timeout in seconds

  • node (rclpy.node.Node) – a ros node

  • visitor

Returns:

True if setup was successful

Return type:

bool

get_cas()

Get the pipeline’s Common Analysis Structure.

Returns:

The pipeline’s CAS

Return type:

robokudo.cas.CAS

get_annotators()

Get all annotators in this pipeline.

Returns a list of all annotators in this pipeline, including those nested in other behaviors (except other pipelines).

Returns:

List of annotators

Return type:

list[robokudo.annotators.core.BaseAnnotator]

pipeline_children()

Get all children of this pipeline.

Returns a list of all children in this pipeline, stopping at nested Pipeline objects. The order is given by utils.tree.behavior_iterate_except_type.

Returns:

List of child behaviors

Return type:

list[py_trees.behaviour.Behaviour]

update()

Update the pipeline’s state.

Handles pipeline execution status and triggers GUI updates on failure.

Returns:

Execution status

Return type:

py_trees.common.Status

terminate(new_status)

Handle pipeline termination.

Called when the pipeline switches to a non-RUNNING state. Updates GUI on success and logs execution time statistics.

Parameters:

new_status (py_trees.common.Status) – New execution status