robokudo.annotators.core
========================

.. py:module:: robokudo.annotators.core


Classes
-------

.. autoapisummary::

   robokudo.annotators.core.BaseAnnotator
   robokudo.annotators.core.Worker
   robokudo.annotators.core.ThreadedAnnotator
   robokudo.annotators.core.LegacyThreadedAnnotator
   robokudo.annotators.core.ActionClientAnnotator


Module Contents
---------------

.. py:class:: BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)

   Bases: :py:obj:`py_trees.behaviour.Behaviour`


   .. py:class:: Descriptor

      .. py:class:: Parameters


      .. py:class:: Capabilities


      .. py:attribute:: inputs
         :value: None


      .. py:attribute:: outputs
         :value: None


      .. py:attribute:: parameters


      .. py:attribute:: capabilities


   .. py:attribute:: rk_logger
      :value: None


   .. py:attribute:: descriptor


   .. py:method:: setup(timeout)

      Delayed initialisation, for example of ROS pub/sub, drivers, etc.


   .. py:method:: initialise()

      Called when the first tick is received, and again on any later tick
      where the status is not RUNNING.


   .. py:method:: update()

      Called every time the behavior is ticked.


   .. py:method:: terminate(new_status)

      Called whenever the behavior switches to a non-RUNNING state.
      ``new_status`` can be SUCCESS, FAILURE or INVALID.


   .. py:method:: key_callback(key)


   .. py:method:: mouse_callback(event, x, y, flags, param)

      This method is called whenever the mouse is interacting with the
      visualizer window.

      :param event: The event type. See cv2.EVENT_* constants.
      :param x: The x coordinate of the mouse click
      :param y: The y coordinate of the mouse click
      :param flags: Flags passed by OpenCV
      :param param: Parameters passed by OpenCV


   .. py:method:: get_cas() -> Optional[robokudo.cas.CAS]

      Get the CAS that is in the parent RoboKudo pipeline.


   .. py:method:: get_parent_pipeline() -> Optional[robokudo.pipeline.Pipeline]

      Get the pipeline of this annotator.


   .. py:method:: get_annotator_outputs() -> robokudo.annotators.outputs.AnnotatorOutputs


   .. py:method:: get_annotator_output_struct() -> robokudo.annotators.outputs.AnnotatorOutputStruct


   .. py:method:: add_self_to_annotator_output_struct()


   .. py:method:: get_class_name() -> str

      As opposed to ``self.name``, this method will always yield the name of
      the class of ``self``.

      :return: string with the class name


   .. py:method:: get_data_from_analysis_scope(analysis_scope: list)

      Look up the data to analyze based on a list of Types or String
      constants of typical RoboKudo data classes.
      Typical inputs will be CASView.X or types of Annotations like
      robokudo.types.scene.ObjectHypothesis.
      This method will look up the desired data from ``self.get_cas()``.

      You can, for example, use this method if your Annotator might analyze
      data on a Scene-Level or also on ObjectHypothesis, based on the
      current perception task context.

      Example::

         get_data_from_analysis_scope([CASViews.COLOR_IMAGE])
         => { CASViews.COLOR_IMAGE : numpy.array([.. image data...]) }

         get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis])
         => { robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, ... ObjectHypothesisN] }

      :param analysis_scope: Non-empty list of data to target. Supported are
         attributes from CASView or subclasses of
         robokudo.types.core.Annotation.
      :return: A dict. The key equals the type you put into analysis_scope.
         The value is the corresponding data.
      :raises ValueError: if the result would be an empty dict.


   .. py:method:: init_time_recording()


   .. py:method:: set_time_recording(value, func='update')


   .. py:method:: shall_publish_variables() -> bool

      Is the descriptor of this Annotator suggesting to publish data?

      :return: True if variables shall be published, False otherwise


   .. py:method:: setup_published_variables() -> None

      Set up the ROS Publisher and internal data structure if data shall be
      published.

      :return: None


   .. py:method:: update_published_variable(name: str, value: float) -> None

      Store a value that shall be published whenever
      ``self.publish_variables()`` is called.

      :param name: identifier of the variable
      :param value: the actual value. *Must* be a float.


   .. py:method:: setup_done_for_published_variables() -> bool


   .. py:method:: publish_variables() -> None

      Publish all the collected variable updates at once.
      To publish automatically whenever your update or compute method
      returns, decorate that method with
      ``@robokudo.utils.decorators.publish_variables``.

      This only works if you have set ``self.publish_variables = True`` in
      the Descriptor of your Annotator.

      :return: None


.. py:class:: Worker(fn, args=())

   Bases: :py:obj:`object`

   Custom Worker/Thread class that gives access to the exceptions raised and
   the values returned by the wrapped function, so that the caller can
   handle them.


   .. py:attribute:: future


   .. py:attribute:: _fn


   .. py:attribute:: _args
      :value: ()


   .. py:method:: start(callback=None)


   .. py:method:: run()


.. py:class:: ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())

   Bases: :py:obj:`BaseAnnotator`


   .. py:attribute:: compute_worker
      :value: None


   .. py:attribute:: compute_worker_started
      :value: False


   .. py:attribute:: compute_worker_thread
      :value: None


   .. py:method:: initialise()

      Called when the first tick is received, and again on any later tick
      where the status is not RUNNING.


   .. py:method:: update()

      Called every time the behavior is ticked.


   .. py:method:: compute()

      Does the heavy lifting of the annotator for work that takes too long
      to run.


   .. py:method:: terminate(new_status)

      Called whenever the behavior switches to a non-RUNNING state.
      ``new_status`` can be SUCCESS, FAILURE or INVALID.


.. py:class:: LegacyThreadedAnnotator(name='LegacyThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())

   Bases: :py:obj:`BaseAnnotator`


   .. py:attribute:: compute_thread
      :value: None


   .. py:attribute:: compute_thread_started
      :value: None


   .. py:attribute:: success_queue
      :value: None


   .. py:attribute:: feedback_message_queue
      :value: None


   .. py:method:: initialise()

      Called when the first tick is received, and again on any later tick
      where the status is not RUNNING.


   .. py:method:: update()

      Called every time the behavior is ticked.


   .. py:method:: compute(success: queue.Queue, feedback_message: queue.Queue)

      Does the heavy lifting of the annotator for work that takes too long
      to run.


   .. py:method:: terminate(new_status)

      Called whenever the behavior switches to a non-RUNNING state.
      ``new_status`` can be SUCCESS, FAILURE or INVALID.


.. py:class:: ActionClientAnnotator(name: str = 'ActionClientAnnotator', descriptor=Descriptor())

   Bases: :py:obj:`BaseAnnotator`

   An annotator base that can be used to work with a ROS action server.


   .. py:attribute:: ros_pkg_name
      :value: 'robokudo'


   .. py:class:: Descriptor

      Bases: :py:obj:`BaseAnnotator`


      .. py:attribute:: parameters


   .. py:attribute:: _action_client
      :value: None


   .. py:attribute:: _last_goal
      :value: None


   .. py:attribute:: _last_result
      :value: None


   .. py:attribute:: _last_feedback
      :value: None


   .. py:attribute:: _result_is_novel
      :value: False


   .. py:attribute:: _waiting_for_result
      :value: False


   .. py:method:: setup(timeout)

      Set up the annotator to receive data from an action server.

      Delayed initialisation, for example of ROS pub/sub, drivers, etc.


   .. py:method:: update()

      Receive data from the action server and send a new goal.

      Called every time the behavior is ticked.


   .. py:method:: _create_goal_msg() -> Any
      :abstractmethod:

      Create a goal to send to the action server.

      Called in every update method call.


   .. py:method:: _feedback_callback(feedback_msg)

      Receive feedback from the action server.

      Called when feedback from the action server is received.


   .. py:method:: _process_result(goal, result)
      :abstractmethod:

      Process the result received from the action server.

      Called when a result was received.


   .. py:method:: _get_result_callback(status, result)

      Get the result from the action server.

      Callback for receiving results.


   .. py:method:: _connect_to_server(timeout: int = 3)

      Connect to the action server.

      Called in setup method call.


   .. py:method:: _start_server()
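
The ``Worker`` entry above describes a thread wrapper that lets the caller see the wrapped function's return value and exceptions. Below is a minimal, self-contained sketch of how such a wrapper could be built and polled from a tick loop. It is not RoboKudo's implementation: ``SketchWorker``, ``tick``, ``slow_square``, ``always_fails`` and the plain string statuses are hypothetical names introduced for illustration. Backing the ``future`` attribute with ``concurrent.futures.Future`` is also an assumption.

```python
import threading
import time
from concurrent.futures import Future


class SketchWorker:
    """Hypothetical stand-in mirroring the documented Worker attributes."""

    def __init__(self, fn, args=()):
        self.future = Future()  # assumption: result/exception live in a Future
        self._fn = fn
        self._args = args

    def start(self, callback=None):
        # Optionally notify the caller once the future has been resolved.
        if callback is not None:
            self.future.add_done_callback(callback)
        thread = threading.Thread(target=self.run, daemon=True)
        thread.start()
        return thread

    def run(self):
        # Store either the return value or the raised exception in the
        # future, so nothing is lost inside the worker thread.
        try:
            self.future.set_result(self._fn(*self._args))
        except Exception as exc:
            self.future.set_exception(exc)


def tick(worker):
    """Poll the worker without blocking, as a ticked behaviour might."""
    if not worker.future.done():
        return "RUNNING"
    if worker.future.exception() is not None:
        return "FAILURE"
    return "SUCCESS"


def slow_square(x):
    time.sleep(0.05)  # simulate long-running work
    return x * x


def always_fails():
    raise ValueError("no data")


idle = SketchWorker(slow_square, args=(2,))  # never started, stays pending

ok = SketchWorker(slow_square, args=(4,))
ok.start().join()

bad = SketchWorker(always_fails)
bad.start().join()
```

With this shape, an ``update`` method can return a running status on every tick until ``future.done()`` is true, and only then inspect ``future.result()`` or ``future.exception()`` to choose between success and failure.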