robokudo.annotators.core¶
Core annotator classes for RoboKudo.
This module provides base classes for implementing annotators in RoboKudo.
- Classes:
-
BaseAnnotator - Core functionality for CAS access, tree structure, and GUI result handling
ThreadedAnnotator - Extends BaseAnnotator for long-running operations that should not block the main thread
- Features:
-
Common Analysis Structure (CAS) access
Tree structure navigation
GUI result handling
Thread-safe operation
Variable publishing
Action client integration
Classes¶
Base class for all RoboKudo annotators. |
|
Worker class for executing functions asynchronously. |
|
Base class for annotators that perform long-running operations. |
Module Contents¶
- class robokudo.annotators.core.BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)¶
-
Bases:
py_trees.behaviour.BehaviourBase class for all RoboKudo annotators.
This class provides core functionality for CAS access, tree structure navigation, and GUI result handling. It serves as the foundation for implementing annotators in the RoboKudo framework.
- Parameters:
-
name (str, optional) – Name of the annotator instance, defaults to “Annotator”
descriptor (BaseAnnotator.Descriptor, optional) – Configuration descriptor for the annotator, defaults to Descriptor()
ros_pkg_name (str, optional) – Name of the ROS package, defaults to robokudo.defs.PACKAGE_NAME
- class Descriptor¶
-
Configuration descriptor for annotator parameters and capabilities.
This class defines the structure for specifying annotator parameters and capabilities. It provides access to global parameters through the Parameters class.
- class Parameters¶
-
Container for annotator parameters.
This class provides access to both annotator-specific and global parameters. Global parameters are accessed through the global_ prefix.
- class Capabilities¶
-
Defines the input and output capabilities of an annotator.
- Variables:
-
inputs – Input types accepted by the annotator
outputs – Output types produced by the annotator
- inputs = None¶
- outputs = None¶
- parameters¶
- capabilities¶
- rk_logger = None¶
- descriptor¶
- setup(**kwargs: Any)¶
-
Perform delayed initialization.
This method is called for initialization tasks after the constructor has been called. It can be used for setup calls that require ROS to be running, such as setting up publishers/subscribers or drivers.
- initialise()¶
-
Initialize the annotator state.
Called when first tick is received and anytime status is not running.
- update()¶
-
Update the annotator state.
Called every time the behavior is ticked.
- Returns:
-
Status of the behavior after update
- Return type:
-
py_trees.common.Status
- terminate(new_status)¶
-
Handle annotator termination.
Called whenever behavior switches to a non-RUNNING state.
- Parameters:
-
new_status (py_trees.common.Status) – New status of the behavior (SUCCESS, FAILURE or INVALID)
- key_callback(key)¶
-
Handle keyboard input events.
- Parameters:
-
key (int) – Key code of the pressed key
- mouse_callback(event, x, y, flags, param)¶
-
Handle mouse interaction events in the visualizer window.
- Parameters:
-
event (int) – The event type (see cv2.EVENT_* constants)
x (int) – The x coordinate of the mouse click
y (int) – The y coordinate of the mouse click
flags (int) – Flags passed by OpenCV
param (Any) – Parameters passed by OpenCV
- get_cas() robokudo.cas.CAS | None¶
-
Get the CAS from the parent RoboKudo pipeline.
- Returns:
-
The Common Analysis Structure (CAS) instance or None if not found
- Return type:
-
Optional[robokudo.cas.CAS]
- get_parent_pipeline() robokudo.pipeline.Pipeline | None¶
-
Get the pipeline containing this annotator.
- Returns:
-
The parent pipeline instance or None if not found
- Return type:
-
Optional[robokudo.pipeline.Pipeline]
- get_annotator_outputs() robokudo.annotators.outputs.AnnotatorOutputs¶
-
Get the outputs container for all annotators in the current pipeline.
- Returns:
-
Container for all annotator outputs in the pipeline
- Return type:
- Raises:
-
AssertionError – If output container types are invalid
- get_annotator_output_struct() robokudo.annotators.outputs.AnnotatorOutputStruct¶
-
Get the output structure for this specific annotator.
Dynamically adds the annotator to the output structure if not already present.
- Returns:
-
Output structure for this annotator
- Return type:
- add_self_to_annotator_output_struct()¶
-
Add this annotator to the output structure.
Called when the annotator is not yet present in the output structure.
- get_class_name() str¶
-
Get the class name of this annotator.
Unlike self.name which can be customized, this always returns the actual class name.
- Returns:
-
The class name of this annotator
- Return type:
-
str
- get_data_from_analysis_scope(analysis_scope: list)¶
-
Look up data to analyze based on a list of Types or String constants.
Retrieves data from the CAS based on specified types or CASView constants. Can be used to analyze data at different levels (Scene, ObjectHypothesis, etc.) based on the current perception task context.
Example:
get_data_from_analysis_scope([CASViews.COLOR_IMAGE]) => { CASViews.COLOR_IMAGE : numpy.array([.. image data...]) } get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis]) => { robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, ... ObjectHypothesisN] }
- Parameters:
-
analysis_scope (list) – List of data types to target (CASView attributes or Annotation subclasses)
- Returns:
-
Dict mapping requested types to corresponding data
- Return type:
-
dict
- Raises:
-
ValueError – If result would be empty
- init_time_recording()¶
-
Initialize the time recording dictionary for performance measurement.
- set_time_recording(value, func='update')¶
-
Record timing information for a function.
- Parameters:
-
value (float) – The timing value to record
func (str, optional) – The function name to associate with the timing, defaults to “update”
- shall_publish_variables() bool¶
-
Check if variables should be published based on descriptor settings.
- Returns:
-
True if variables should be published, False otherwise
- Return type:
-
bool
- setup_published_variables() None¶
-
Setup the ROS Publisher and internal data structure if data shall be published
- Returns:
-
None
- update_published_variable(name: str, value: float) None¶
-
Store a value that shall be published whenever self.publish_variables() is called.
- Parameters:
-
name (str) – Identifier of the variable
value (float) – The actual value (must be a float)
- setup_done_for_published_variables() bool¶
-
Check if variable publishing setup is complete.
- Returns:
-
True if setup is complete, False otherwise
- Return type:
-
bool
- publish_variables() None¶
-
Publish all collected variable updates at once.
Use this function as a decorator (i.e. @robokudo.utils.decorators.publish_variables) on your update or compute method to automatically publish variables whenever the method is returning.
This method will only work if you’ve set self.publish_variables = True in the Descriptor of your Annotator
- Returns:
-
None
- class robokudo.annotators.core.Worker(fn, args=())¶
-
Bases:
objectWorker class for executing functions asynchronously.
This class provides a way to run functions in a separate thread and optionally execute a callback when the function completes.
- future¶
- _fn¶
- _args = ()¶
- start(callback=None)¶
-
Start executing the function in a separate thread.
- Parameters:
-
callback (callable, optional) – Function to call when execution completes, defaults to None
- run()¶
-
Execute the function and store its result in the future object.
- class robokudo.annotators.core.ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())¶
-
Bases:
BaseAnnotatorBase class for annotators that perform long-running operations.
This class extends BaseAnnotator to support operations that should not block the main thread. It provides functionality to run computations asynchronously and handle their results.
- compute_worker = None¶
- compute_worker_started = False¶
- compute_worker_thread = None¶
- initialise()¶
-
Initialize the annotator state.
Called when first tick is received and anytime status is not running.
- update()¶
-
Update the annotator state.
Manages the asynchronous computation and returns appropriate status: - RUNNING if computation is ongoing - SUCCESS/FAILURE based on computation result - INVALID if computation failed with an exception
- Returns:
-
Status of the behavior after update
- Return type:
-
py_trees.common.Status
- compute()¶
-
Perform the main computation of the annotator.
This method should be overridden by subclasses to implement the actual computation logic.
- terminate(new_status)¶
-
Handle annotator termination.
Called whenever behavior switches to !RUNNING state. new_status can be SUCCESS, FAILURE or INVALID
- Parameters:
-
new_status (py_trees.common.Status) – New status of the behavior (SUCCESS, FAILURE or INVALID)