robokudo.annotators.core

Classes

BaseAnnotator

Worker

Custom Worker/Thread class to be able to access occured exceptions and returns

ThreadedAnnotator

LegacyThreadedAnnotator

ActionClientAnnotator

An annotator base that can be used to work with a ros action server.

Module Contents

class robokudo.annotators.core.BaseAnnotator(name='Annotator', descriptor=Descriptor(), ros_pkg_name: str = robokudo.defs.PACKAGE_NAME)

Bases: py_trees.behaviour.Behaviour

class Descriptor
class Parameters
class Capabilities
inputs = None
outputs = None
parameters
capabilities
rk_logger = None
descriptor
setup(timeout)

Delayed initialisation. For example ROS pub/sub, drivers,

initialise()

Called when first tick is received and anytime status is not running after.

update()

Called every time the behavior is ticked.

terminate(new_status)

Called whenever behavior switches to !RUNNING state. new_status can be SUCCESS, FAILURE or INVALID

key_callback(key)
mouse_callback(event, x, y, flags, param)

This method is called whenever the mouse is interacting with the visualizer window.

Parameters:
  • event – The event type. See cv2.EVENT_* constants.

  • x – The x coordinate of the mouse click

  • y – The y coordinate of the mouse click

  • flags – Flags passed by OpenCV

  • param – Parameters passed by OpenCV

get_cas() robokudo.cas.CAS | None

Get the CAS that is in the parent Robokudo pipeline.

get_parent_pipeline() robokudo.pipeline.Pipeline | None

Get the pipeline of this annotator

get_annotator_outputs() robokudo.annotators.outputs.AnnotatorOutputs
get_annotator_output_struct() robokudo.annotators.outputs.AnnotatorOutputStruct
add_self_to_annotator_output_struct()
get_class_name() str

As opposed to self.name, this method will always yield the name of the class of self. :return: string with the class name

get_data_from_analysis_scope(analysis_scope: list)

Look up the data to analyze based on a list of Types or String constants of typical RoboKudo data classes. Typical inputs will be CASView.X or types of Annotations like robokudo.types.scene.ObjectHypothesis. This method will look up the desired data from self.get_cas().

You can for example use this method if your Annotator might analyze data on a Scene-Level or also on ObjectHypothesis, based on the current perception task context.

Example: get_data_from_analysis_scope([CASViews.COLOR_IMAGE]) => {

CASViews.COLOR_IMAGE : numpy.array([.. image data…])

}

get_data_from_analysis_scope([robokudo.types.scene.ObjectHypothesis]) => {

robokudo.types.scene.ObjectHypothesis : [ObjectHypothesis1, … ObjectHypothesisN]

}

Parameters:

analysis_scope – Non-empty list of data to target. Supported are attributes from CASView or subclasses of robokudo.types.core.Annotation.

Returns:

A dict. The key equals the type you put into analysis_scope. The value is the corresponding data.

Raise:

ValueError, when result would return empty dict.

init_time_recording()
set_time_recording(value, func='update')
shall_publish_variables() bool

Is the descriptor of this Annotator suggesting to publish data? :return: True if variables shall be published, false otherwise

setup_published_variables() None

Setup the ROS Publisher and internal data structure if data shall be published

Returns:

None

update_published_variable(name: str, value: float) None

Store a value that shall be published whenever self.publish_variables() is called.

Parameters:
  • name – identifier of the variable

  • value – the actual value. Must be a float.

setup_done_for_published_variables() bool
publish_variables() None

Publish all the collected variable updates at once.

Use this function as a decorator (i.e. @robokudo.utils.decorators.publish_variables) on your update or compute method to automatically publish variables whenever the method is returning.

This method will only work if you’ve set self.publish_variables = True in the Descriptor of your Annotator

Returns:

None

class robokudo.annotators.core.Worker(fn, args=())

Bases: object

Custom Worker/Thread class to be able to access occured exceptions and returns to handle them in the caller process.

future
_fn
_args = ()
start(callback=None)
run()
class robokudo.annotators.core.ThreadedAnnotator(name='ThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())

Bases: BaseAnnotator

compute_worker = None
compute_worker_started = False
compute_worker_thread = None
initialise()

Called when first tick is received and anytime status is not running after.

update()

Called every time the behavior is ticked.

compute()

This method is doing the heavy lifting of the annotator, if it takes to long to run.

terminate(new_status)

Called whenever behavior switches to !RUNNING state. new_status can be SUCCESS, FAILURE or INVALID

class robokudo.annotators.core.LegacyThreadedAnnotator(name='LegacyThreadedAnnotator', descriptor=BaseAnnotator.Descriptor())

Bases: BaseAnnotator

compute_thread = None
compute_thread_started = None
success_queue = None
feedback_message_queue = None
initialise()

Called when first tick is received and anytime status is not running after.

update()

Called every time the behavior is ticked.

compute(success: queue.Queue, feedback_message: queue.Queue)

This method is doing the heavy lifting of the annotator, if it takes to long to run.

terminate(new_status)

Called whenever behavior switches to !RUNNING state. new_status can be SUCCESS, FAILURE or INVALID

class robokudo.annotators.core.ActionClientAnnotator(name: str = 'ActionClientAnnotator', descriptor=Descriptor())

Bases: BaseAnnotator

An annotator base that can be used to work with a ros action server.

ros_pkg_name = 'robokudo'
class Descriptor

Bases: BaseAnnotator

parameters
_action_client = None
_last_goal = None
_last_result = None
_last_feedback = None
_result_is_novel = False
_waiting_for_result = False
setup(timeout)

Setup annotator to receive data from an action server. Delayed initialisation. For example ROS pub/sub, drivers, etc.

update()

Receive data from action server and send new goal. Called every time the behavior is ticked.

abstract _create_goal_msg() Any

Create a goal to send to the action server. Called in every update method call.

_feedback_callback(feedback_msg)

Receive feedback from the action server. Called when feedback from the action server is received.

abstract _process_result(goal, result)

Process the result received from the action server. Called when a result was received.

_get_result_callback(status, result)

Get result from action server. Callback for receiving results.

_connect_to_server(timeout: int = 3)

Connect to the action server. Called in setup method call.

_start_server()