robokudo.annotators.query

Query handling annotator for RoboKudo.

This module provides an annotator that handles queries from external ROS nodes. It supports:

  • Spawning an action server for query handling

  • Type-agnostic query processing

  • Asynchronous query response

  • Integration with ROS action system

  • CAS annotation with query data

The module is used for:

  • External system integration

  • Query-based perception

  • Interactive perception tasks

  • Asynchronous data exchange

Classes

QueryAnnotator

Handle external queries through ROS action server.

QueryFeedback

A test class which simply generates a fixed-string feedback.

QueryFeedbackAndCount

A test class which simply counts up until a fixed number. Until this number is reached, a pre-defined status is returned.

QueryReply

A test class which simply generates an empty Query Answer to check if the Action server can reply properly.

GenerateQueryResult

This class reads in the annotations done by the previous Annotators and generates Object Designators from them.

QueryActionServer

ROS action server for handling perception queries.

Module Contents

class robokudo.annotators.query.QueryAnnotator(name: str = 'QueryAnnotator')

Bases: robokudo.annotators.core.BaseAnnotator

Handle external queries through ROS action server.

This annotator spawns an action server that listens for queries from external ROS nodes. When a query arrives, the annotator annotates the CAS and stores the query in CASViews.QUERY. The annotator and the action server are type-agnostic, so you are not bound to a specific type of query. You can pass the query types you need from your AE (analysis engine) to this QueryAnnotator.

feedback_instance
result_instance
action_server = None

Action server placeholder.

setup(**kwargs: typing_extensions.Any) → None

Ensure that the Query Server is spawned early on, directly after PPT creation.

initialise() → None

Initialize query handling.

Sets up the action server if it is not already initialized. Stores the server instance on the blackboard so that other nodes can access it.
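The "create once, then publish on the blackboard" behaviour described above can be sketched in plain Python. This is only an illustration under stated assumptions: `LazyServerHolder`, the `factory` callable, and the blackboard key `"query_server"` are hypothetical names, and a dict stands in for the real blackboard.

```python
from typing import Callable, Dict, Optional


class LazyServerHolder:
    """Hypothetical stand-in for the annotator; not the RoboKudo class."""

    def __init__(self, factory: Callable[[], object],
                 blackboard: Dict[str, object]) -> None:
        self._factory = factory        # builds the (fake) action server
        self._blackboard = blackboard  # dict standing in for the blackboard
        self.action_server: Optional[object] = None  # placeholder until created

    def initialise(self) -> None:
        # Only create the server on the first call; later calls are no-ops.
        if self.action_server is None:
            self.action_server = self._factory()
            # Assumed key name; shares the server with other nodes.
            self._blackboard["query_server"] = self.action_server
```

Calling `initialise()` repeatedly, as happens when a behavior is re-entered, leaves the first server instance in place.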

update() → py_trees.common.Status

Process new queries and update CAS.

Checks for new queries from action server and updates CAS if found. Provides feedback about query status.

Returns:

SUCCESS if query processed, RUNNING if waiting
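The RUNNING/SUCCESS contract above can be illustrated with a small, ROS-free sketch. Everything here is an assumption made for illustration: `Status` mimics `py_trees.common.Status`, `FakeQueryServer` models only the `new_query` and `query` attributes of the action server, and a dict with a `"QUERY"` key stands in for `CASViews.QUERY`.

```python
from enum import Enum
from typing import Optional


class Status(Enum):
    """Stand-in for py_trees.common.Status (stdlib only)."""
    SUCCESS = "SUCCESS"
    RUNNING = "RUNNING"


class FakeQueryServer:
    """Hypothetical stand-in for QueryActionServer."""

    def __init__(self) -> None:
        self.new_query: Optional[dict] = None  # latest received query
        self.query: Optional[dict] = None      # query being processed

    def start_processing(self) -> None:
        # Promote the pending query to the one currently being processed.
        self.query, self.new_query = self.new_query, None


def update(server: FakeQueryServer, cas_views: dict) -> Status:
    """One tick: RUNNING while waiting, SUCCESS once the query is stored."""
    if server.new_query is None:
        return Status.RUNNING
    server.start_processing()
    cas_views["QUERY"] = server.query  # assumed stand-in for CASViews.QUERY
    return Status.SUCCESS
```

Returning RUNNING while no query is pending keeps the surrounding tree waiting at this node until an external client sends a goal.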

class robokudo.annotators.query.QueryFeedback(name: str = 'QueryFeedback', feedback_str: str = '')

Bases: robokudo.annotators.core.BaseAnnotator

A test class which simply generates a fixed-string feedback.

feedback_str = ''

Feedback string to send to the client.

update() → py_trees.common.Status

Update the annotator state.

Called every time the behavior is ticked.

Returns:

Status of the behavior after update

class robokudo.annotators.query.QueryFeedbackAndCount(name: str = 'QueryFeedback', count_until: int = 20, return_code: py_trees.common.Status = Status.RUNNING)

Bases: robokudo.annotators.core.BaseAnnotator

A test class which simply counts up until a fixed number. Until this number is reached, a pre-defined status is returned.

i = 0

The current count

count_until = 20

The number until which to count

return_code

The return code to return while still counting

update() → py_trees.common.Status

Update the annotator state.

Called every time the behavior is ticked.

Returns:

Status of the behavior after update
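A minimal sketch of the counting behaviour, assuming the pre-defined status is returned on each of the first `count_until` ticks. The `Status` enum is a stand-in for `py_trees.common.Status`, `CountingBehaviour` is a hypothetical name, and returning SUCCESS once the count is reached is an assumption, since the documentation above does not state it.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for py_trees.common.Status (stdlib only)."""
    SUCCESS = "SUCCESS"
    RUNNING = "RUNNING"
    FAILURE = "FAILURE"


class CountingBehaviour:
    """Hypothetical model of QueryFeedbackAndCount, not the real class."""

    def __init__(self, count_until: int = 20,
                 return_code: Status = Status.RUNNING) -> None:
        self.i = 0                      # the current count
        self.count_until = count_until  # the number until which to count
        self.return_code = return_code  # returned while still counting

    def update(self) -> Status:
        if self.i < self.count_until:
            self.i += 1
            return self.return_code
        # Assumption: once the count is reached, report SUCCESS.
        return Status.SUCCESS
```

With `count_until=3`, the first three ticks return the configured status and the fourth returns SUCCESS.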

class robokudo.annotators.query.QueryReply(name: str = 'QueryReply')

Bases: robokudo.annotators.core.BaseAnnotator

A test class which simply generates an empty Query Answer to check if the action server can reply properly. It creates a single, empty Object Designator that will be sent to the caller.

initialise() → None

Initialize reply generator.

update() → py_trees.common.Status

Generate test query response.

Creates an empty ObjectDesignator with a test pose and adds it to blackboard.

Returns:

SUCCESS after generating response

class robokudo.annotators.query.GenerateQueryResult(name: str = 'GenerateQueryResult')

Bases: robokudo.annotators.core.BaseAnnotator

This class reads in the annotations done by the previous Annotators and generates Object Designators from them. These will be placed into the Blackboard so that a running Query Action Server can pick the information up and send it as a query reply.

rk_logger = None
color_converter
class_converter
position_converter
stamped_position_converter
pose_converter
stamped_pose_converter
shape_converter
cuboid_converter
cylinder_converter
sphere_converter
location_converter
bb_size_converter
type_converter
update() → py_trees.common.Status

Generate query result from current CAS annotations.

For each ObjectHypothesis in CAS:

  • Creates ObjectDesignator

  • Adds color information if available

  • Adds classification if available

  • Adds pose information if available

  • Packages into query result

Returns:

SUCCESS after generating result
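The per-hypothesis steps listed for this method can be sketched without RoboKudo or ROS. The `Hypothesis` and `Designator` dataclasses and their fields are hypothetical simplifications; the real annotator works on CAS annotations and `ObjectDesignator` messages through the converter attributes listed above.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Pose = Tuple[float, float, float]


@dataclass
class Hypothesis:
    """Hypothetical stand-in for an ObjectHypothesis and its annotations."""
    color: Optional[str] = None
    class_name: Optional[str] = None
    pose: Optional[Pose] = None


@dataclass
class Designator:
    """Hypothetical stand-in for an ObjectDesignator."""
    color: List[str] = field(default_factory=list)
    type: str = ""
    pose: List[Pose] = field(default_factory=list)


def generate_query_result(hypotheses: List[Hypothesis]) -> List[Designator]:
    """Build one designator per hypothesis, copying only what is available."""
    result: List[Designator] = []
    for hyp in hypotheses:
        designator = Designator()
        if hyp.color is not None:
            designator.color.append(hyp.color)
        if hyp.class_name is not None:
            designator.type = hyp.class_name
        if hyp.pose is not None:
            designator.pose.append(hyp.pose)
        result.append(designator)
    return result
```

Fields without a matching annotation stay at their empty defaults, so a client can tell "not detected" apart from a detected value.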

class robokudo.annotators.query.QueryActionServer(name: str, feedback_instance: robokudo_msgs.action.Query.Feedback = Query.Feedback(), result_instance: robokudo_msgs.action.Query.Result = Query.Result(), action_type: typing_extensions.Type = Query)

Bases: rclpy.node.Node

ROS action server for handling perception queries.

Action server that listens for queries and executes them by checking the blackboard for results generated by QueryAnnotator and QueryReply.

_action_name

Name of the ROS action

_as

Action server instance

feedback_instance: robokudo_msgs.action.Query.Feedback

Latest feedback message

result_instance: robokudo_msgs.action.Query.Result

Latest result message

new_query: typing_extensions.Optional[robokudo_msgs.action.Query.Goal] = None

Latest received query

query: typing_extensions.Optional[robokudo_msgs.action.Query.Goal] = None

Currently processing query

query_processed_event: threading.Event

Event to signal query processing completion

logger = None
reset_bookkeeping_vars() → None

Reset internal state variables.

Clears query state and blackboard variables.

goal_cb(goal_request: robokudo_msgs.action.Query.Goal) → rclpy.action.GoalResponse
cancel_cb(goal_handle: rclpy.action.server.ServerGoalHandle) → rclpy.action.CancelResponse
start_processing() → None

Start processing new query.

Tell the ActionServer that we are now starting the execution and it can start the monitoring/response process.

is_active() → bool

Check if query is being processed.

Returns:

True if query active, False otherwise

async execute_cb(goal_handle: rclpy.action.server.ServerGoalHandle) → typing_extensions.Optional[robokudo_msgs.action.Query.Result]

Action server execution callback.

Handles:

  • Query reception and validation

  • Processing status monitoring

  • Preemption requests

  • Error handling

  • Result generation and sending

Parameters:

goal_handle – Query goal from client
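The attributes `new_query`, `query`, and `query_processed_event` suggest a handshake between the action callback and the perception pipeline. The sketch below models one plausible version of that handshake with the standard library only. `MiniQueryServer`, `finish`, `pipeline`, and the timeout are hypothetical, and the blocking `execute` is a simplification of the real asynchronous `execute_cb`.

```python
import threading
import time
from typing import Optional


class MiniQueryServer:
    """Hypothetical, ROS-free model of the query handshake."""

    def __init__(self) -> None:
        self.new_query: Optional[str] = None  # latest received query
        self.query: Optional[str] = None      # currently processing query
        self.result: Optional[str] = None
        self.query_processed_event = threading.Event()

    def reset_bookkeeping_vars(self) -> None:
        self.new_query = None
        self.query = None
        self.result = None

    def start_processing(self) -> None:
        # The pipeline takes over the pending query.
        self.query, self.new_query = self.new_query, None

    def is_active(self) -> bool:
        return self.query is not None

    def finish(self, result: str) -> None:
        # Called by the pipeline once a result is available.
        self.result = result
        self.query_processed_event.set()

    def execute(self, goal: str, timeout: float = 2.0) -> Optional[str]:
        # Publish the goal, then block until the pipeline signals completion.
        self.query_processed_event.clear()
        self.result = None
        self.new_query = goal
        done = self.query_processed_event.wait(timeout)
        result = self.result if done else None
        self.reset_bookkeeping_vars()
        return result


def pipeline(server: MiniQueryServer) -> None:
    """Simulated perception pipeline running in its own thread."""
    while server.new_query is None:
        time.sleep(0.01)
    server.start_processing()
    server.finish(f"answer to {server.query}")
```

In this sketch a goal that no pipeline picks up times out and yields `None`, which loosely corresponds to the Optional return type of `execute_cb`.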