robokudo.annotators.core
========================

.. py:module:: robokudo.annotators.core

.. autoapi-nested-parse::

   Core annotator classes for RoboKudo.

   This module provides base classes for implementing annotators in RoboKudo.

   :Classes:
       * BaseAnnotator - Core functionality for CAS access, tree structure, and GUI result handling
       * ThreadedAnnotator - Extends BaseAnnotator for long-running operations that should not block the main thread

   :Features:
       * Common Analysis Structure (CAS) access
       * Tree structure navigation
       * GUI result handling
       * Thread-safe operation
       * Variable publishing
       * Action client integration


Classes
-------

.. autoapisummary::

   robokudo.annotators.core.BaseAnnotator
   robokudo.annotators.core.Worker
   robokudo.annotators.core.ThreadedAnnotator


Module Contents
---------------

.. py:class:: BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)

   Bases: :py:obj:`py_trees.behaviour.Behaviour`

   Base class for all RoboKudo annotators.

   This class provides core functionality for CAS access, tree structure navigation,
   and GUI result handling. It serves as the foundation for implementing annotators
   in the RoboKudo framework.

   :param name: Name of the annotator instance, defaults to "Annotator"
   :type name: str, optional
   :param descriptor: Configuration descriptor for the annotator, defaults to Descriptor()
   :type descriptor: BaseAnnotator.Descriptor, optional
   :param ros_pkg_name: Name of the ROS package, defaults to robokudo.defs.PACKAGE_NAME
   :type ros_pkg_name: str, optional

   .. py:class:: Descriptor

      Configuration descriptor for annotator parameters and capabilities.

      This class defines the structure for specifying annotator parameters and
      capabilities. It provides access to global parameters through the
      Parameters class.

      .. py:class:: Parameters

         Container for annotator parameters.

         This class provides access to both annotator-specific and global
         parameters. Global parameters are accessed through the ``global_`` prefix.


      .. py:class:: Capabilities

         Defines the input and output capabilities of an annotator.

         :ivar inputs: Input types accepted by the annotator
         :ivar outputs: Output types produced by the annotator

         .. py:attribute:: inputs
            :value: None


         .. py:attribute:: outputs
            :value: None



      .. py:attribute:: parameters


      .. py:attribute:: capabilities



   .. py:attribute:: rk_logger
      :value: None


   .. py:attribute:: descriptor


   .. py:method:: setup(**kwargs: Any)

      Perform delayed initialization.

      This method is called for initialization tasks after the constructor has
      been called. It can be used for setup calls that require ROS to be running,
      such as setting up publishers/subscribers or drivers.


   .. py:method:: initialise()

      Initialize the annotator state.

      Called when the first tick is received and whenever the status is not RUNNING.


   .. py:method:: update()

      Update the annotator state.

      Called every time the behavior is ticked.

      :return: Status of the behavior after update
      :rtype: py_trees.common.Status


   .. py:method:: terminate(new_status)

      Handle annotator termination.

      Called whenever the behavior switches to a non-RUNNING state.

      :param new_status: New status of the behavior (SUCCESS, FAILURE or INVALID)
      :type new_status: py_trees.common.Status


   .. py:method:: key_callback(key)

      Handle keyboard input events.

      :param key: Key code of the pressed key
      :type key: int


   .. py:method:: mouse_callback(event, x, y, flags, param)

      Handle mouse interaction events in the visualizer window.

      :param event: The event type (see cv2.EVENT_* constants)
      :type event: int
      :param x: The x coordinate of the mouse click
      :type x: int
      :param y: The y coordinate of the mouse click
      :type y: int
      :param flags: Flags passed by OpenCV
      :type flags: int
      :param param: Parameters passed by OpenCV
      :type param: Any


   .. py:method:: get_cas() -> Optional[robokudo.cas.CAS]

      Get the CAS from the parent RoboKudo pipeline.

      :return: The Common Analysis Structure (CAS) instance or None if not found
      :rtype: Optional[robokudo.cas.CAS]


   .. py:method:: get_parent_pipeline() -> Optional[robokudo.pipeline.Pipeline]

      Get the pipeline containing this annotator.

      :return: The parent pipeline instance or None if not found
      :rtype: Optional[robokudo.pipeline.Pipeline]


   .. py:method:: get_annotator_outputs() -> robokudo.annotators.outputs.AnnotatorOutputs

      Get the outputs container for all annotators in the current pipeline.

      :return: Container for all annotator outputs in the pipeline
      :rtype: robokudo.annotators.outputs.AnnotatorOutputs
      :raises AssertionError: If output container types are invalid


   .. py:method:: get_annotator_output_struct() -> robokudo.annotators.outputs.AnnotatorOutputStruct

      Get the output structure for this specific annotator.

      Dynamically adds the annotator to the output structure if not already present.

      :return: Output structure for this annotator
      :rtype: robokudo.annotators.outputs.AnnotatorOutputStruct


   .. py:method:: add_self_to_annotator_output_struct()

      Add this annotator to the output structure.

      Called when the annotator is not yet present in the output structure.


   .. py:method:: get_class_name() -> str

      Get the class name of this annotator.

      Unlike ``self.name``, which can be customized, this always returns the actual
      class name.

      :return: The class name of this annotator
      :rtype: str


   .. py:method:: get_data_from_analysis_scope(analysis_scope: list)

      Look up data to analyze based on a list of Types or String constants.

      Retrieves data from the CAS based on specified types or CASView constants.
      Can be used to analyze data at different levels (Scene, ObjectHypothesis, etc.)
      based on the current perception task context.

      Example::

          get_data_from_analysis_scope([CASViews.COLOR_IMAGE])
          =>
          {
              CASViews.COLOR_IMAGE : numpy.array([.. image data...])
          }

          get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis])
          =>
          {
              robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, ... ObjectHypothesisN]
          }

      :param analysis_scope: List of data types to target (CASView attributes or Annotation subclasses)
      :type analysis_scope: list
      :return: Dict mapping requested types to corresponding data
      :rtype: dict
      :raises ValueError: If result would be empty


   .. py:method:: init_time_recording()

      Initialize the time recording dictionary for performance measurement.


   .. py:method:: set_time_recording(value, func='update')

      Record timing information for a function.

      :param value: The timing value to record
      :type value: float
      :param func: The function name to associate with the timing, defaults to "update"
      :type func: str, optional


   .. py:method:: shall_publish_variables() -> bool

      Check if variables should be published based on descriptor settings.

      :return: True if variables should be published, False otherwise
      :rtype: bool


   .. py:method:: setup_published_variables() -> None

      Set up the ROS publisher and the internal data structure if data shall be published.

      :return: None


   .. py:method:: update_published_variable(name: str, value: float) -> None

      Store a value that shall be published whenever ``self.publish_variables()`` is called.

      :param name: Identifier of the variable
      :type name: str
      :param value: The actual value (must be a float)
      :type value: float


   .. py:method:: setup_done_for_published_variables() -> bool

      Check if variable publishing setup is complete.

      :return: True if setup is complete, False otherwise
      :rtype: bool


   .. py:method:: publish_variables() -> None

      Publish all collected variable updates at once.

      Use this as a decorator (i.e. ``@robokudo.utils.decorators.publish_variables``)
      on your update or compute method to publish variables automatically whenever
      the method returns. This only works if you have set
      ``self.publish_variables = True`` in the Descriptor of your annotator.

      :return: None



.. py:class:: Worker(fn, args=())

   Bases: :py:obj:`object`

   Worker class for executing functions asynchronously.

   This class provides a way to run functions in a separate thread and
   optionally execute a callback when the function completes.

   .. py:attribute:: future


   .. py:attribute:: _fn


   .. py:attribute:: _args
      :value: ()


   .. py:method:: start(callback=None)

      Start executing the function in a separate thread.

      :param callback: Function to call when execution completes, defaults to None
      :type callback: callable, optional


   .. py:method:: run()

      Execute the function and store its result in the future object.



.. py:class:: ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())

   Bases: :py:obj:`BaseAnnotator`

   Base class for annotators that perform long-running operations.

   This class extends BaseAnnotator to support operations that should not block
   the main thread. It provides functionality to run computations asynchronously
   and handle their results.

   .. py:attribute:: compute_worker
      :value: None


   .. py:attribute:: compute_worker_started
      :value: False


   .. py:attribute:: compute_worker_thread
      :value: None


   .. py:method:: initialise()

      Initialize the annotator state.

      Called when the first tick is received and whenever the status is not RUNNING.


   .. py:method:: update()

      Update the annotator state.

      Manages the asynchronous computation and returns the appropriate status:

      - RUNNING if computation is ongoing
      - SUCCESS/FAILURE based on computation result
      - INVALID if computation failed with an exception

      :return: Status of the behavior after update
      :rtype: py_trees.common.Status


   .. py:method:: compute()

      Perform the main computation of the annotator.

      This method should be overridden by subclasses to implement the actual
      computation logic.


   .. py:method:: terminate(new_status)

      Handle annotator termination.

      Called whenever the behavior switches to a non-RUNNING state.

      :param new_status: New status of the behavior (SUCCESS, FAILURE or INVALID)
      :type new_status: py_trees.common.Status
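

The ``Worker`` and ``ThreadedAnnotator`` descriptions above suggest a polling
pattern: start ``compute()`` in a separate thread, report RUNNING until the
future resolves, then map the outcome to a final status. The following is a
minimal standalone sketch of that pattern, not RoboKudo's actual
implementation. ``Status``, ``SketchWorker``, ``SketchThreadedBehaviour``,
``Doubler`` and ``tick_until_done`` are hypothetical stand-ins chosen so the
example runs with the standard library alone, and it assumes that
``compute()`` returns the final status itself.

.. code-block:: python

   import threading
   import time
   from concurrent.futures import Future
   from enum import Enum


   class Status(Enum):
       """Stand-in for py_trees.common.Status (assumption: keeps the sketch stdlib-only)."""
       RUNNING = "RUNNING"
       SUCCESS = "SUCCESS"
       FAILURE = "FAILURE"
       INVALID = "INVALID"


   class SketchWorker:
       """Hypothetical worker: runs fn(*args) in a thread and stores the outcome in a Future."""

       def __init__(self, fn, args=()):
           self.future = Future()
           self._fn = fn
           self._args = args

       def start(self, callback=None):
           # The optional callback is invoked with the finished Future.
           if callback is not None:
               self.future.add_done_callback(callback)
           thread = threading.Thread(target=self.run, daemon=True)
           thread.start()
           return thread

       def run(self):
           # Store either the return value or the raised exception in the Future.
           try:
               self.future.set_result(self._fn(*self._args))
           except Exception as exc:
               self.future.set_exception(exc)


   class SketchThreadedBehaviour:
       """Hypothetical behaviour that polls its worker from update()."""

       def __init__(self):
           self.compute_worker = None
           self.compute_worker_thread = None
           self.compute_worker_started = False

       def initialise(self):
           # Create a fresh worker for every new run of the behaviour.
           self.compute_worker = SketchWorker(self.compute)
           self.compute_worker_thread = None
           self.compute_worker_started = False

       def compute(self):
           raise NotImplementedError("override compute() in a subclass")

       def update(self):
           # Start the worker on the first tick, then only poll its Future.
           if not self.compute_worker_started:
               self.compute_worker_thread = self.compute_worker.start()
               self.compute_worker_started = True
           future = self.compute_worker.future
           if not future.done():
               return Status.RUNNING
           if future.exception() is not None:
               return Status.INVALID
           return future.result()


   class Doubler(SketchThreadedBehaviour):
       """Example subclass whose compute() simulates slow work."""

       def compute(self):
           time.sleep(0.05)
           self.result = 21 * 2
           return Status.SUCCESS


   def tick_until_done(behaviour, poll_interval=0.01):
       """Call initialise() once, then poll update() until it stops returning RUNNING."""
       behaviour.initialise()
       status = behaviour.update()
       while status is Status.RUNNING:
           time.sleep(poll_interval)
           status = behaviour.update()
       return status

In this sketch, ``tick_until_done(Doubler())`` keeps the calling thread free
between polls, and a ``compute()`` that raises is reported as INVALID instead
of propagating the exception into the ticking thread.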