robokudo.annotators.core

Core annotator classes for RoboKudo.

This module provides base classes for implementing annotators in RoboKudo.

Classes:
  • BaseAnnotator - Core functionality for CAS access, tree structure, and GUI result handling

  • ThreadedAnnotator - Extends BaseAnnotator for long-running operations that should not block the main thread

Features:
  • Common Analysis Structure (CAS) access

  • Tree structure navigation

  • GUI result handling

  • Thread-safe operation

  • Variable publishing

  • Action client integration

Classes

BaseAnnotator

Base class for all RoboKudo annotators.

Worker

Worker class for executing functions asynchronously.

ThreadedAnnotator

Base class for annotators that perform long-running operations.

ActionClientAnnotator

Base class for annotators that interact with ROS action servers.

Module Contents

class robokudo.annotators.core.BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)

Bases: py_trees.behaviour.Behaviour

Base class for all RoboKudo annotators.

This class provides core functionality for CAS access, tree structure navigation, and GUI result handling. It serves as the foundation for implementing annotators in the RoboKudo framework.

Parameters:
  • name (str, optional) – Name of the annotator instance, defaults to “Annotator”

  • descriptor (BaseAnnotator.Descriptor, optional) – Configuration descriptor for the annotator, defaults to Descriptor()

  • ros_pkg_name (str, optional) – Name of the ROS package, defaults to robokudo.defs.PACKAGE_NAME

class Descriptor

Configuration descriptor for annotator parameters and capabilities.

This class defines the structure for specifying annotator parameters and capabilities. It provides access to global parameters through the Parameters class.

class Parameters

Container for annotator parameters.

This class provides access to both annotator-specific and global parameters. Global parameters are accessed through the global_ prefix.

class Capabilities

Defines the input and output capabilities of an annotator.

Variables:
  • inputs – Input types accepted by the annotator

  • outputs – Output types produced by the annotator

inputs = None
outputs = None
parameters
capabilities
rk_logger = None
descriptor
setup(timeout=None, node=None, visitor=None)

Perform delayed initialization.

This method is called for initialization tasks that require ROS to be running, such as setting up publishers/subscribers or drivers.

Parameters:
  • timeout (float) – Maximum time to wait for initialization

  • node (rclpy.node.Node) – A ROS node

  • visitor

Returns:

True if setup was successful, False otherwise

Return type:

bool

initialise()

Initialize the annotator state.

Called when the first tick is received and any time the status is not RUNNING.

update()

Update the annotator state.

Called every time the behavior is ticked.

Returns:

Status of the behavior after update

Return type:

py_trees.common.Status

terminate(new_status)

Handle annotator termination.

Called whenever the behavior switches to a non-RUNNING state.

Parameters:

new_status (py_trees.common.Status) – New status of the behavior (SUCCESS, FAILURE or INVALID)
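
The lifecycle described by initialise(), update() and terminate() can be sketched without RoboKudo or py_trees installed. Status, MiniBehaviour and CountingAnnotator below are hypothetical stand-ins used only for illustration; a real annotator would subclass robokudo.annotators.core.BaseAnnotator and return py_trees.common.Status values.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for py_trees.common.Status (assumption, not the real enum)."""
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RUNNING = "RUNNING"
    INVALID = "INVALID"


class MiniBehaviour:
    """Hypothetical stand-in that mimics the tick lifecycle described above."""

    def __init__(self, name="Annotator"):
        self.name = name
        self.status = Status.INVALID

    def initialise(self):
        pass

    def update(self):
        return Status.SUCCESS

    def terminate(self, new_status):
        pass

    def tick_once(self):
        # initialise() runs on the first tick and whenever status is not RUNNING.
        if self.status != Status.RUNNING:
            self.initialise()
        new_status = self.update()
        # terminate() runs whenever the behaviour leaves the RUNNING state.
        if new_status != Status.RUNNING:
            self.terminate(new_status)
        self.status = new_status
        return new_status


class CountingAnnotator(MiniBehaviour):
    """Example subclass: stays RUNNING for two ticks, then succeeds."""

    def __init__(self):
        super().__init__(name="CountingAnnotator")
        self.events = []
        self.ticks = 0

    def initialise(self):
        self.ticks = 0
        self.events.append("initialise")

    def update(self):
        self.ticks += 1
        self.events.append("update")
        return Status.RUNNING if self.ticks < 3 else Status.SUCCESS

    def terminate(self, new_status):
        self.events.append(f"terminate:{new_status.value}")


annotator = CountingAnnotator()
statuses = [annotator.tick_once() for _ in range(3)]
```

In this sketch, initialise() is recorded once, update() three times, and terminate() once with SUCCESS, which matches the call order the method descriptions above imply.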

key_callback(key)

Handle keyboard input events.

Parameters:

key (int) – Key code of the pressed key

mouse_callback(event, x, y, flags, param)

Handle mouse interaction events in the visualizer window.

Parameters:
  • event (int) – The event type (see cv2.EVENT_* constants)

  • x (int) – The x coordinate of the mouse click

  • y (int) – The y coordinate of the mouse click

  • flags (int) – Flags passed by OpenCV

  • param (Any) – Parameters passed by OpenCV
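
As a small illustration of the callback signature, the following sketch records left-button clicks. ClickRecorder is a hypothetical class, and the event constant is defined locally so the snippet runs without OpenCV; in a real annotator one would compare against cv2.EVENT_LBUTTONDOWN inside an overridden mouse_callback.

```python
EVENT_LBUTTONDOWN = 1  # same value as cv2.EVENT_LBUTTONDOWN


class ClickRecorder:
    """Hypothetical annotator fragment that remembers left-clicks."""

    def __init__(self):
        self.clicks = []

    def mouse_callback(self, event, x, y, flags, param):
        # Ignore everything except left-button presses.
        if event == EVENT_LBUTTONDOWN:
            self.clicks.append((x, y))


recorder = ClickRecorder()
recorder.mouse_callback(EVENT_LBUTTONDOWN, 120, 45, 0, None)
recorder.mouse_callback(0, 130, 50, 0, None)  # 0 is cv2.EVENT_MOUSEMOVE
```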

get_cas() robokudo.cas.CAS | None

Get the CAS from the parent RoboKudo pipeline.

Returns:

The Common Analysis Structure (CAS) instance or None if not found

Return type:

Optional[robokudo.cas.CAS]

get_parent_pipeline() robokudo.pipeline.Pipeline | None

Get the pipeline containing this annotator.

Returns:

The parent pipeline instance or None if not found

Return type:

Optional[robokudo.pipeline.Pipeline]

get_annotator_outputs() robokudo.annotators.outputs.AnnotatorOutputs

Get the outputs container for all annotators in the current pipeline.

Returns:

Container for all annotator outputs in the pipeline

Return type:

robokudo.annotators.outputs.AnnotatorOutputs

Raises:

AssertionError – If output container types are invalid

get_annotator_output_struct() robokudo.annotators.outputs.AnnotatorOutputStruct

Get the output structure for this specific annotator.

Dynamically adds the annotator to the output structure if not already present.

Returns:

Output structure for this annotator

Return type:

robokudo.annotators.outputs.AnnotatorOutputStruct

add_self_to_annotator_output_struct()

Add this annotator to the output structure.

Called when the annotator is not yet present in the output structure.

get_class_name() str

Get the class name of this annotator.

Unlike self.name which can be customized, this always returns the actual class name.

Returns:

The class name of this annotator

Return type:

str

get_data_from_analysis_scope(analysis_scope: list)

Look up data to analyze based on a list of Types or String constants.

Retrieves data from the CAS based on specified types or CASView constants. Can be used to analyze data at different levels (Scene, ObjectHypothesis, etc.) based on the current perception task context.

Example:

get_data_from_analysis_scope([CASViews.COLOR_IMAGE])
=>
{
    CASViews.COLOR_IMAGE : numpy.array([.. image data...])
}

get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis])
=>
{
    robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, ... ObjectHypothesisN]
}

Parameters:

analysis_scope (list) – List of data types to target (CASView attributes or Annotation subclasses)

Returns:

Dict mapping requested types to corresponding data

Return type:

dict

Raises:

ValueError – If result would be empty
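
The lookup behaviour described above (string constants address views, types collect matching annotations, an empty result raises ValueError) might look roughly like the sketch below. The function lookup_analysis_scope, the views dict, the annotations list, and the local ObjectHypothesis class are all hypothetical stand-ins; the actual method reads from the CAS and may resolve entries differently.

```python
class ObjectHypothesis:
    """Hypothetical stand-in for robokudo.types.scene.ObjectHypothesis."""

    def __init__(self, label):
        self.label = label


COLOR_IMAGE = "color_image"  # stand-in for a CASViews string constant


def lookup_analysis_scope(views, annotations, analysis_scope):
    """Sketch: map each requested scope entry to the matching data."""
    result = {}
    for entry in analysis_scope:
        if isinstance(entry, str):
            # String constants address a view stored under that name.
            if entry in views:
                result[entry] = views[entry]
        elif isinstance(entry, type):
            # Types collect every annotation that is an instance of that type.
            matches = [a for a in annotations if isinstance(a, entry)]
            if matches:
                result[entry] = matches
    if not result:
        raise ValueError(f"No data found for analysis scope {analysis_scope}")
    return result


views = {COLOR_IMAGE: [[0, 0, 0]]}  # placeholder for image data
annotations = [ObjectHypothesis("cup"), ObjectHypothesis("plate")]
by_view = lookup_analysis_scope(views, annotations, [COLOR_IMAGE])
by_type = lookup_analysis_scope(views, annotations, [ObjectHypothesis])
```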

init_time_recording()

Initialize the time recording dictionary for performance measurement.

set_time_recording(value, func='update')

Record timing information for a function.

Parameters:
  • value (float) – The timing value to record

  • func (str, optional) – The function name to associate with the timing, defaults to “update”
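
A minimal sketch of how a timing value could be measured and recorded, assuming the recording is a plain dictionary keyed by function name. The module-level timings dict and this free function are illustrative stand-ins; the internal layout of the real time recording is not specified here.

```python
import time

timings = {}  # hypothetical stand-in for the annotator's time recording dict


def set_time_recording(value, func="update"):
    """Store the elapsed time under the name of the measured function."""
    timings[func] = value


start = time.perf_counter()
total = sum(range(1000))  # placeholder for the annotator's real work
set_time_recording(time.perf_counter() - start)
set_time_recording(0.25, func="compute")
```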

shall_publish_variables() bool

Check if variables should be published based on descriptor settings.

Returns:

True if variables should be published, False otherwise

Return type:

bool

setup_published_variables() None

Set up the ROS publisher and the internal data structure if data shall be published.

Returns:

None

update_published_variable(name: str, value: float) None

Store a value that shall be published whenever self.publish_variables() is called.

Parameters:
  • name (str) – Identifier of the variable

  • value (float) – The actual value (must be a float)

setup_done_for_published_variables() bool

Check if variable publishing setup is complete.

Returns:

True if setup is complete, False otherwise

Return type:

bool

publish_variables() None

Publish all collected variable updates at once.

To publish automatically, apply the decorator @robokudo.utils.decorators.publish_variables to your update or compute method; the collected variables are then published whenever that method returns.

This only works if you have set self.publish_variables = True in the Descriptor of your annotator.

Returns:

None

class robokudo.annotators.core.Worker(fn, args=())

Bases: object

Worker class for executing functions asynchronously.

This class provides a way to run functions in a separate thread and optionally execute a callback when the function completes.

future
_fn
_args = ()
start(callback=None)

Start executing the function in a separate thread.

Parameters:

callback (callable, optional) – Function to call when execution completes, defaults to None

run()

Execute the function and store its result in the future object.
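
The following sketch shows one way such a worker could be built from the standard library, using the attribute names listed above (future, _fn, _args). SketchWorker is a hypothetical re-creation, not the RoboKudo class; returning the thread from start() is an addition made here so the example can wait for completion.

```python
import threading
from concurrent.futures import Future


class SketchWorker:
    """Hypothetical re-creation of the Worker pattern; not the RoboKudo class."""

    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, callback=None):
        # The callback, if given, fires once the future holds a result or exception.
        if callback is not None:
            self.future.add_done_callback(callback)
        thread = threading.Thread(target=self.run, daemon=True)
        thread.start()
        return thread

    def run(self):
        # Store either the return value or the raised exception in the future.
        try:
            self.future.set_result(self._fn(*self._args))
        except Exception as exc:
            self.future.set_exception(exc)


done = []
worker = SketchWorker(pow, args=(2, 10))
thread = worker.start(callback=lambda fut: done.append(fut.result()))
thread.join()
```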

class robokudo.annotators.core.ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())

Bases: BaseAnnotator

Base class for annotators that perform long-running operations.

This class extends BaseAnnotator to support operations that should not block the main thread. It provides functionality to run computations asynchronously and handle their results.

compute_worker = None
compute_worker_started = False
compute_worker_thread = None
initialise()

Initialize the annotator state.

Called when the first tick is received and any time the status is not RUNNING.

update()

Update the annotator state.

Manages the asynchronous computation and returns the appropriate status:
  • RUNNING if the computation is ongoing

  • SUCCESS/FAILURE based on the computation result

  • INVALID if the computation failed with an exception

Returns:

Status of the behavior after update

Return type:

py_trees.common.Status

compute()

Perform the main computation of the annotator.

This method should be overridden by subclasses to implement the actual computation logic.
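
The status contract of update() and the role of compute() can be sketched with standard-library threading. Everything below (Status, SketchThreadedAnnotator, SlowAnnotator, BrokenAnnotator) is a hypothetical illustration; whether the real class returns RUNNING on the very first tick, and how it resets its worker, are assumptions of this sketch.

```python
import threading
from concurrent.futures import Future
from enum import Enum


class Status(Enum):
    """Stand-in for py_trees.common.Status."""
    SUCCESS = 1
    FAILURE = 2
    RUNNING = 3
    INVALID = 4


class SketchThreadedAnnotator:
    """Hypothetical illustration of the update() contract; not RoboKudo code."""

    def __init__(self):
        self.future = None
        self.thread = None

    def compute(self):
        # Subclasses override this with the long-running work.
        return Status.SUCCESS

    def _run(self):
        try:
            self.future.set_result(self.compute())
        except Exception as exc:
            self.future.set_exception(exc)

    def update(self):
        if self.future is None:
            # First tick: start compute() in the background.
            self.future = Future()
            self.thread = threading.Thread(target=self._run, daemon=True)
            self.thread.start()
            return Status.RUNNING
        if not self.future.done():
            return Status.RUNNING
        if self.future.exception() is not None:
            status = Status.INVALID
        else:
            status = self.future.result()
        self.future = None  # allow a fresh computation on the next tick
        return status


class SlowAnnotator(SketchThreadedAnnotator):
    def __init__(self, release):
        super().__init__()
        self.release = release

    def compute(self):
        self.release.wait()  # simulates a long-running operation
        return Status.SUCCESS


class BrokenAnnotator(SketchThreadedAnnotator):
    def compute(self):
        raise RuntimeError("sensor unavailable")


release = threading.Event()
slow = SlowAnnotator(release)
first = slow.update()
second = slow.update()
release.set()
slow.thread.join()
third = slow.update()

broken = BrokenAnnotator()
broken.update()
broken.thread.join()
broken_status = broken.update()
```

The tick that calls update() never blocks on compute(): it only checks whether the background result is available, which is the property the class description above asks for.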

terminate(new_status)

Handle annotator termination.

Called whenever the behavior switches to a non-RUNNING state; new_status can be SUCCESS, FAILURE or INVALID.

Parameters:

new_status (py_trees.common.Status) – New status of the behavior (SUCCESS, FAILURE or INVALID)

class robokudo.annotators.core.ActionClientAnnotator(name: str = 'ActionClientAnnotator', descriptor=Descriptor())

Bases: BaseAnnotator

Base class for annotators that interact with ROS action servers.

This class provides functionality to work with ROS action servers, including connection management, goal sending, and result/feedback handling.

ros_pkg_name = 'robokudo'
class Descriptor

Bases: BaseAnnotator.Descriptor

Configuration descriptor for action client annotators.

parameters
_action_client = None
_last_goal = None
_last_result = None
_last_feedback = None
_result_is_novel = False
_waiting_for_result = False
setup(timeout=None, node=None, visitor=None)

Set up the annotator to receive data from an action server. This is delayed initialization for tasks that require ROS to be running, such as publishers/subscribers or drivers.

Parameters:
  • timeout (float) – Maximum time to wait for setup completion

  • node (rclpy.node.Node) – A ROS node

  • visitor

Returns:

True if setup was successful, False otherwise

Return type:

bool

update() py_trees.common.Status

Receive data from the action server and send a new goal. Called every time the behavior is ticked.

Returns:

Status of the behavior after update

Return type:

py_trees.common.Status

abstract _create_goal_msg() Any

Create a goal to send to the action server. Called on every call to update().

Returns:

The goal message to send

Return type:

Any

_feedback_callback(feedback_msg)

Receive feedback from the action server. Called when feedback from the action server is received.

Parameters:

feedback_msg (Any) – Feedback message from the action server

abstract _process_result(goal, result)

Process the result received from the action server. Called when a result was received.

Parameters:
  • goal (Any) – The original goal that was sent

  • result (Any) – The result received from the action server

Returns:

True for success, False for failure, or a py_trees.common.Status

Return type:

Union[bool, py_trees.common.Status]
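
Because _process_result may return either a bool or a Status, the caller has to normalise the value before handing it back to the behaviour tree. One possible normalisation is sketched below; to_status and the local Status enum are hypothetical names for this example, and the actual handling inside update() is not documented here.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for py_trees.common.Status."""
    SUCCESS = 1
    FAILURE = 2
    RUNNING = 3
    INVALID = 4


def to_status(outcome):
    """Map a bool or Status return value onto a Status."""
    if isinstance(outcome, Status):
        return outcome
    return Status.SUCCESS if outcome else Status.FAILURE


from_true = to_status(True)
from_false = to_status(False)
passthrough = to_status(Status.RUNNING)
```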

_get_result_callback(status, result)

Callback that receives the result from the action server.

Parameters:
  • status (actionlib.GoalStatus) – The terminal status of the goal

  • result (Any) – The result from the action server

_connect_to_server(timeout: int = 3)

Connect to the action server. Called from setup().

Parameters:

timeout (int, optional) – Maximum time to wait for connection, defaults to 3

Returns:

True if connection successful, False otherwise

Return type:

bool

_start_server()

Start the action server if configured to do so.