robokudo.annotators.core
Core annotator classes for RoboKudo.
This module provides base classes for implementing annotators in RoboKudo.
- Classes:
-
BaseAnnotator - Core functionality for CAS access, tree structure, and GUI result handling
ThreadedAnnotator - Extends BaseAnnotator for long-running operations that should not block the main thread
- Features:
-
Common Analysis Structure (CAS) access
Tree structure navigation
GUI result handling
Thread-safe operation
Variable publishing
Action client integration
Classes
BaseAnnotator | Base class for all RoboKudo annotators.
Worker | Worker class for executing functions asynchronously.
ThreadedAnnotator | Base class for annotators that perform long-running operations.
ActionClientAnnotator | Base class for annotators that interact with ROS action servers.
Module Contents
- class robokudo.annotators.core.BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)
-
Bases:
py_trees.behaviour.Behaviour
Base class for all RoboKudo annotators.
This class provides core functionality for CAS access, tree structure navigation, and GUI result handling. It serves as the foundation for implementing annotators in the RoboKudo framework.
- Parameters:
-
name (str, optional) – Name of the annotator instance, defaults to “Annotator”
descriptor (BaseAnnotator.Descriptor, optional) – Configuration descriptor for the annotator, defaults to Descriptor()
ros_pkg_name (str, optional) – Name of the ROS package, defaults to robokudo.defs.PACKAGE_NAME
- class Descriptor
-
Configuration descriptor for annotator parameters and capabilities.
This class defines the structure for specifying annotator parameters and capabilities. It provides access to global parameters through the Parameters class.
- class Parameters
-
Container for annotator parameters.
This class provides access to both annotator-specific and global parameters. Global parameters are accessed through the global_ prefix.
- class Capabilities
-
Defines the input and output capabilities of an annotator.
- Variables:
-
inputs – Input types accepted by the annotator
outputs – Output types produced by the annotator
- inputs = None
- outputs = None
- parameters
- capabilities
- rk_logger = None
- descriptor
- setup(timeout=None, node=None, visitor=None)
-
Perform delayed initialization.
This method is called for initialization tasks that require ROS to be running, such as setting up publishers/subscribers or drivers.
- Parameters:
-
timeout (float) – Maximum time to wait for initialization
node (rclpy.node.Node) – A ROS node
visitor –
- Returns:
-
True if setup was successful, False otherwise
- Return type:
-
bool
- initialise()
-
Initialize the annotator state.
Called when the first tick is received and whenever the status is not RUNNING.
- update()
-
Update the annotator state.
Called every time the behavior is ticked.
- Returns:
-
Status of the behavior after update
- Return type:
-
py_trees.common.Status
- terminate(new_status)
-
Handle annotator termination.
Called whenever the behavior switches to a non-RUNNING state.
- Parameters:
-
new_status (py_trees.common.Status) – New status of the behavior (SUCCESS, FAILURE or INVALID)
- key_callback(key)
-
Handle keyboard input events.
- Parameters:
-
key (int) – Key code of the pressed key
- mouse_callback(event, x, y, flags, param)
-
Handle mouse interaction events in the visualizer window.
- Parameters:
-
event (int) – The event type (see cv2.EVENT_* constants)
x (int) – The x coordinate of the mouse click
y (int) – The y coordinate of the mouse click
flags (int) – Flags passed by OpenCV
param (Any) – Parameters passed by OpenCV
- get_cas() -> robokudo.cas.CAS | None
-
Get the CAS from the parent RoboKudo pipeline.
- Returns:
-
The Common Analysis Structure (CAS) instance or None if not found
- Return type:
-
Optional[robokudo.cas.CAS]
- get_parent_pipeline() -> robokudo.pipeline.Pipeline | None
-
Get the pipeline containing this annotator.
- Returns:
-
The parent pipeline instance or None if not found
- Return type:
-
Optional[robokudo.pipeline.Pipeline]
- get_annotator_outputs() -> robokudo.annotators.outputs.AnnotatorOutputs
-
Get the outputs container for all annotators in the current pipeline.
- Returns:
-
Container for all annotator outputs in the pipeline
- Return type:
-
robokudo.annotators.outputs.AnnotatorOutputs
- Raises:
-
AssertionError – If output container types are invalid
- get_annotator_output_struct() -> robokudo.annotators.outputs.AnnotatorOutputStruct
-
Get the output structure for this specific annotator.
Dynamically adds the annotator to the output structure if not already present.
- Returns:
-
Output structure for this annotator
- Return type:
-
robokudo.annotators.outputs.AnnotatorOutputStruct
- add_self_to_annotator_output_struct()
-
Add this annotator to the output structure.
Called when the annotator is not yet present in the output structure.
- get_class_name() -> str
-
Get the class name of this annotator.
Unlike self.name, which can be customized, this always returns the actual class name.
- Returns:
-
The class name of this annotator
- Return type:
-
str
- get_data_from_analysis_scope(analysis_scope: list)
-
Look up data to analyze based on a list of types or string constants.
Retrieves data from the CAS based on the specified types or CASViews constants. Can be used to analyze data at different levels (Scene, ObjectHypothesis, etc.) based on the current perception task context.
Example:
get_data_from_analysis_scope([CASViews.COLOR_IMAGE])
=> { CASViews.COLOR_IMAGE : numpy.array([.. image data...]) }
get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis])
=> { robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, ... ObjectHypothesisN] }
- Parameters:
-
analysis_scope (list) – List of data types to target (CASView attributes or Annotation subclasses)
- Returns:
-
Dict mapping requested types to corresponding data
- Return type:
-
dict
- Raises:
-
ValueError – If result would be empty
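The lookup described above can be illustrated without RoboKudo installed. The following is a minimal sketch, not RoboKudo's implementation: FakeCAS, lookup_analysis_scope, the "color_image" key, and the ObjectHypothesis stand-in are all hypothetical, and it assumes that string constants address views while types select annotations via isinstance.

```python
from dataclasses import dataclass, field


@dataclass
class ObjectHypothesis:
    """Hypothetical stand-in for an annotation type."""
    label: str


@dataclass
class FakeCAS:
    """Hypothetical stand-in for a CAS: named views plus a list of annotations."""
    views: dict = field(default_factory=dict)
    annotations: list = field(default_factory=list)


def lookup_analysis_scope(cas, analysis_scope):
    """Map each requested key (view name or annotation type) to its data."""
    result = {}
    for key in analysis_scope:
        if isinstance(key, str):
            # Assumption: string constants address views directly.
            if key in cas.views:
                result[key] = cas.views[key]
        elif isinstance(key, type):
            # Assumption: types select all annotations that are instances of that type.
            matches = [a for a in cas.annotations if isinstance(a, key)]
            if matches:
                result[key] = matches
    if not result:
        # Mirrors the documented behaviour: an empty result is an error.
        raise ValueError("analysis scope matched no data")
    return result


cas = FakeCAS(
    views={"color_image": [[0, 0], [255, 255]]},
    annotations=[ObjectHypothesis("cup"), ObjectHypothesis("plate")],
)
data = lookup_analysis_scope(cas, ["color_image", ObjectHypothesis])
```

The returned dict is keyed by exactly the objects passed in, so callers can index it with the same constant or type they requested.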
- init_time_recording()
-
Initialize the time recording dictionary for performance measurement.
- set_time_recording(value, func='update')
-
Record timing information for a function.
- Parameters:
-
value (float) – The timing value to record
func (str, optional) – The function name to associate with the timing, defaults to “update”
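As a rough illustration of how the two timing methods might be used together, here is a minimal sketch. TimingRecorder is a hypothetical stand-in, and storing only the most recent value per function name is an assumption; the source does not say how values are stored.

```python
import time


class TimingRecorder:
    """Hypothetical stand-in mirroring init_time_recording/set_time_recording."""

    def __init__(self):
        self.init_time_recording()

    def init_time_recording(self):
        # Assumption: one entry per function name, holding the last recorded duration.
        self.timings = {}

    def set_time_recording(self, value, func="update"):
        self.timings[func] = value


recorder = TimingRecorder()
start = time.perf_counter()
sum(range(10_000))  # placeholder for the work done in update()
recorder.set_time_recording(time.perf_counter() - start)  # stored under "update"
recorder.set_time_recording(0.25, func="compute")
```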
- shall_publish_variables() -> bool
-
Check if variables should be published based on descriptor settings.
- Returns:
-
True if variables should be published, False otherwise
- Return type:
-
bool
- setup_published_variables() -> None
-
Set up the ROS publisher and the internal data structure if data shall be published.
- Returns:
-
None
- update_published_variable(name: str, value: float) -> None
-
Store a value that shall be published whenever self.publish_variables() is called.
- Parameters:
-
name (str) – Identifier of the variable
value (float) – The actual value (must be a float)
- setup_done_for_published_variables() -> bool
-
Check if variable publishing setup is complete.
- Returns:
-
True if setup is complete, False otherwise
- Return type:
-
bool
- publish_variables() -> None
-
Publish all collected variable updates at once.
To publish automatically, apply the decorator @robokudo.utils.decorators.publish_variables to your update or compute method; the collected variables are then published whenever that method returns.
This only works if you have set self.publish_variables = True in the Descriptor of your annotator.
- Returns:
-
None
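The collect-then-publish pattern can be sketched with plain Python. This is not the RoboKudo decorator: publish_after and DemoAnnotator are hypothetical, a list stands in for the ROS publisher, and clearing the pending values after publishing is an assumption.

```python
import functools


def publish_after(method):
    """Hypothetical decorator: call self.publish_variables() once the method has returned."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        result = method(self, *args, **kwargs)
        self.publish_variables()
        return result
    return wrapper


class DemoAnnotator:
    def __init__(self, publish_enabled=True):
        self.publish_enabled = publish_enabled
        self.pending = {}    # values collected since the last publish
        self.published = []  # stands in for messages sent by a ROS publisher

    def update_published_variable(self, name, value):
        # Values are stored as floats, matching the documented parameter type.
        self.pending[name] = float(value)

    def publish_variables(self):
        # Publish everything collected so far in one batch, if enabled.
        if self.publish_enabled and self.pending:
            self.published.append(dict(self.pending))
            self.pending.clear()

    @publish_after
    def update(self):
        self.update_published_variable("object_count", 3)
        self.update_published_variable("mean_depth", 0.82)
        return "SUCCESS"


annotator = DemoAnnotator()
status = annotator.update()
```

Collecting values first and publishing once per tick keeps related variables in the same batch, which is the behaviour the description above suggests.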
- class robokudo.annotators.core.Worker(fn, args=())
-
Bases:
object
Worker class for executing functions asynchronously.
This class provides a way to run functions in a separate thread and optionally execute a callback when the function completes.
- future
- _fn
- _args = ()
- start(callback=None)
-
Start executing the function in a separate thread.
- Parameters:
-
callback (callable, optional) – Function to call when execution completes, defaults to None
- run()
-
Execute the function and store its result in the future object.
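To make the future/thread/callback interplay concrete, here is a minimal sketch built only on the standard library. SketchWorker is a hypothetical re-creation of the idea, not the RoboKudo class; it assumes the callback receives the finished Future and that exceptions are stored on the future rather than raised in the thread.

```python
import threading
from concurrent.futures import Future


class SketchWorker:
    """Hypothetical re-creation of the Worker idea: run fn(*args) in a thread."""

    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, callback=None):
        if callback is not None:
            # Assumption: the callback is invoked with the finished Future.
            self.future.add_done_callback(callback)
        thread = threading.Thread(target=self.run, daemon=True)
        thread.start()
        return thread

    def run(self):
        # Mark the future as running; returns False if it was cancelled beforehand.
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            self.future.set_result(self._fn(*self._args))
        except Exception as exc:
            # Keep the error on the future so the caller can inspect it later.
            self.future.set_exception(exc)


done = []
worker = SketchWorker(pow, args=(2, 10))
thread = worker.start(callback=lambda fut: done.append(fut.result()))
thread.join()
```

After the thread has been joined, worker.future.result() returns the function's return value, and the callback has already run in the worker thread.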
- class robokudo.annotators.core.ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())
-
Bases:
BaseAnnotator
Base class for annotators that perform long-running operations.
This class extends BaseAnnotator to support operations that should not block the main thread. It provides functionality to run computations asynchronously and handle their results.
- compute_worker = None
- compute_worker_started = False
- compute_worker_thread = None
- initialise()
-
Initialize the annotator state.
Called when the first tick is received and whenever the status is not RUNNING.
- update()
-
Update the annotator state.
Manages the asynchronous computation and returns appropriate status:
- RUNNING if computation is ongoing
- SUCCESS/FAILURE based on computation result
- INVALID if computation failed with an exception
- Returns:
-
Status of the behavior after update
- Return type:
-
py_trees.common.Status
- compute()
-
Perform the main computation of the annotator.
This method should be overridden by subclasses to implement the actual computation logic.
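The relationship between update() and compute() can be sketched as a small polling loop. This is an illustration under stated assumptions, not the RoboKudo implementation: ThreadedSketch, Failing, tick_until_done, and the Status enum are hypothetical stand-ins (the real status type is py_trees.common.Status), and it assumes the first tick starts the computation and reports RUNNING.

```python
import enum
import threading
import time
from concurrent.futures import Future


class Status(enum.Enum):
    """Stand-in for py_trees.common.Status."""
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RUNNING = "RUNNING"
    INVALID = "INVALID"


class ThreadedSketch:
    """Hypothetical annotator whose update() polls a background compute()."""

    def __init__(self):
        self.future = None

    def compute(self):
        # Subclasses override this with the long-running work.
        return Status.SUCCESS

    def _run(self, future):
        try:
            future.set_result(self.compute())
        except Exception as exc:
            future.set_exception(exc)

    def update(self):
        if self.future is None:
            # First tick: start the computation and report RUNNING.
            self.future = Future()
            threading.Thread(target=self._run, args=(self.future,), daemon=True).start()
            return Status.RUNNING
        if not self.future.done():
            return Status.RUNNING
        # Computation finished: reset so the next tick starts a fresh run.
        future, self.future = self.future, None
        if future.exception() is not None:
            return Status.INVALID
        return future.result()


class Failing(ThreadedSketch):
    def compute(self):
        raise RuntimeError("sensor unavailable")


def tick_until_done(annotator):
    """Tick the annotator until it leaves the RUNNING state."""
    status = annotator.update()
    while status is Status.RUNNING:
        time.sleep(0.001)
        status = annotator.update()
    return status
```

Because update() never waits on the computation, the caller keeps ticking at its own rate and simply sees RUNNING until a result is available.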
- terminate(new_status)
-
Handle annotator termination.
Called whenever the behavior switches to a non-RUNNING state. new_status can be SUCCESS, FAILURE or INVALID.
- Parameters:
-
new_status (py_trees.common.Status) – New status of the behavior (SUCCESS, FAILURE or INVALID)
- class robokudo.annotators.core.ActionClientAnnotator(name: str = 'ActionClientAnnotator', descriptor=Descriptor())
-
Bases:
BaseAnnotator
Base class for annotators that interact with ROS action servers.
This class provides functionality to work with ROS action servers, including connection management, goal sending, and result/feedback handling.
- ros_pkg_name = 'robokudo'
- class Descriptor
-
Bases:
BaseAnnotator
Configuration descriptor for action client annotators.
- parameters
- _action_client = None
- _last_goal = None
- _last_result = None
- _last_feedback = None
- _result_is_novel = False
- _waiting_for_result = False
- setup(timeout=None, node=None, visitor=None)
-
Set up the annotator to receive data from an action server. This is delayed initialisation for tasks such as ROS publishers/subscribers or drivers.
- Parameters:
-
timeout (float) – Maximum time to wait for setup completion
node (rclpy.node.Node) – A ROS node
visitor –
- Returns:
-
True if setup was successful, False otherwise
- Return type:
-
bool
- update() -> py_trees.common.Status
-
Receive data from action server and send new goal. Called every time the behavior is ticked.
- Returns:
-
Status of the behavior after update
- Return type:
-
py_trees.common.Status
- abstract _create_goal_msg() -> Any
-
Create a goal to send to the action server. Called on every call to the update method.
- Returns:
-
The goal message to send
- Return type:
-
Any
- _feedback_callback(feedback_msg)
-
Receive feedback from the action server. Called when feedback from the action server is received.
- Parameters:
-
feedback_msg (Any) – Feedback message from the action server
- abstract _process_result(goal, result)
-
Process the result received from the action server. Called when a result was received.
- Parameters:
-
goal (Any) – The original goal that was sent
result (Any) – The result received from the action server
- Returns:
-
True for success, False for failure, or a py_trees.common.Status
- Return type:
-
Union[bool, py_trees.common.Status]
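Since _process_result may return either a bool or a status, a caller has to normalise the value. The sketch below shows one way this could look; it is not taken from RoboKudo. to_status, GraspCheckSketch, the dict-shaped goal and result, and the Status enum are hypothetical stand-ins (the real status type is py_trees.common.Status, and real goals are action messages).

```python
import enum


class Status(enum.Enum):
    """Stand-in for py_trees.common.Status."""
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RUNNING = "RUNNING"
    INVALID = "INVALID"


def to_status(outcome):
    """Normalise a bool-or-Status return value into a Status."""
    if isinstance(outcome, Status):
        return outcome
    if isinstance(outcome, bool):
        return Status.SUCCESS if outcome else Status.FAILURE
    raise TypeError(f"expected bool or Status, got {type(outcome).__name__}")


class GraspCheckSketch:
    """Hypothetical subclass showing the two abstract hooks."""

    def _create_goal_msg(self):
        # A real subclass would build an action goal message here.
        return {"object_id": "cup_1"}

    def _process_result(self, goal, result):
        # Succeed only if the result refers to the object named in the goal.
        return result.get("object_id") == goal["object_id"]


client = GraspCheckSketch()
goal = client._create_goal_msg()
status = to_status(client._process_result(goal, {"object_id": "cup_1"}))
```

Returning a plain bool keeps simple subclasses short, while returning a status directly leaves room for cases such as reporting RUNNING when more results are expected.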
- _get_result_callback(status, result)
-
Get result from action server. Callback for receiving results.
- Parameters:
-
status (actionlib.GoalStatus) – The terminal status of the goal
result (Any) – The result from the action server
- _connect_to_server(timeout: int = 3)
-
Connect to the action server. Called from the setup method.
- Parameters:
-
timeout (int, optional) – Maximum time to wait for connection, defaults to 3
- Returns:
-
True if connection successful, False otherwise
- Return type:
-
bool
- _start_server()
-
Start the action server if configured to do so.