robokudo.annotators.query ========================= .. py:module:: robokudo.annotators.query .. autoapi-nested-parse:: Query handling annotator for RoboKudo. This module provides an annotator that handles queries from external ROS nodes. It supports: * Spawning an action server for query handling * Type-agnostic query processing * Asynchronous query response * Integration with ROS action system * CAS annotation with query data The module is used for: * External system integration * Query-based perception * Interactive perception tasks * Asynchronous data exchange Classes ------- .. autoapisummary:: robokudo.annotators.query.QueryAnnotator robokudo.annotators.query.QueryFeedback robokudo.annotators.query.QueryFeedbackAndCount robokudo.annotators.query.QueryReply robokudo.annotators.query.GenerateQueryResult robokudo.annotators.query.QueryActionServer Module Contents --------------- .. py:class:: QueryAnnotator(name='QueryAnnotator') Bases: :py:obj:`robokudo.annotators.core.BaseAnnotator` Handle external queries through ROS action server. This Annotator spawns an Action Server that listens for Queries from external ROS nodes. It will then annotate the CAS and put the Query into CASViews.QUERY. The Annotator and the Actionserver are type-agnostic, which means that you are not bound to a specific type of query. You can pass these from your AE to this QueryAnnotator. :ivar feedback_instance: Feedback message template :type feedback_instance: robokudo_msgs.msg.QueryFeedback :ivar result_instance: Result message template :type result_instance: robokudo_msgs.msg.QueryResult :ivar action_server: server for action request handling :type action_server: QueryActionServer or None .. py:attribute:: feedback_instance .. py:attribute:: result_instance .. py:attribute:: action_server :value: None .. py:method:: setup(**kwargs: Any) Ensure that the Query Server is spawned early on, directly after PPT creation. .. py:method:: initialise() Initialize query handling. Sets up the action server if not already initialized. Stores server instance on blackboard for access by other nodes. :return: None .. py:method:: update() Process new queries and update CAS. Checks for new queries from action server and updates CAS if found. Provides feedback about query status. :return: SUCCESS if query processed, RUNNING if waiting :rtype: py_trees.Status .. py:class:: QueryFeedback(name='QueryFeedback', feedback_str='') Bases: :py:obj:`robokudo.annotators.core.BaseAnnotator` A test class which simply generates a fixed-string feedback. .. py:attribute:: feedback_str :value: '' .. py:method:: update() Update the annotator state. Called every time the behavior is ticked. :return: Status of the behavior after update :rtype: py_trees.common.Status .. py:class:: QueryFeedbackAndCount(name='QueryFeedback', count_until=20, return_code=py_trees.common.Status.RUNNING) Bases: :py:obj:`robokudo.annotators.core.BaseAnnotator` A test class which simply counts up until a fixed number. Until this number is reached, a pre-defined status is returned. .. py:attribute:: i :value: 0 .. py:attribute:: count_until :value: 20 .. py:attribute:: return_code .. py:method:: update() Update the annotator state. Called every time the behavior is ticked. :return: Status of the behavior after update :rtype: py_trees.common.Status .. py:class:: QueryReply(name='QueryReply') Bases: :py:obj:`robokudo.annotators.core.BaseAnnotator` A test class which simply generates an empty Query Answer to check if the Action server can reply properly. Create a single, empty Object Designator that will be sent to the caller. .. py:method:: initialise() Initialize reply generator. :return: None .. py:method:: update() -> py_trees.common.Status Generate test query response. Creates an empty ObjectDesignator with a test pose and adds it to blackboard. :return: SUCCESS after generating response :rtype: py_trees.Status .. py:class:: GenerateQueryResult(name='GenerateQueryResult') Bases: :py:obj:`robokudo.annotators.core.BaseAnnotator` This class reads in the annotations done by the previous Annotators and generates Object Designators from them. These will be placed into the Blackboard so that a running Query Action Server can pick the information up and send it as a query reply. .. py:attribute:: rk_logger :value: None .. py:attribute:: color_converter .. py:attribute:: class_converter .. py:attribute:: position_converter .. py:attribute:: stamped_position_converter .. py:attribute:: pose_converter .. py:attribute:: stamped_pose_converter .. py:attribute:: shape_converter .. py:attribute:: cuboid_converter .. py:attribute:: sphere_converter .. py:attribute:: location_converter .. py:attribute:: bb_size_converter .. py:attribute:: type_converter .. py:method:: update() Generate query result from current CAS annotations. For each ObjectHypothesis in CAS: * Creates ObjectDesignator * Adds color information if available * Adds classification if available * Adds pose information if available * Packages into query result :return: SUCCESS after generating result :rtype: py_trees.Status .. py:class:: QueryActionServer(name, feedback_instance=Query.Feedback(), result_instance=Query.Result(), action_type=Query) Bases: :py:obj:`rclpy.node.Node` ROS action server for handling perception queries. Action server that listens for queries and executes them by checking blackboard for results generated by QueryAnnotator and QueryReply. :ivar _action_name: Name of ROS action :type _action_name: str :ivar _as: Action server instance :type _as: actionlib.SimpleActionServer :ivar new_query: Latest received query :type new_query: robokudo_msgs.msg.QueryGoal :ivar query: Currently processing query :type query: robokudo_msgs.msg.QueryGoal .. py:attribute:: _action_name .. py:attribute:: _as .. py:attribute:: feedback_instance .. py:attribute:: result_instance .. py:attribute:: new_query :value: None .. py:attribute:: query :value: None .. py:attribute:: query_processed_event .. py:attribute:: logger :value: None .. py:method:: reset_bookkeeping_vars() Reset internal state variables. Clears query state and blackboard variables. :return: None .. py:method:: goal_cb(goal_request) .. py:method:: cancel_cb(goal_handle) .. py:method:: start_processing() Start processing new query. Tell the ActionServer that we are now starting the execution and it can start the monitoring/response process. :return: None .. py:method:: is_active() Check if query is being processed. :return: True if query active, False otherwise :rtype: bool .. py:method:: execute_cb(goal_handle) :async: Action server execution callback. Handles: * Query reception and validation * Processing status monitoring * Preemption requests * Error handling * Result generation and sending :param goal_handle: Query goal from client :type goal_handle: :return: None