Create your own Annotator#

As described in the Introduction, RoboKudo is a multi-expert approach which employs multiple algorithms to annotate parts of the incoming sensor data. We usually call these components Annotators, since their main purpose is reasoning and annotating new features or information.

Create the Annotator#

We will create a very simple Annotator which will simply look for a PointCloud in the CAS and report the size of it.

First, please go to your rk_tutorial package and open the following path: rk_tutorial/src/rk_tutorial/annotators. Here, you can simply create a new file called my_first_annotator.py with the following content:

from timeit import default_timer

import py_trees

import robokudo.annotators
from robokudo.cas import CASViews


class MyFirstAnnotator(robokudo.annotators.core.BaseAnnotator):

    def __init__(self, name="MyFirstAnnotator"):
        """
        Default construction. Minimal one-time init!
        """
        super(MyFirstAnnotator, self).__init__(name)

    def update(self):
        start_timer = default_timer()

        cloud = self.get_cas().get(CASViews.CLOUD)

        self.rk_logger.info(f"Cloud size is: {len(cloud.points)}")

        end_timer = default_timer()
        self.feedback_message = f'Processing took {(end_timer - start_timer):.4f}s'
        return py_trees.Status.SUCCESS

Let us have a closer look at some key aspects of the Annotator. Every Annotator will inherit from the BaseAnnotator class, which is a child of a normal py_trees Behaviour. The main method for every Behaviour is the update method, which is called everytime the Behaviour is invoked. Please note, that code running in your update method should not exceed a runtime of a couple of milliseconds, to keep the overall Behaviour Tree reactive.

Note

In computer vision, we often need to run methods that exceed this time constraint. For this purpose, we have developed the ThreadedAnnotator which you can use in that case. It is an Annotator which keeps your workload running in a thread. Make sure to put your code into the compute method instead of update to make proper use of that functionality when using the ThreadedAnnotator.

Another key element is the access of the CAS, the common data structure for all Annotators. The CAS is basically a python dict with its key being predefinde in CASViews to help the consistent usage of the data in it.

Now we need to include your new annotator into your pipeline that you’ve defined in the previous tutorial.

Integrate your Annotator into the Pipeline#

Go to rk_tutorial/src/rk_tutorial/descriptors/analysis_engines/my_demo.py and paste the following content to it:

import robokudo.analysis_engine

from robokudo.annotators.collection_reader import CollectionReaderAnnotator
from robokudo.annotators.image_preprocessor import ImagePreprocessorAnnotator
from robokudo.annotators.plane import PlaneAnnotator
from robokudo.annotators.pointcloud_cluster_extractor import PointCloudClusterExtractor
from robokudo.annotators.pointcloud_crop import PointcloudCropAnnotator

import robokudo.descriptors.camera_configs.config_kinect_robot_wo_transform

import robokudo.io.camera_interface
import robokudo.idioms
from rk_tutorial.annotators.my_first_annotator import MyFirstAnnotator


class AnalysisEngine(robokudo.analysis_engine.AnalysisEngineInterface):
    def name(self):
        return "my_demo"

    def implementation(self):
        """
        Create a basic pipeline that does tabletop segmentation
        """
        kinect_camera_config = robokudo.descriptors.camera_configs.config_kinect_robot_wo_transform.CameraConfig()
        kinect_config = CollectionReaderAnnotator.Descriptor(
            camera_config=kinect_camera_config,
            camera_interface=robokudo.io.camera_interface.KinectCameraInterface(kinect_camera_config))

        seq = robokudo.pipeline.Pipeline("RWPipeline")
        seq.add_children(
            [
                robokudo.idioms.pipeline_init(),
                CollectionReaderAnnotator(descriptor=kinect_config),
                ImagePreprocessorAnnotator("ImagePreprocessor"),
                PointcloudCropAnnotator(),
                PlaneAnnotator(),
                PointCloudClusterExtractor(),
                MyFirstAnnotator(),
            ])
        return seq

Start that analysis engine and observe the output of your console where you have started RoboKudo. You should there see an output like the following:

robokudo  INFO  2022-06-30 19:08:54,052      my_first_annotator.py   in 24 MyFirstAnnotator.update    Cloud size is: 108262

Note: You might have to observe this directly after starting up RoboKudo in the case that tf Messages are spamming the output.