robokudo.annotators.core¶
Classes¶
Custom Worker/Thread class to be able to access occured exceptions and returns |
|
An annotator base that can be used to work with a ros action server. |
Module Contents¶
- class robokudo.annotators.core.BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)¶
-
Bases:
py_trees.behaviour.Behaviour
- rk_logger = None¶
- descriptor¶
- setup(timeout)¶
-
Delayed initialisation. For example ROS pub/sub, drivers,
- initialise()¶
-
Called when first tick is received and anytime status is not running after.
- update()¶
-
Called every time the behavior is ticked.
- terminate(new_status)¶
-
Called whenever behavior switches to !RUNNING state. new_status can be SUCCESS, FAILURE or INVALID
- key_callback(key)¶
- mouse_callback(event, x, y, flags, param)¶
-
This method is called whenever the mouse is interacting with the visualizer window.
- Parameters:
-
event – The event type. See cv2.EVENT_* constants.
x – The x coordinate of the mouse click
y – The y coordinate of the mouse click
flags – Flags passed by OpenCV
param – Parameters passed by OpenCV
- get_cas() robokudo.cas.CAS | None ¶
-
Get the CAS that is in the parent Robokudo pipeline.
- get_parent_pipeline() robokudo.pipeline.Pipeline | None ¶
-
Get the pipeline of this annotator
- get_annotator_outputs() robokudo.annotators.outputs.AnnotatorOutputs ¶
- get_annotator_output_struct() robokudo.annotators.outputs.AnnotatorOutputStruct ¶
- add_self_to_annotator_output_struct()¶
- get_class_name() str ¶
-
As opposed to self.name, this method will always yield the name of the class of self. :return: string with the class name
- get_data_from_analysis_scope(analysis_scope: list)¶
-
Look up the data to analyze based on a list of Types or String constants of typical RoboKudo data classes. Typical inputs will be CASView.X or types of Annotations like robokudo.types.scene.ObjectHypothesis. This method will look up the desired data from self.get_cas().
You can for example use this method if your Annotator might analyze data on a Scene-Level or also on ObjectHypothesis, based on the current perception task context.
Example: get_data_from_analysis_scope([CASViews.COLOR_IMAGE]) => {
CASViews.COLOR_IMAGE : numpy.array([.. image data…])
}
get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis]) => {
robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, … ObjectHypothesisN]
}
- Parameters:
-
analysis_scope – Non-empty list of data to target. Supported are attributes from CASView or subclasses of robokudo.types.core.Annotation.
- Returns:
-
A dict. The key equals the type you put into analysis_scope. The value is the corresponding data.
- Raise:
-
ValueError, when result would return empty dict.
- init_time_recording()¶
- set_time_recording(value, func='update')¶
- shall_publish_variables() bool ¶
-
Is the descriptor of this Annotator suggesting to publish data? :return: True if variables shall be published, false otherwise
- setup_published_variables() None ¶
-
Setup the ROS Publisher and internal data structure if data shall be published
- Returns:
-
None
- update_published_variable(name: str, value: float) None ¶
-
Store a value that shall be published whenever self.publish_variables() is called.
- Parameters:
-
name – identifier of the variable
value – the actual value. Must be a float.
- setup_done_for_published_variables() bool ¶
- publish_variables() None ¶
-
Publish all the collected variable updates at once.
Use this function as a decorator (i.e. @robokudo.utils.decorators.publish_variables) on your update or compute method to automatically publish variables whenever the method is returning.
This method will only work if you’ve set self.publish_variables = True in the Descriptor of your Annotator
- Returns:
-
None
- class robokudo.annotators.core.Worker(fn, args=())¶
-
Bases:
object
Custom Worker/Thread class to be able to access occured exceptions and returns to handle them in the caller process.
- future¶
- _fn¶
- _args = ()¶
- start(callback=None)¶
- run()¶
- class robokudo.annotators.core.ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())¶
-
Bases:
BaseAnnotator
- compute_worker = None¶
- compute_worker_started = False¶
- compute_worker_thread = None¶
- initialise()¶
-
Called when first tick is received and anytime status is not running after.
- update()¶
-
Called every time the behavior is ticked.
- compute()¶
-
This method is doing the heavy lifting of the annotator, if it takes to long to run.
- terminate(new_status)¶
-
Called whenever behavior switches to !RUNNING state. new_status can be SUCCESS, FAILURE or INVALID
- class robokudo.annotators.core.LegacyThreadedAnnotator(name='LegacyThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())¶
-
Bases:
BaseAnnotator
- compute_thread = None¶
- compute_thread_started = None¶
- success_queue = None¶
- feedback_message_queue = None¶
- initialise()¶
-
Called when first tick is received and anytime status is not running after.
- update()¶
-
Called every time the behavior is ticked.
- compute(success: queue.Queue, feedback_message: queue.Queue)¶
-
This method is doing the heavy lifting of the annotator, if it takes to long to run.
- terminate(new_status)¶
-
Called whenever behavior switches to !RUNNING state. new_status can be SUCCESS, FAILURE or INVALID
- class robokudo.annotators.core.ActionClientAnnotator(name: str = 'ActionClientAnnotator', descriptor=Descriptor())¶
-
Bases:
BaseAnnotator
An annotator base that can be used to work with a ros action server.
- ros_pkg_name = 'robokudo'¶
- class Descriptor¶
-
Bases:
BaseAnnotator
- parameters¶
- _action_client = None¶
- _last_goal = None¶
- _last_result = None¶
- _last_feedback = None¶
- _result_is_novel = False¶
- _waiting_for_result = False¶
- setup(timeout)¶
-
Setup annotator to receive data from an action server. Delayed initialisation. For example ROS pub/sub, drivers, etc.
- update()¶
-
Receive data from action server and send new goal. Called every time the behavior is ticked.
- abstract _create_goal_msg() Any ¶
-
Create a goal to send to the action server. Called in every update method call.
- _feedback_callback(feedback_msg)¶
-
Receive feedback from the action server. Called when feedback from the action server is received.
- abstract _process_result(goal, result)¶
-
Process the result received from the action server. Called when a result was received.
- _get_result_callback(status, result)¶
-
Get result from action server. Callback for receiving results.
- _connect_to_server(timeout: int = 3)¶
-
Connect to the action server. Called in setup method call.
- _start_server()¶