robokudo.annotators.core
========================

.. py:module:: robokudo.annotators.core

.. autoapi-nested-parse::

   Core annotator classes for RoboKudo.

   This module provides base classes for implementing annotators in RoboKudo.

   :Classes:
       * BaseAnnotator - Core functionality for CAS access, tree structure, and GUI result handling
       * ThreadedAnnotator - Extends BaseAnnotator for long-running operations that should not block the main thread

   :Features:
       * Common Analysis Structure (CAS) access
       * Tree structure navigation
       * GUI result handling
       * Thread-safe operation
       * Variable publishing
       * Action client integration


Classes
-------

.. autoapisummary::

   robokudo.annotators.core.BaseAnnotator
   robokudo.annotators.core.Worker
   robokudo.annotators.core.ThreadedAnnotator
   robokudo.annotators.core.ActionClientAnnotator


Module Contents
---------------

.. py:class:: BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)

   Bases: :py:obj:`py_trees.behaviour.Behaviour`

   Base class for all RoboKudo annotators.

   This class provides core functionality for CAS access, tree structure navigation,
   and GUI result handling. It serves as the foundation for implementing annotators
   in the RoboKudo framework.

   :param name: Name of the annotator instance, defaults to "Annotator"
   :type name: str, optional
   :param descriptor: Configuration descriptor for the annotator, defaults to Descriptor()
   :type descriptor: BaseAnnotator.Descriptor, optional
   :param ros_pkg_name: Name of the ROS package, defaults to robokudo.defs.PACKAGE_NAME
   :type ros_pkg_name: str, optional


   .. py:class:: Descriptor

      Configuration descriptor for annotator parameters and capabilities.

      This class defines the structure for specifying annotator parameters and capabilities.
      It provides access to global parameters through the Parameters class.


      .. py:class:: Parameters

         Container for annotator parameters.

         This class provides access to both annotator-specific and global parameters.
         Global parameters are accessed through the ``global_`` prefix.


      .. py:class:: Capabilities

         Defines the input and output capabilities of an annotator.

         :ivar inputs: Input types accepted by the annotator
         :ivar outputs: Output types produced by the annotator


         .. py:attribute:: inputs
            :value: None


         .. py:attribute:: outputs
            :value: None


      .. py:attribute:: parameters


      .. py:attribute:: capabilities


   .. py:attribute:: rk_logger
      :value: None


   .. py:attribute:: descriptor


   .. py:method:: setup(timeout=None, node=None, visitor=None)

      Perform delayed initialization.

      This method is called for initialization tasks that require ROS to be running,
      such as setting up publishers/subscribers or drivers.

      :param timeout: Maximum time to wait for initialization
      :type timeout: float
      :param node: A ROS node
      :type node: rclpy.node.Node
      :param visitor:
      :type visitor:
      :return: True if setup was successful, False otherwise
      :rtype: bool


   .. py:method:: initialise()

      Initialize the annotator state.

      Called when the first tick is received and any time the status is not RUNNING.


   .. py:method:: update()

      Update the annotator state.

      Called every time the behavior is ticked.

      :return: Status of the behavior after update
      :rtype: py_trees.common.Status


   .. py:method:: terminate(new_status)

      Handle annotator termination.

      Called whenever the behavior switches to a non-RUNNING state.

      :param new_status: New status of the behavior (SUCCESS, FAILURE or INVALID)
      :type new_status: py_trees.common.Status


   .. py:method:: key_callback(key)

      Handle keyboard input events.

      :param key: Key code of the pressed key
      :type key: int


   .. py:method:: mouse_callback(event, x, y, flags, param)

      Handle mouse interaction events in the visualizer window.

      :param event: The event type (see cv2.EVENT_* constants)
      :type event: int
      :param x: The x coordinate of the mouse click
      :type x: int
      :param y: The y coordinate of the mouse click
      :type y: int
      :param flags: Flags passed by OpenCV
      :type flags: int
      :param param: Parameters passed by OpenCV
      :type param: Any


   .. py:method:: get_cas() -> Optional[robokudo.cas.CAS]

      Get the CAS from the parent RoboKudo pipeline.

      :return: The Common Analysis Structure (CAS) instance or None if not found
      :rtype: Optional[robokudo.cas.CAS]


   .. py:method:: get_parent_pipeline() -> Optional[robokudo.pipeline.Pipeline]

      Get the pipeline containing this annotator.

      :return: The parent pipeline instance or None if not found
      :rtype: Optional[robokudo.pipeline.Pipeline]


   .. py:method:: get_annotator_outputs() -> robokudo.annotators.outputs.AnnotatorOutputs

      Get the outputs container for all annotators in the current pipeline.

      :return: Container for all annotator outputs in the pipeline
      :rtype: robokudo.annotators.outputs.AnnotatorOutputs
      :raises AssertionError: If output container types are invalid


   .. py:method:: get_annotator_output_struct() -> robokudo.annotators.outputs.AnnotatorOutputStruct

      Get the output structure for this specific annotator.

      Dynamically adds the annotator to the output structure if not already present.

      :return: Output structure for this annotator
      :rtype: robokudo.annotators.outputs.AnnotatorOutputStruct


   .. py:method:: add_self_to_annotator_output_struct()

      Add this annotator to the output structure.

      Called when the annotator is not yet present in the output structure.


   .. py:method:: get_class_name() -> str

      Get the class name of this annotator.

      Unlike ``self.name``, which can be customized, this always returns the actual class name.

      :return: The class name of this annotator
      :rtype: str


   .. py:method:: get_data_from_analysis_scope(analysis_scope: list)

      Look up data to analyze based on a list of Types or String constants.

      Retrieves data from the CAS based on specified types or CASView constants.
      Can be used to analyze data at different levels (Scene, ObjectHypothesis, etc.)
      based on the current perception task context.

      Example::

          get_data_from_analysis_scope([CASViews.COLOR_IMAGE])
          => { CASViews.COLOR_IMAGE : numpy.array([.. image data...]) }

          get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis])
          => { robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, ... ObjectHypothesisN] }

      :param analysis_scope: List of data types to target (CASView attributes or Annotation subclasses)
      :type analysis_scope: list
      :return: Dict mapping requested types to corresponding data
      :rtype: dict
      :raises ValueError: If result would be empty


   .. py:method:: init_time_recording()

      Initialize the time recording dictionary for performance measurement.


   .. py:method:: set_time_recording(value, func='update')

      Record timing information for a function.

      :param value: The timing value to record
      :type value: float
      :param func: The function name to associate with the timing, defaults to "update"
      :type func: str, optional


   .. py:method:: shall_publish_variables() -> bool

      Check if variables should be published based on descriptor settings.

      :return: True if variables should be published, False otherwise
      :rtype: bool


   .. py:method:: setup_published_variables() -> None

      Set up the ROS publisher and internal data structure if data shall be published.

      :return: None


   .. py:method:: update_published_variable(name: str, value: float) -> None

      Store a value that shall be published whenever ``self.publish_variables()`` is called.

      :param name: Identifier of the variable
      :type name: str
      :param value: The actual value (must be a float)
      :type value: float


   .. py:method:: setup_done_for_published_variables() -> bool

      Check if variable publishing setup is complete.

      :return: True if setup is complete, False otherwise
      :rtype: bool


   .. py:method:: publish_variables() -> None

      Publish all collected variable updates at once.

      Use this function as a decorator (i.e. ``@robokudo.utils.decorators.publish_variables``)
      on your update or compute method to automatically publish variables whenever the
      method returns.
      This only works if you have set ``self.publish_variables = True`` in the
      Descriptor of your Annotator.

      :return: None


.. py:class:: Worker(fn, args=())

   Bases: :py:obj:`object`

   Worker class for executing functions asynchronously.

   This class provides a way to run functions in a separate thread and
   optionally execute a callback when the function completes.


   .. py:attribute:: future


   .. py:attribute:: _fn


   .. py:attribute:: _args
      :value: ()


   .. py:method:: start(callback=None)

      Start executing the function in a separate thread.

      :param callback: Function to call when execution completes, defaults to None
      :type callback: callable, optional


   .. py:method:: run()

      Execute the function and store its result in the future object.


.. py:class:: ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())

   Bases: :py:obj:`BaseAnnotator`

   Base class for annotators that perform long-running operations.

   This class extends BaseAnnotator to support operations that should not block
   the main thread. It provides functionality to run computations asynchronously
   and handle their results.


   .. py:attribute:: compute_worker
      :value: None


   .. py:attribute:: compute_worker_started
      :value: False


   .. py:attribute:: compute_worker_thread
      :value: None


   .. py:method:: initialise()

      Initialize the annotator state.

      Called when the first tick is received and any time the status is not RUNNING.


   .. py:method:: update()

      Update the annotator state.

      Manages the asynchronous computation and returns the appropriate status:

      - RUNNING if computation is ongoing
      - SUCCESS/FAILURE based on computation result
      - INVALID if computation failed with an exception

      :return: Status of the behavior after update
      :rtype: py_trees.common.Status


   .. py:method:: compute()

      Perform the main computation of the annotator.

      This method should be overridden by subclasses to implement the actual
      computation logic.


   .. py:method:: terminate(new_status)

      Handle annotator termination.

      Called whenever the behavior switches to a non-RUNNING state.
      ``new_status`` can be SUCCESS, FAILURE or INVALID.

      :param new_status: New status of the behavior (SUCCESS, FAILURE or INVALID)
      :type new_status: py_trees.common.Status


.. py:class:: ActionClientAnnotator(name: str = 'ActionClientAnnotator', descriptor=Descriptor())

   Bases: :py:obj:`BaseAnnotator`

   Base class for annotators that interact with ROS action servers.

   This class provides functionality to work with ROS action servers, including
   connection management, goal sending, and result/feedback handling.


   .. py:attribute:: ros_pkg_name
      :value: 'robokudo'


   .. py:class:: Descriptor

      Bases: :py:obj:`BaseAnnotator`

      Configuration descriptor for action client annotators.


      .. py:attribute:: parameters


   .. py:attribute:: _action_client
      :value: None


   .. py:attribute:: _last_goal
      :value: None


   .. py:attribute:: _last_result
      :value: None


   .. py:attribute:: _last_feedback
      :value: None


   .. py:attribute:: _result_is_novel
      :value: False


   .. py:attribute:: _waiting_for_result
      :value: False


   .. py:method:: setup(timeout=None, node=None, visitor=None)

      Set up the annotator to receive data from an action server.

      Delayed initialisation, for example of ROS pub/sub, drivers, etc.

      :param timeout: Maximum time to wait for setup completion
      :type timeout: float
      :param node: A ROS node
      :type node: rclpy.node.Node
      :param visitor:
      :type visitor:
      :return: True if setup was successful, False otherwise
      :rtype: bool


   .. py:method:: update() -> py_trees.common.Status

      Receive data from the action server and send a new goal.

      Called every time the behavior is ticked.

      :return: Status of the behavior after update
      :rtype: py_trees.common.Status


   .. py:method:: _create_goal_msg() -> Any
      :abstractmethod:

      Create a goal to send to the action server.

      Called on every ``update()`` call.

      :return: The goal message to send
      :rtype: Any


   .. py:method:: _feedback_callback(feedback_msg)

      Receive feedback from the action server.

      Called when feedback from the action server is received.

      :param feedback_msg: Feedback message from the action server
      :type feedback_msg: Any


   .. py:method:: _process_result(goal, result)
      :abstractmethod:

      Process the result received from the action server.

      Called when a result was received.

      :param goal: The original goal that was sent
      :type goal: Any
      :param result: The result received from the action server
      :type result: Any
      :return: True for success, False for failure, or a py_trees.common.Status
      :rtype: Union[bool, py_trees.common.Status]


   .. py:method:: _get_result_callback(status, result)

      Get the result from the action server.

      Callback for receiving results.

      :param status: The terminal status of the goal
      :type status: actionlib.GoalStatus
      :param result: The result from the action server
      :type result: Any


   .. py:method:: _connect_to_server(timeout: int = 3)

      Connect to the action server.

      Called from ``setup()``.

      :param timeout: Maximum time to wait for connection, defaults to 3
      :type timeout: int, optional
      :return: True if connection successful, False otherwise
      :rtype: bool


   .. py:method:: _start_server()

      Start the action server if configured to do so.
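
The ``Worker`` and ``ThreadedAnnotator`` entries above describe a pattern in which ``compute()`` runs in a background thread while ``update()`` reports RUNNING until a result is available. The following is a minimal, dependency-free sketch of that pattern, not RoboKudo's actual implementation. ``Status``, ``SketchWorker``, ``SketchThreadedAnnotator`` and ``tick_until_done`` are hypothetical stand-ins that only mirror the documented attribute and method names; the real classes build on ``py_trees`` and may differ in detail.

```python
import threading
import time
from concurrent.futures import Future
from enum import Enum


class Status(Enum):
    """Stand-in for py_trees.common.Status (assumption, avoids the dependency)."""
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    INVALID = "INVALID"


class SketchWorker:
    """Hypothetical re-creation of the documented Worker interface."""

    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, callback=None):
        # Optionally notify the caller once the future is resolved.
        if callback is not None:
            self.future.add_done_callback(callback)
        thread = threading.Thread(target=self.run, daemon=True)
        thread.start()
        return thread

    def run(self):
        # Store either the return value or the raised exception in the future.
        try:
            self.future.set_result(self._fn(*self._args))
        except Exception as exc:
            self.future.set_exception(exc)


class SketchThreadedAnnotator:
    """Hypothetical annotator that polls a background computation on each tick."""

    def initialise(self):
        self.compute_worker = SketchWorker(self.compute)
        self.compute_worker_started = False
        self.compute_worker_thread = None

    def compute(self):
        # Subclasses would override this with the long-running work.
        time.sleep(0.05)
        return Status.SUCCESS

    def update(self):
        if not self.compute_worker_started:
            self.compute_worker_thread = self.compute_worker.start()
            self.compute_worker_started = True
            return Status.RUNNING
        future = self.compute_worker.future
        if not future.done():
            return Status.RUNNING
        if future.exception() is not None:
            return Status.INVALID
        return future.result()


def tick_until_done(annotator, period=0.01, max_ticks=1000):
    """Tick the annotator like a behavior tree would and collect the statuses."""
    annotator.initialise()
    statuses = []
    for _ in range(max_ticks):
        statuses.append(annotator.update())
        if statuses[-1] is not Status.RUNNING:
            break
        time.sleep(period)
    return statuses
```

In this sketch the first tick only starts the worker, so the tree never blocks on ``compute()``. An exception raised inside ``compute()`` is captured in the future and surfaces as INVALID on a later tick, matching the status list documented for ``ThreadedAnnotator.update()``.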