robokudo.annotators.query

Query handling annotator for RoboKudo.

This module provides an annotator that handles queries from external ROS nodes. It supports:

  • Spawning an action server for query handling

  • Type-agnostic query processing

  • Asynchronous query response

  • Integration with ROS action system

  • CAS annotation with query data

The module is used for:

  • External system integration

  • Query-based perception

  • Interactive perception tasks

  • Asynchronous data exchange

Classes

QueryAnnotator

Handle external queries through ROS action server.

QueryFeedback

A test class which simply generates a fixed-string feedback.

QueryFeedbackAndCount

A test class which simply counts up until a fixed number.

QueryReply

A test class which simply generates an empty Query Answer to check

GenerateQueryResult

This class reads in the annotations done by the previous Annotators

QueryActionServer

ROS action server for handling perception queries.

Module Contents

class robokudo.annotators.query.QueryAnnotator(name='QueryAnnotator')

Bases: robokudo.annotators.core.BaseAnnotator

Handle external queries through ROS action server.

This Annotator spawns an Action Server that listens for Queries from external ROS nodes. It will then annotate the CAS and put the Query into CASViews.QUERY. The Annotator and the Actionserver are type-agnostic, which means that you are not bound to a specific type of query. You can pass these from your AE to this QueryAnnotator.

Variables:
  • feedback_instance – Feedback message template

  • result_instance – Result message template

  • action_server – server for action request handling

feedback_instance
result_instance
action_server = None
setup(timeout=None, node=None, visitor=None)

Ensure that the Query Server is spawned early on, directly after PPT creation.

initialise()

Initialize query handling.

Sets up the action server if not already initialized. Stores server instance on blackboard for access by other nodes.

Returns:

None

update()

Process new queries and update CAS.

Checks for new queries from action server and updates CAS if found. Provides feedback about query status.

Returns:

SUCCESS if query processed, RUNNING if waiting

Return type:

py_trees.Status

class robokudo.annotators.query.QueryFeedback(name='QueryFeedback', feedback_str='')

Bases: robokudo.annotators.core.BaseAnnotator

A test class which simply generates a fixed-string feedback.

feedback_str = ''
update()

Update the annotator state.

Called every time the behavior is ticked.

Returns:

Status of the behavior after update

Return type:

py_trees.common.Status

class robokudo.annotators.query.QueryFeedbackAndCount(name='QueryFeedback', count_until=20, return_code=py_trees.common.Status.RUNNING)

Bases: robokudo.annotators.core.BaseAnnotator

A test class which simply counts up until a fixed number. Until this number is reached, a pre-defined status is returned.

i = 0
count_until = 20
return_code
update()

Update the annotator state.

Called every time the behavior is ticked.

Returns:

Status of the behavior after update

Return type:

py_trees.common.Status

class robokudo.annotators.query.QueryReply(name='QueryReply')

Bases: robokudo.annotators.core.BaseAnnotator

A test class which simply generates an empty Query Answer to check if the Action server can reply properly. Create a single, empty Object Designator that will be sent to the caller.

initialise()

Initialize reply generator.

Returns:

None

update() py_trees.common.Status

Generate test query response.

Creates an empty ObjectDesignator with a test pose and adds it to blackboard.

Returns:

SUCCESS after generating response

Return type:

py_trees.Status

class robokudo.annotators.query.GenerateQueryResult(name='GenerateQueryResult')

Bases: robokudo.annotators.core.BaseAnnotator

This class reads in the annotations done by the previous Annotators and generates Object Designators from them. These will be placed into the Blackboard so that a running Query Action Server can pick the information up and send it as a query reply.

rk_logger = None
color_converter
class_converter
position_converter
stamped_position_converter
pose_converter
stamped_pose_converter
shape_converter
cuboid_converter
sphere_converter
location_converter
bb_size_converter
type_converter
update()

Generate query result from current CAS annotations.

For each ObjectHypothesis in CAS: * Creates ObjectDesignator * Adds color information if available * Adds classification if available * Adds pose information if available * Packages into query result

Returns:

SUCCESS after generating result

Return type:

py_trees.Status

class robokudo.annotators.query.QueryActionServer(name, feedback_instance=Query.Feedback(), result_instance=Query.Result(), action_type=Query)

Bases: rclpy.node.Node

ROS action server for handling perception queries.

Action server that listens for queries and executes them by checking blackboard for results generated by QueryAnnotator and QueryReply.

Variables:
  • _action_name – Name of ROS action

  • _as – Action server instance

  • new_query – Latest received query

  • query – Currently processing query

_action_name
_as
feedback_instance
result_instance
new_query = None
query = None
query_processed_event
logger = None
reset_bookkeeping_vars()

Reset internal state variables.

Clears query state and blackboard variables.

Returns:

None

goal_cb(goal_request)
cancel_cb(goal_handle)
start_processing()

Start processing new query.

Tell the ActionServer that we are now starting the execution and it can start the monitoring/response process.

Returns:

None

is_active()

Check if query is being processed.

Returns:

True if query active, False otherwise

Return type:

bool

async execute_cb(goal_handle)

Action server execution callback.

Handles: * Query reception and validation * Processing status monitoring * Preemption requests * Error handling * Result generation and sending

Parameters:

goal_handle – Query goal from client

Returns:

None