# Advanced Query Handling --- ## Introduction This tutorial builds upon the earlier query handling tutorial. It assumes you are already familiar with the basics of sending queries in RoboKudo. In this session, you'll explore how RoboKudo handles queries using a behavior tree pipeline with support for validation, feedback, preemption, and results publishing. By the end of this tutorial, you will: - Understand how the Query Interface works in RoboKudo. - Learn how queries are validated, processed, monitored, and canceled in a behavior tree. - See how live feedback and final results are handled dynamically. --- ## Important Concepts | Term | Meaning | Why it matters | | ------------- | ----------------------------------------------------------------------------------- | -------------------------------------------------- | | Query | A message asking the robot to do something (e.g., "count numbers"). | It starts the pipeline. | | Feedback | Small updates showing the task is in progress. | Helps monitor that the robot is alive and working. | | Final Result | The final answer after the task finishes. | Needed for other programs to know what happened. | | Preemption | (Advanced word for) Cancellation. If needed, we can stop a task early. | Keeps the system safe and responsive. | | Behavior Tree | A smart structure where tasks run in order and return SUCCESS, FAILURE, or RUNNING. | Makes building robot logic easy and modular. | --- ## Full Working Example (query\_test.py) Below is the full working Python code used to implement the behavior tree pipeline that handles a simple "numbers" query. ```python ''' This script defines a Robokudo-based behavior tree pipeline for processing queries related to numbers. Features: - Implements a PyTrees behavior tree to handle query processing. - Includes a PrintNumbers behavior that sequentially processes numbers from 1 to 10. - Utilizes a CheckQueryType annotator to validate if the query type is 'numbers'. - Incorporates a conditional selector to manage task execution with preemption handling. - Integrates with the Robokudo analysis engine and pipeline framework. - Uses a feedback mechanism to report progress dynamically. ''' import robokudo from robokudo.cas import CASViews import robokudo.pipeline import robokudo.analysis_engine import queue import py_trees import robokudo.analysis_engine import robokudo.pipeline import robokudo.descriptors.camera_configs.config_hsr import robokudo.io.camera_interface from robokudo.annotators.query import QueryAnnotator from robokudo.behaviours.action_server_checks import ActionServerNoPreemptRequest from robokudo.annotators.core import BaseAnnotator from robokudo.idioms import pipeline_init from py_trees.composites import Sequence from py_trees.composites import Selector from robokudo.identifier import BBIdentifier from robokudo_msgs.action import Query class PrintNumbers(BaseAnnotator) : def __init__(self, name="PrintNumbers") : super().__init__(name) self.current_number = 1 self.completed = False self.result = [] self.result_string = "" def initialise(self) : self.result = [] self.result_string = "" self.current_number = 1 self.completed = False def update(self) : blackboard = py_trees.blackboard.Blackboard() feedback_queue = blackboard.get(BBIdentifier.QUERY_FEEDBACK) if feedback_queue is None : feedback_queue = queue.Queue() blackboard.set(BBIdentifier.QUERY_FEEDBACK, feedback_queue) if self.current_number <= 100 : # Append number and update result print(f"Current Number: {self.current_number}") self.result.append(self.current_number) self.result_string += f"{self.current_number}, " # Add feedback message feedback_msg = Query.Feedback() feedback_msg.feedback = f"Processing number: {self.result_string}" feedback_queue.put(feedback_msg) self.current_number += 1 return py_trees.common.Status.RUNNING else : self.completed = True blackboard.set(BBIdentifier.QUERY_ANSWER, self.result_string) return py_trees.common.Status.SUCCESS class CheckQueryType(BaseAnnotator): """ Checks if the query type on the CAS is 'numbers'. """ def __init__(self, name="CheckQueryType"): super().__init__(name) def update(self): # Retrieve the query from the CAS query = self.get_cas().get(CASViews.QUERY) # Check if the query type is 'numbers' if query and getattr(query.obj, 'type', None) == 'numbers': self.logger.info("Query type is 'numbers'. Proceeding.") return py_trees.common.Status.SUCCESS else: self.logger.warning(f"Query type is not 'numbers' or query is missing: {query}") return py_trees.common.Status.FAILURE class AnalysisEngine(robokudo.analysis_engine.AnalysisEngineInterface): def name(self): return "test_pipeline" def implementation(self) -> robokudo.pipeline.Pipeline: # Define the sequence that runs the task task_sequence = Sequence(name="TaskSequence", memory=True) task_sequence.add_children([ CheckQueryType(), # Check if the query type is 'numbers' PrintNumbers() # Execute the task if the query is valid ]) # Combine preemption handling and task execution in a selector conditional_selector = Selector(name="ConditionalSelector", memory=False) conditional_selector.add_children([ py_trees.decorators.Inverter( name="Invert Preempt Request", child=ActionServerNoPreemptRequest() ), task_sequence # Run task sequence only if no preemption ]) # Main Pipeline seq = robokudo.pipeline.Pipeline("OPExperiments") seq.add_children([ pipeline_init(), # Initialize the pipeline QueryAnnotator(), # Handle incoming queries conditional_selector # Preemption-aware task execution ]) return seq ``` --- ## Full Behavior Tree Overview Here’s a visual map of the pipeline: ``` RootNodeWithGUI ├── VisManager └── OPExperiments (WithMemory) ├── pipeline_init() ├── QueryAnnotator └── ConditionalSelector (NoMemory) ├── Invert Preempt Request │ └── ActionServerNoPreemptRequest └── TaskSequence (WithMemory) ├── CheckQueryType └── PrintNumbers ``` --- ## PrintNumbers – The Worker Node What it does: - Counts from 1 to 100. - Sends feedback after each number. - Saves the complete list as the final answer when done. Key points: - Uses the blackboard to store intermediate and final values. - Returns `RUNNING` while still counting, and `SUCCESS` when finished. --- ## CheckQueryType – The Gatekeeper What it does: - Reads the query from the Common Analysis Structure (CAS). - Checks if `query.type == "numbers"`. - If valid, it returns `SUCCESS`. Otherwise, `FAILURE`. Why it matters: - Prevents the system from executing unintended or malformed queries. - Serves as a safe filter before performing expensive work. --- ## TaskSequence – Validation and Execution Together ``` TaskSequence (WithMemory) ├── CheckQueryType └── PrintNumbers ``` What it does: - Combines validation and execution into one sequence. - Only runs `PrintNumbers` if `CheckQueryType` succeeds. Why memory=True? - Ensures task progress isn't lost on every tick. - Allows smooth continuation during long-running operations. --- ## Preemption Handling with ConditionalSelector ``` ConditionalSelector (NoMemory) ├── Invert Preempt Request │ └── ActionServerNoPreemptRequest └── TaskSequence ├── CheckQueryType └── PrintNumbers ``` What it does: - Always checks if a cancel request (preemption) has been issued. - If detected, stops execution immediately by succeeding early. - Otherwise, proceeds to run the full task sequence. Why memory=False? - Cancellation status must be re-evaluated on every tick to react instantly. - Prevents stale decisions. --- ## ActionServerNoPreemptRequest and Inverter What they do: - `ActionServerNoPreemptRequest` detects cancel requests via the blackboard. - If cancel is active → returns `FAILURE`. - Otherwise → returns `SUCCESS`. - The `Inverter` flips this result: - Cancellation → becomes `SUCCESS` (Selector stops task). - No cancellation → becomes `FAILURE` (Selector continues to task). Why inversion is needed: - A `Selector` proceeds on first `SUCCESS`. - The inverter makes cancellation detectable as a `SUCCESS` condition. --- ## Action Client – How to Test the Pipeline To test this query pipeline, you can use the `action_client.py` script (located in `robokudo/action_servers/action_client.py`).\ This script sends a query, receives live feedback, and can cancel the task mid-way if needed. This script is helpful for simulating real-world queries, verifying feedback flow, and testing cancellation behavior. Alternatively, you can also use the `axclient.py` tool as demonstrated in the previous basic tutorial. To activate the `CheckQueryType` and `PrintNumbers` path in the behavior tree, make sure the query includes: ```python goal_msg.obj.type = "numbers" ``` Only this value will pass validation and start number processing; other values will cause the tree to stop early. ---