Advanced Query Handling


Introduction

This tutorial builds upon the earlier query handling tutorial. It assumes you are already familiar with the basics of sending queries in RoboKudo. In this session, you’ll explore how RoboKudo handles queries using a behavior tree pipeline with support for validation, feedback, preemption, and results publishing.

By the end of this tutorial, you will:

  • Understand how the Query Interface works in RoboKudo.

  • Learn how queries are validated, processed, monitored, and canceled in a behavior tree.

  • See how live feedback and final results are handled dynamically.


Important Concepts

| Term | Meaning | Why it matters |
|---|---|---|
| Query | A message asking the robot to do something (e.g., “count numbers”). | It starts the pipeline. |
| Feedback | Small updates showing that the task is in progress. | Helps monitor that the robot is alive and working. |
| Final Result | The final answer after the task finishes. | Needed for other programs to know what happened. |
| Preemption | Cancellation of a running task: the task can be stopped early when needed. | Keeps the system safe and responsive. |
| Behavior Tree | A tree of tasks that are ticked in order, each returning SUCCESS, FAILURE, or RUNNING. | Makes building robot logic easy and modular. |


Full Working Example (query_test.py)

Below is the full working Python code used to implement the behavior tree pipeline that handles a simple “numbers” query.

'''
This script defines a Robokudo-based behavior tree pipeline for processing queries related to numbers.

Features:
- Implements a PyTrees behavior tree to handle query processing.
- Includes a PrintNumbers behavior that sequentially processes numbers from 1 to 100.
- Utilizes a CheckQueryType annotator to validate if the query type is 'numbers'.
- Incorporates a conditional selector to manage task execution with preemption handling.
- Integrates with the Robokudo analysis engine and pipeline framework.
- Uses a feedback mechanism to report progress dynamically.
'''
import queue

import py_trees
from py_trees.composites import Selector, Sequence

import robokudo
import robokudo.analysis_engine
import robokudo.descriptors.camera_configs.config_hsr
import robokudo.io.camera_interface
import robokudo.pipeline
from robokudo.annotators.core import BaseAnnotator
from robokudo.annotators.query import QueryAnnotator
from robokudo.behaviours.action_server_checks import ActionServerNoPreemptRequest
from robokudo.cas import CASViews
from robokudo.identifier import BBIdentifier
from robokudo.idioms import pipeline_init
from robokudo_msgs.action import Query



class PrintNumbers(BaseAnnotator):
    """
    Counts from 1 to 100, one number per tick, and reports progress as feedback.
    """

    def __init__(self, name="PrintNumbers"):
        super().__init__(name)
        self.current_number = 1
        self.completed = False
        self.result = []
        self.result_string = ""

    def initialise(self):
        # Reset all state so that each new run starts counting from 1
        self.result = []
        self.result_string = ""
        self.current_number = 1
        self.completed = False

    def update(self):
        # Fetch the shared feedback queue from the blackboard, creating it on first use
        blackboard = py_trees.blackboard.Blackboard()
        feedback_queue = blackboard.get(BBIdentifier.QUERY_FEEDBACK)
        if feedback_queue is None:
            feedback_queue = queue.Queue()
            blackboard.set(BBIdentifier.QUERY_FEEDBACK, feedback_queue)

        if self.current_number <= 100:
            # Append number and update result
            print(f"Current Number: {self.current_number}")
            self.result.append(self.current_number)
            self.result_string += f"{self.current_number}, "

            # Add feedback message containing all numbers processed so far
            feedback_msg = Query.Feedback()
            feedback_msg.feedback = f"Processing number: {self.result_string}"
            feedback_queue.put(feedback_msg)

            # Stay RUNNING so the tree ticks this behavior again for the next number
            self.current_number += 1
            return py_trees.common.Status.RUNNING
        else:
            # All numbers processed: publish the final answer on the blackboard
            self.completed = True
            blackboard.set(BBIdentifier.QUERY_ANSWER, self.result_string)
            return py_trees.common.Status.SUCCESS


class CheckQueryType(BaseAnnotator):
    """
    Checks if the query type on the CAS is 'numbers'.
    """

    def __init__(self, name="CheckQueryType"):
        super().__init__(name)

    def update(self):
        # Retrieve the query from the CAS
        query = self.get_cas().get(CASViews.QUERY)

        # Check if the query type is 'numbers'
        if query and getattr(query.obj, 'type', None) == 'numbers':
            self.logger.info("Query type is 'numbers'. Proceeding.")
            return py_trees.common.Status.SUCCESS
        else:
            self.logger.warning(f"Query type is not 'numbers' or query is missing: {query}")
            return py_trees.common.Status.FAILURE


class AnalysisEngine(robokudo.analysis_engine.AnalysisEngineInterface):
    def name(self):
        return "test_pipeline"

    def implementation(self) -> robokudo.pipeline.Pipeline:
        # Define the sequence that runs the task
        task_sequence = Sequence(name="TaskSequence", memory=True)
        task_sequence.add_children([
            CheckQueryType(),  # Check if the query type is 'numbers'
            PrintNumbers()     # Execute the task if the query is valid
        ])

        # Combine preemption handling and task execution in a selector
        conditional_selector = Selector(name="ConditionalSelector", memory=False)
        conditional_selector.add_children([
            py_trees.decorators.Inverter(
                name="Invert Preempt Request",
                child=ActionServerNoPreemptRequest()
            ),
            task_sequence  # Run task sequence only if no preemption
        ])

        # Main Pipeline
        seq = robokudo.pipeline.Pipeline("OPExperiments")
        seq.add_children([
            pipeline_init(),       # Initialize the pipeline
            QueryAnnotator(),      # Handle incoming queries
            conditional_selector   # Preemption-aware task execution
        ])

        return seq


Full Behavior Tree Overview

Here’s a visual map of the pipeline:

RootNodeWithGUI
├── VisManager
└── OPExperiments (WithMemory)
    ├── pipeline_init()
    ├── QueryAnnotator
    └── ConditionalSelector (NoMemory)
        ├── Invert Preempt Request
        │   └── ActionServerNoPreemptRequest
        └── TaskSequence (WithMemory)
            ├── CheckQueryType
            └── PrintNumbers

PrintNumbers – The Worker Node

What it does:

  • Counts from 1 to 100.

  • Sends feedback after each number.

  • Saves the complete list as the final answer when done.

Key points:

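  • Uses the blackboard to pass data out of the behavior: feedback messages go into the queue stored under BBIdentifier.QUERY_FEEDBACK, and the final answer is stored under BBIdentifier.QUERY_ANSWER.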

  • Returns RUNNING while still counting, and SUCCESS when finished.
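The tick pattern described above can be sketched without RoboKudo or ROS. This is a simplified model, not the actual annotator: the plain dict, the string keys "feedback" and "answer", and the CountingWorker class are hypothetical stand-ins for the py_trees blackboard, the BBIdentifier constants, and PrintNumbers. It also sends only the current number as feedback rather than the whole list.

```python
import queue

# Hypothetical stand-in: a plain dict plays the role of the py_trees blackboard,
# and the string keys replace the BBIdentifier constants.
blackboard = {}


class CountingWorker:
    """Pure-Python model of the PrintNumbers tick pattern (not a RoboKudo class)."""

    def __init__(self, limit):
        self.limit = limit
        self.current = 1
        self.numbers = []

    def tick(self):
        # Create the feedback queue lazily, as PrintNumbers.update() does.
        if "feedback" not in blackboard:
            blackboard["feedback"] = queue.Queue()
        feedback = blackboard["feedback"]

        if self.current <= self.limit:
            # One number per tick, with one feedback message per number.
            self.numbers.append(self.current)
            feedback.put(f"Processing number: {self.current}")
            self.current += 1
            return "RUNNING"

        # All numbers processed: publish the final answer.
        blackboard["answer"] = ", ".join(str(n) for n in self.numbers)
        return "SUCCESS"


worker = CountingWorker(limit=3)
statuses = [worker.tick() for _ in range(4)]
print(statuses)              # ['RUNNING', 'RUNNING', 'RUNNING', 'SUCCESS']
print(blackboard["answer"])  # 1, 2, 3
```

Doing one unit of work per tick and returning RUNNING in between is what gives the rest of the tree a chance to run, including the preemption check.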


CheckQueryType – The Gatekeeper

What it does:

  • Reads the query from the Common Analysis Structure (CAS).

  • Checks if query.obj.type == "numbers".

  • If valid, it returns SUCCESS. Otherwise, FAILURE.

Why it matters:

  • Prevents the system from executing unintended or malformed queries.

  • Serves as a safe filter before performing expensive work.
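The validation rule itself is small enough to try in isolation. The sketch below is an assumption-laden stand-in: SimpleNamespace objects replace the real query message, and is_numbers_query is a hypothetical helper, not part of RoboKudo. It is slightly more defensive than the tutorial code because it also tolerates a query that has no obj attribute.

```python
from types import SimpleNamespace


def is_numbers_query(query):
    """Return True only when the query exists and query.obj.type == 'numbers'."""
    obj = getattr(query, "obj", None)
    return getattr(obj, "type", None) == "numbers"


# Hypothetical stand-ins for query messages read from the CAS.
valid = SimpleNamespace(obj=SimpleNamespace(type="numbers"))
wrong_type = SimpleNamespace(obj=SimpleNamespace(type="detect"))
no_type = SimpleNamespace(obj=SimpleNamespace())

print(is_numbers_query(valid))       # True
print(is_numbers_query(wrong_type))  # False
print(is_numbers_query(no_type))     # False
print(is_numbers_query(None))        # False
```

In the behavior, True maps to SUCCESS and False maps to FAILURE.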


TaskSequence – Validation and Execution Together

TaskSequence (WithMemory)
├── CheckQueryType
└── PrintNumbers

What it does:

  • Combines validation and execution into one sequence.

  • Only runs PrintNumbers if CheckQueryType succeeds.

Why memory=True?

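  • The sequence resumes at the child that returned RUNNING on the previous tick, so CheckQueryType is not re-run on every tick while PrintNumbers is still counting.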

  • Allows smooth continuation during long-running operations.
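To make the effect of the memory flag concrete, here is a minimal pure-Python model of a sequence. MiniSequence and Leaf are hypothetical classes written for this illustration; they mimic only the resume behavior of a py_trees Sequence and none of its other features.

```python
class Leaf:
    """Returns a scripted list of statuses, one per tick, and counts its ticks."""

    def __init__(self, statuses):
        self.statuses = list(statuses)
        self.ticks = 0

    def tick(self):
        # After the script runs out, keep returning the last status.
        status = self.statuses[min(self.ticks, len(self.statuses) - 1)]
        self.ticks += 1
        return status


class MiniSequence:
    """Simplified sequence: with memory it resumes at the RUNNING child."""

    def __init__(self, children, memory):
        self.children = children
        self.memory = memory
        self.resume_index = 0

    def tick(self):
        start = self.resume_index if self.memory else 0
        for index in range(start, len(self.children)):
            status = self.children[index].tick()
            if status == "RUNNING":
                self.resume_index = index  # remember where to continue
                return status
            if status == "FAILURE":
                self.resume_index = 0
                return status
        self.resume_index = 0
        return "SUCCESS"


def count_check_ticks(memory):
    check = Leaf(["SUCCESS"])                      # plays CheckQueryType
    work = Leaf(["RUNNING", "RUNNING", "SUCCESS"])  # plays PrintNumbers
    sequence = MiniSequence([check, work], memory)
    while sequence.tick() == "RUNNING":
        pass
    return check.ticks


print(count_check_ticks(memory=True))   # 1
print(count_check_ticks(memory=False))  # 3
```

With memory the check runs once per query; without it the check would run again on each of the three ticks.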


Preemption Handling with ConditionalSelector

ConditionalSelector (NoMemory)
├── Invert Preempt Request
│   └── ActionServerNoPreemptRequest
└── TaskSequence
    ├── CheckQueryType
    └── PrintNumbers

What it does:

  • Always checks if a cancel request (preemption) has been issued.

  • If one is detected, the first child succeeds, so the Selector returns SUCCESS right away and TaskSequence is not ticked any further.

  • Otherwise, proceeds to run the full task sequence.

Why memory=False?

  • Cancellation status must be re-evaluated on every tick to react instantly.

  • Prevents stale decisions.
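The per-tick re-evaluation can be illustrated with a small tick loop. Everything here is a simplified model under stated assumptions: the state dict stands in for the cancel state of the action server, and preempt_guard, task, and tick_selector are hypothetical functions that mimic the inverted preemption check, the task sequence, and a memoryless selector.

```python
# Hypothetical stand-in for the cancel state that ActionServerNoPreemptRequest reads.
state = {"cancel": False}
progress = []


def preempt_guard():
    # Models Inverter(ActionServerNoPreemptRequest): SUCCESS only while a cancel is pending.
    return "SUCCESS" if state["cancel"] else "FAILURE"


def task():
    # Models a long-running task: one unit of work per tick.
    progress.append(len(progress) + 1)
    return "RUNNING" if len(progress) < 10 else "SUCCESS"


def tick_selector():
    # Memoryless selector: always start at the first child and
    # return the first status that is not FAILURE.
    for child in (preempt_guard, task):
        status = child()
        if status != "FAILURE":
            return status
    return "FAILURE"


log = []
for tick in range(1, 6):
    if tick == 4:
        state["cancel"] = True  # the client cancels before the fourth tick
    log.append(tick_selector())

print(log)       # ['RUNNING', 'RUNNING', 'RUNNING', 'SUCCESS', 'SUCCESS']
print(progress)  # [1, 2, 3]
```

Because the guard is consulted first on every tick, the task stops making progress on the very tick after the cancel arrives.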


ActionServerNoPreemptRequest and Inverter

What they do:

  • ActionServerNoPreemptRequest detects cancel requests via the blackboard.

    • If cancel is active → returns FAILURE.

    • Otherwise → returns SUCCESS.

  • The Inverter flips this result:

    • Cancellation → becomes SUCCESS (Selector stops task).

    • No cancellation → becomes FAILURE (Selector continues to task).

Why inversion is needed:

  • A Selector stops at the first child that returns SUCCESS (or RUNNING) and only moves on to the next child after a FAILURE.

  • The inverter makes cancellation detectable as a SUCCESS condition.
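The two cases above can be written out as a small truth table. The functions below are hypothetical stand-ins that reproduce only the statuses described in this section; they do not call py_trees or RoboKudo.

```python
def no_preempt_request(cancel_active):
    # Models the documented behaviour: FAILURE when a cancel is active, SUCCESS otherwise.
    return "FAILURE" if cancel_active else "SUCCESS"


def invert(status):
    # Swap SUCCESS and FAILURE; any other status (e.g. RUNNING) passes through.
    return {"SUCCESS": "FAILURE", "FAILURE": "SUCCESS"}.get(status, status)


rows = []
for cancel_active in (False, True):
    raw = no_preempt_request(cancel_active)
    guard = invert(raw)
    task_is_ticked = guard == "FAILURE"  # a selector only moves on after FAILURE
    rows.append((cancel_active, raw, guard, task_is_ticked))

# Columns: cancel active, raw check, inverted check, task sequence ticked
for row in rows:
    print(row)
# (False, 'SUCCESS', 'FAILURE', True)
# (True, 'FAILURE', 'SUCCESS', False)
```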


Action Client – How to Test the Pipeline

To test this query pipeline, you can use the action_client.py script (located in robokudo/action_servers/action_client.py).
This script sends a query, receives live feedback, and can cancel the task mid-way if needed.

This script is helpful for simulating real-world queries, verifying feedback flow, and testing cancellation behavior.

Alternatively, you can also use the axclient.py tool as demonstrated in the previous basic tutorial.

To activate the CheckQueryType and PrintNumbers path in the behavior tree, make sure the query includes:

goal_msg.obj.type = "numbers"

Only this value passes validation and starts number processing; for any other value, CheckQueryType returns FAILURE and PrintNumbers never runs.