Advanced Query Handling


Introduction

This tutorial builds on the earlier query handling tutorial and assumes you are already familiar with the basics of sending queries in RoboKudo. Here you will explore how RoboKudo handles queries using a behavior tree pipeline with support for validation, feedback, preemption, and result publishing.

By the end of this tutorial, you will:

  • Understand how the Query Interface works in RoboKudo.

  • Learn how queries are validated, processed, monitored, and canceled in a behavior tree.

  • See how live feedback and final results are handled dynamically.

Warning

This tutorial only works with ROS 2.


Important Concepts

Term | Meaning | Why it matters
Query | A message asking the robot to do something (e.g., “recognize objects on the kitchen table”). | It starts the pipeline.
Feedback | Small updates showing that the task is in progress. | Helps monitor that the perception system is alive and working.
Final Result | The final answer after the task finishes. | Other systems running on the robot need it to continue execution.
Preemption | Another word for cancellation: a task can be stopped early if needed. | Allows aborting a running perception task that is no longer relevant or has to be replaced by another one. Supports reactivity.
Behavior Tree | A tree of nodes that is ticked repeatedly; each node runs its task and returns SUCCESS, FAILURE, or RUNNING. | Makes building robot logic easy and modular.
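
To make the SUCCESS / FAILURE / RUNNING vocabulary concrete, here is a minimal, library-free sketch of a sequence with memory. The classes Status, Leaf, and MemorySequence are hypothetical stand-ins written for this illustration; they are not the py_trees or RoboKudo classes and only approximate their behavior.

```python
from enum import Enum


class Status(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RUNNING = "RUNNING"


class Leaf:
    """Hypothetical leaf that replays a fixed list of statuses, one per tick."""

    def __init__(self, name, statuses):
        self.name = name
        self._statuses = list(statuses)
        self.ticks = 0

    def tick(self):
        # Repeat the last status once the list is exhausted.
        status = self._statuses[min(self.ticks, len(self._statuses) - 1)]
        self.ticks += 1
        return status


class MemorySequence:
    """Runs children in order and resumes at the child that was RUNNING."""

    def __init__(self, children):
        self.children = children
        self._current = 0

    def tick(self):
        while self._current < len(self.children):
            status = self.children[self._current].tick()
            if status is Status.RUNNING:
                return Status.RUNNING  # come back to this child next tick
            if status is Status.FAILURE:
                self._current = 0
                return Status.FAILURE
            self._current += 1  # child succeeded, move on
        self._current = 0
        return Status.SUCCESS


reader = Leaf("reader", [Status.SUCCESS])
worker = Leaf("worker", [Status.RUNNING, Status.RUNNING, Status.SUCCESS])
sequence = MemorySequence([reader, worker])
results = [sequence.tick() for _ in range(3)]
```

Because the sequence remembers its position, the reader leaf is ticked only once while the worker leaf is ticked until it finishes.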


Full Working Example (query_complex.py)

Below is the full working Python code used to implement the behavior tree pipeline that handles a simple “numbers” query.

import py_trees.common
import py_trees.composites
import py_trees.decorators

import robokudo.analysis_engine
from robokudo.annotators.cluster_color import ClusterColorAnnotator

from robokudo.annotators.collection_reader import CollectionReaderAnnotator
from robokudo.annotators.image_preprocessor import ImagePreprocessorAnnotator
from robokudo.annotators.plane import PlaneAnnotator
from robokudo.annotators.pointcloud_cluster_extractor import PointCloudClusterExtractor
from robokudo.annotators.pointcloud_crop import PointcloudCropAnnotator
from robokudo.annotators.query import QueryFeedback
import robokudo.annotators.query

import robokudo.descriptors.camera_configs.config_kinect_robot

import robokudo.io.camera_interface
import robokudo.idioms
import robokudo.pipeline
from robokudo.behaviours.action_server_checks import ActionServerCheck, ActionServerNoPreemptRequest, AbortGoal
from robokudo.annotators.query import QueryFeedbackAndCount


class AnalysisEngine(robokudo.analysis_engine.AnalysisEngineInterface):
    def name(self):
        return "query_complex"

    def implementation(self):
        """
        Create a pipeline which responds to a query
        """
        kinect_camera_config = robokudo.descriptors.camera_configs.config_kinect_robot.CameraConfig()
        kinect_config = CollectionReaderAnnotator.Descriptor(
            camera_config=kinect_camera_config,
            camera_interface=robokudo.io.camera_interface.KinectCameraInterface(kinect_camera_config))

        seq = robokudo.pipeline.Pipeline("RWPipeline")

        task_sequence = py_trees.composites.Sequence(name="TaskSequence", memory=True)
        task_sequence.add_children([
            CollectionReaderAnnotator(descriptor=kinect_config),
            ImagePreprocessorAnnotator("ImagePreprocessor"),
            PointcloudCropAnnotator(),
            PlaneAnnotator(),
            PointCloudClusterExtractor(),
            ClusterColorAnnotator(),
            QueryFeedbackAndCount(count_until=50, return_code=py_trees.common.Status.RUNNING),
        ])

        # Combine preemption handling and task execution in a selector
        conditional_selector = py_trees.composites.Selector(name="ConditionalSelector", memory=False)
        conditional_selector.add_children([
            py_trees.decorators.Inverter(
                name="Invert Preempt Request",
                child=ActionServerNoPreemptRequest()
            ),
            task_sequence  # Run task sequence only if no preemption
        ])

        seq.add_children(
            [
                robokudo.idioms.pipeline_init(),
                robokudo.annotators.query.QueryAnnotator(),
                conditional_selector,
                robokudo.annotators.query.GenerateQueryResult(),
            ])

        return seq

Full Behavior Tree Overview

Here’s a visual map of the pipeline:

RootNodeWithGUI
├── VisManager
└── RWPipeline (WithMemory)
    ├── pipeline_init()
    ├── QueryAnnotator
    ├── ConditionalSelector (NoMemory)
    │   ├── Invert Preempt Request
    │   │   └── ActionServerNoPreemptRequest
    │   └── TaskSequence (WithMemory)
    │       ├── CollectionReaderAnnotator
    │       ├── ImagePreprocessorAnnotator
    │       ├── PointcloudCropAnnotator
    │       ├── PlaneAnnotator
    │       ├── PointCloudClusterExtractor
    │       ├── ClusterColorAnnotator
    │       └── QueryFeedbackAndCount
    └── GenerateQueryResult

QueryFeedbackAndCount – The Worker Node

A simple example node that represents a long-running task and sends feedback while it runs.

What it does:

  • Counts from 0 to 50.

  • Sends feedback after each number.

Key points:

  • Uses the QueryHandler to send feedback.

  • Returns RUNNING while still counting, and SUCCESS when finished.
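
The counting behavior described above can be sketched without ROS as follows. CountingWorker and its send_feedback callback are hypothetical names chosen for this illustration; the real QueryFeedbackAndCount sends its feedback through the QueryHandler, and its internals may differ.

```python
from enum import Enum


class Status(Enum):
    SUCCESS = "SUCCESS"
    RUNNING = "RUNNING"


class CountingWorker:
    """Hypothetical stand-in for a worker node such as QueryFeedbackAndCount."""

    def __init__(self, count_until, send_feedback):
        self.count_until = count_until
        self.send_feedback = send_feedback  # callable that takes a feedback string
        self.counter = 0

    def tick(self):
        # One counting step per tick, with feedback after each number.
        self.send_feedback(f"count: {self.counter}")
        if self.counter >= self.count_until:
            self.counter = 0  # reset so the next query starts from zero
            return Status.SUCCESS
        self.counter += 1
        return Status.RUNNING


feedback_log = []
worker = CountingWorker(count_until=3, send_feedback=feedback_log.append)
statuses = [worker.tick() for _ in range(4)]
```

Each tick performs a single step and returns immediately, so the rest of the tree (including the preemption check) gets a chance to run between steps.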


Preemption Handling with ConditionalSelector

ConditionalSelector (NoMemory)
├── Invert Preempt Request
│   └── ActionServerNoPreemptRequest
└── TaskSequence (WithMemory)
    ├── CollectionReaderAnnotator
    ├── ImagePreprocessorAnnotator
    ├── PointcloudCropAnnotator
    ├── PlaneAnnotator
    ├── PointCloudClusterExtractor
    ├── ClusterColorAnnotator
    └── QueryFeedbackAndCount

What it does:

  • Always checks if a cancel request (preemption) has been issued.

  • If one is detected, the selector succeeds immediately and the task sequence is not ticked any further.

  • Otherwise, proceeds to run the full task sequence.

Why memory=False?

  • Cancellation status must be re-evaluated on every tick to react instantly.

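  • Prevents stale decisions: with memory, the selector would resume at the still-running task sequence and skip the cancel check.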
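
The effect of the memory flag can be illustrated with a small simulation. The Selector class below is a simplified, hypothetical model written for this sketch (it is not the py_trees implementation); it only shows why a selector without memory notices a cancel request that arrives between ticks.

```python
from enum import Enum


class Status(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RUNNING = "RUNNING"


class Selector:
    """Simplified selector: returns the first non-FAILURE child status."""

    def __init__(self, children, memory):
        self.children = children  # list of callables returning a Status
        self.memory = memory
        self._resume_at = 0

    def tick(self):
        # With memory, resume at the child that was RUNNING last tick.
        start = self._resume_at if self.memory else 0
        for index in range(start, len(self.children)):
            status = self.children[index]()
            if status is Status.RUNNING:
                self._resume_at = index
                return status
            if status is Status.SUCCESS:
                self._resume_at = 0
                return status
        self._resume_at = 0
        return Status.FAILURE


def run(memory):
    state = {"cancel": False}

    def cancel_requested():
        return Status.SUCCESS if state["cancel"] else Status.FAILURE

    def long_task():
        return Status.RUNNING

    selector = Selector([cancel_requested, long_task], memory=memory)
    first = selector.tick()
    state["cancel"] = True  # the cancel request arrives between two ticks
    second = selector.tick()
    return first, second


without_memory = run(memory=False)
with_memory = run(memory=True)
```

In this model the memory-less selector returns SUCCESS on the second tick because it re-checks the cancel condition, while the selector with memory keeps ticking the long task and never sees the request.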


ActionServerNoPreemptRequest and Inverter

What they do:

  • ActionServerNoPreemptRequest detects cancel requests via the blackboard.

    • If cancel is active → returns FAILURE.

    • Otherwise → returns SUCCESS.

  • The Inverter flips this result:

    • Cancellation → becomes SUCCESS (Selector stops task).

    • No cancellation → becomes FAILURE (Selector continues to task).

Why inversion is needed:

  • A Selector stops at the first child that returns SUCCESS (or RUNNING) and moves on to the next child only on FAILURE.

  • The inverter makes cancellation detectable as a SUCCESS condition.
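
The interplay of the check, the Inverter, and the Selector can be written down as plain functions. This is a minimal sketch of the logic only; no_preempt_request, invert, and conditional_selector are hypothetical helpers, and the real nodes read the cancel state from the blackboard rather than from a function argument.

```python
from enum import Enum


class Status(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RUNNING = "RUNNING"


def no_preempt_request(cancel_active):
    """Models the check: FAILURE if a cancel is active, otherwise SUCCESS."""
    return Status.FAILURE if cancel_active else Status.SUCCESS


def invert(status):
    """Models the Inverter: swaps SUCCESS and FAILURE, keeps RUNNING."""
    if status is Status.SUCCESS:
        return Status.FAILURE
    if status is Status.FAILURE:
        return Status.SUCCESS
    return status


def conditional_selector(cancel_active, task):
    """Returns (selector status, whether the task was ticked)."""
    guard = invert(no_preempt_request(cancel_active))
    if guard is not Status.FAILURE:
        return guard, False  # cancel detected: the task is skipped
    return task(), True  # no cancel: fall through to the task


def running_task():
    return Status.RUNNING


cancelled = conditional_selector(cancel_active=True, task=running_task)
not_cancelled = conditional_selector(cancel_active=False, task=running_task)
```

With a cancel request the selector succeeds without ticking the task; without one it falls through and reports the status of the task.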


Action Client – How to Test the Pipeline

To test this query pipeline, use the query_test_client.py script (located in robokudo/robokudo/src/scripts/query_test_client.py).
The script sends a query, receives live feedback, and can cancel the task mid-way if needed. This makes it useful for simulating real-world queries, verifying the feedback flow, and testing cancellation behavior.

You can use the script as follows:

  1. Start RoboKudo with the query_complex.py analysis engine:

    python3 ~/robokudo_ws/src/robokudo/robokudo/src/scripts/main.py _ae query_complex
    
  2. In another terminal, play the bag file:

    ros2 bag play ~/robokudo_ws/test --loop
    
  3. Run the query test client:

    python3 ~/robokudo_ws/src/robokudo/robokudo/src/scripts/query_test_client.py
    

To cancel the goal mid-way, press Ctrl+C in the terminal where you started the test client, or let the client cancel it automatically after a given time with the --preempt_timer CLI argument:

python3 ~/robokudo_ws/src/robokudo/robokudo/src/scripts/query_test_client.py --preempt_timer=5.0

This will cancel the goal automatically after 5 seconds.
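
A timed cancellation of this kind can be sketched with the standard library alone. Only the --preempt_timer flag name comes from the command above; parse_args, schedule_cancel, and the cancel_goal callback are hypothetical, and the actual query_test_client.py may be structured differently.

```python
import argparse
import threading


def parse_args(argv):
    """Parse a --preempt_timer option given in seconds (None disables it)."""
    parser = argparse.ArgumentParser(description="Sketch of a query test client")
    parser.add_argument(
        "--preempt_timer",
        type=float,
        default=None,
        help="Cancel the goal automatically after this many seconds.",
    )
    return parser.parse_args(argv)


def schedule_cancel(delay, cancel_goal):
    """Call cancel_goal after `delay` seconds; do nothing if delay is None."""
    if delay is None:
        return None
    timer = threading.Timer(delay, cancel_goal)
    timer.daemon = True  # do not keep the process alive just for the timer
    timer.start()
    return timer


args = parse_args(["--preempt_timer=5.0"])

# Demonstration with a short delay; an Event stands in for the cancel request
# that a real client would send to the action server.
cancel_sent = threading.Event()
timer = schedule_cancel(0.01, cancel_sent.set)
was_cancelled = cancel_sent.wait(timeout=2.0)
```

In a real client, the callback would ask the action server to cancel the active goal instead of setting an event.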