robokudo.cas¶
Common Analysis Structure (CAS) for RoboKudo.
This module provides the core data structure used throughout RoboKudo for storing and managing data during pipeline execution. The CAS (Common Analysis Structure) holds sensor data, annotations, and other information that is shared between annotators.
The module provides:
Standard view definitions for common data types
Methods for storing and retrieving data
Support for annotations and filtering
Timestamp management
Classes¶
Module Contents¶
- class robokudo.cas.CASViews¶
-
Standard view definitions for the Common Analysis Structure.
This class defines the standard keys used to store and access different types of data in the CAS. These keys ensure consistent access to common data types like images, point clouds, and camera information.
- COLOR_IMAGE: str = 'color_image'¶
-
RGB image data
- DEPTH_IMAGE: str = 'depth_image'¶
-
Depth image data
- COLOR2DEPTH_RATIO: str = 'color2depth_ratio'¶
-
Scale factors, in (x, y) order, by which the COLOR_IMAGE must be scaled to match the resolution of the DEPTH_IMAGE.
Example: 1280x960 RGB, 640x480 DEPTH -> 0.5 along X and Y
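The ratio in the example above follows directly from the two resolutions. A minimal sketch of that arithmetic; the helper name `color2depth_ratio` and its (width, height) tuple arguments are assumptions made for illustration and are not part of RoboKudo:

```python
from typing import Tuple


def color2depth_ratio(color_size: Tuple[int, int],
                      depth_size: Tuple[int, int]) -> Tuple[float, float]:
    """Return the (x, y) factors that scale the color resolution to the depth resolution.

    Hypothetical helper; both sizes are given as (width, height).
    """
    color_w, color_h = color_size
    depth_w, depth_h = depth_size
    return depth_w / color_w, depth_h / color_h


# 1280x960 RGB and 640x480 depth give a factor of 0.5 along both axes.
ratio = color2depth_ratio((1280, 960), (640, 480))
print(ratio)
```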
- CAM_INFO: str = 'cam_info'¶
-
ROS camera info message (sensor_msgs.msg.CameraInfo).
- CAM_INTRINSIC: str = 'cam_intrinsic'¶
-
Open3D pinhole camera intrinsic model for RGB to be set by the camera driver.
- PC_CAM_INTRINSIC: str = 'pc_cam_intrinsic'¶
-
Camera intrinsic that has been used for point cloud generation. It can differ from CAM_INTRINSIC because the depth and RGB resolutions may not match.
- CLOUD: str = 'cloud'¶
-
Point cloud data
- QUERY: str = 'query'¶
-
Query information
- VIEWPOINT_CAM_TO_WORLD: str = 'viewpoint_cam_to_world'¶
-
DEPRECATED: Use CAM_TO_WORLD_TRANSFORM instead. Camera to world transform. Type: robokudo.types.tf.StampedTransform
- CAM_TO_WORLD_TRANSFORM: str = 'cam_to_world_transform'¶
-
Camera to world transform. Type: semantic_digital_twin.spatial_types.spatial_types.HomogeneousTransformationMatrix
- DATA_TIMESTAMP: str = 'data_timestamp'¶
-
Nanoseconds since epoch at which the sensor data has been received. Type: int
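An integer nanosecond timestamp keeps full precision, whereas a 64-bit float cannot represent present-day nanosecond epoch values exactly. The sketch below shows one way to produce such a value and to convert it to a readable UTC time using only the standard library; the helper name `ns_to_utc` is an assumption and not a RoboKudo function:

```python
import time
from datetime import datetime, timezone


def ns_to_utc(timestamp_ns: int) -> datetime:
    """Convert integer nanoseconds since the epoch to a timezone-aware UTC datetime.

    Hypothetical helper; datetime only resolves microseconds, so the last
    three digits of the nanosecond value are dropped.
    """
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanos // 1000
    )


# time.time_ns() returns an int, so no float rounding is involved.
now_ns = time.time_ns()
print(ns_to_utc(now_ns).isoformat())
```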
- CAS_ID: str = 'cas_id'¶
-
Monotonic ID of the CAS instance within a single pipeline run. Type: int
- OBJECT_IMAGE: str = 'object_image'¶
-
Object image data. This view is used in imagistic reasoning pipelines where a rendered scene can be fully segmented per object.
- OBJECT_COLOR_MAP: str = 'object_color_map'¶
-
Object color mapping data which assigns objects visible in OBJECT_IMAGE to entity names.
- class robokudo.cas.CAS¶
-
The main data representation in RoboKudo.
This class provides the central data structure used by annotators to store and retrieve information. Each pipeline has its own CAS instance that maintains views (singular data like sensor readings) and annotations (multiple descriptors of the same data).
Views can be added at runtime, so different annotators can extend the CAS flexibly. Some views, however, are prevalent in most pipelines and can be accessed directly via @property.
- timestamp: float¶
-
Time at which this CAS was created, in nanoseconds since the Unix epoch.
- timestamp_readable: str¶
-
Human readable timestamp string.
- views: typing_extensions.Dict[str, typing_extensions.Any]¶
-
Dictionary storing view data. Each view holds data that typically occurs once per CAS.
Example: sensor data, camera info, and the point cloud read from the sensors.
- annotations: typing_extensions.List[robokudo.types.core.Annotation] = []¶
-
List of annotations. Each annotation describes a certain part of the acquired sensor data. In contrast to views, there can be multiple annotations for the same ‘thing’ in the data.
- __post_init__() None¶
- property color_image: typing_extensions.Optional[numpy.ndarray]¶
- property depth_image: typing_extensions.Optional[numpy.ndarray]¶
- property color2depth_ratio: typing_extensions.Optional[typing_extensions.Tuple[float, float]]¶
- property cam_info: typing_extensions.Optional[sensor_msgs.msg.CameraInfo]¶
- property cam_intrinsic: typing_extensions.Optional[open3d.camera.PinholeCameraIntrinsic]¶
- property pc_cam_intrinsic: typing_extensions.Optional[open3d.camera.PinholeCameraIntrinsic]¶
- property cloud: typing_extensions.Optional[open3d.geometry.PointCloud]¶
- property viewpoint_cam_to_world: typing_extensions.Optional[robokudo.types.tf.StampedTransform]¶
- property cam_to_world_transform: typing_extensions.Optional[semantic_digital_twin.spatial_types.spatial_types.HomogeneousTransformationMatrix]¶
- property data_timestamp: typing_extensions.Optional[int]¶
- property query: typing_extensions.Optional[typing_extensions.Any]¶
- property cas_id: typing_extensions.Optional[int]¶
- get(view_name: str) typing_extensions.Any¶
-
Get a view by name.
- Parameters:
-
view_name – Name of the view to retrieve
- Returns:
-
The view data
- Raises:
-
KeyError – If the view does not exist
- contains(view_name: str) bool¶
-
Check if a view exists.
- Parameters:
-
view_name – Name of the view to check
- Returns:
-
True if the view exists
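Because get() raises KeyError for a missing view, optional views are usually guarded with contains(). The sketch below illustrates that pattern with a small stand-in class; `MiniCAS` is hypothetical and only mirrors the behavior documented above, it is not the RoboKudo implementation:

```python
from typing import Any, Dict


class MiniCAS:
    """Hypothetical stand-in mirroring the documented get()/contains() behavior."""

    def __init__(self) -> None:
        self.views: Dict[str, Any] = {}

    def get(self, view_name: str) -> Any:
        # A plain dict lookup raises KeyError for a missing view.
        return self.views[view_name]

    def contains(self, view_name: str) -> bool:
        return view_name in self.views


cas = MiniCAS()
cas.views["color_image"] = "fake image"  # placeholder payload for the sketch

# Guard an optional view with contains() instead of catching KeyError.
cloud = cas.get("cloud") if cas.contains("cloud") else None
image = cas.get("color_image")
```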
- get_copy(view_name: str) typing_extensions.Any¶
-
Get a deep copy of a view.
- Parameters:
-
view_name – Name of the view to copy
- Returns:
-
Deep copy of the view data
- Raises:
-
KeyError – If the view does not exist
- set(view_name: str, value: typing_extensions.Any) None¶
-
Store data in the CAS under the given view name. This method stores a deep copy of value.
- Parameters:
-
view_name – The name of the view, which should be one of the constants in the CASViews class.
value – The value that will be placed in the CAS under view_name as a deep copy.
- set_ref(view_name: str, value: typing_extensions.Any) None¶
-
Store data in the CAS under the given view name. In contrast to set(), this does not copy value; it only assigns it.
- Parameters:
-
view_name – The name of the view, which should be one of the constants in the CASViews class.
value – The value that will be placed in the CAS under view_name by assigning it.
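The practical difference between set() and set_ref(), and likewise between get() and get_copy(), only shows up once the stored object is mutated. The sketch below demonstrates this with a hypothetical `MiniCAS` stand-in that follows the documented copy semantics; it is an illustration under that assumption, not the RoboKudo source:

```python
import copy
from typing import Any, Dict


class MiniCAS:
    """Hypothetical stand-in mirroring the documented copy semantics."""

    def __init__(self) -> None:
        self.views: Dict[str, Any] = {}

    def set(self, view_name: str, value: Any) -> None:
        self.views[view_name] = copy.deepcopy(value)  # stores an independent copy

    def set_ref(self, view_name: str, value: Any) -> None:
        self.views[view_name] = value  # stores the caller's object itself

    def get(self, view_name: str) -> Any:
        return self.views[view_name]

    def get_copy(self, view_name: str) -> Any:
        return copy.deepcopy(self.views[view_name])


cas = MiniCAS()
points = [[0.0, 0.0, 1.0]]

cas.set("copied", points)
cas.set_ref("shared", points)
points.append([1.0, 1.0, 1.0])  # mutate the caller's list after storing it

copied_len = len(cas.get("copied"))  # unaffected by the later mutation
shared_len = len(cas.get("shared"))  # sees the later mutation

snapshot = cas.get_copy("shared")
snapshot.clear()  # changes only the copy, not the stored view
```

As a rule of thumb, set_ref() avoids the cost of copying large data such as images or point clouds, at the price that later changes by the caller are visible to every annotator reading the view.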
- T¶
- static filter_by_type(type_to_include: typing_extensions.Type[T], input_list: typing_extensions.List[typing_extensions.Any]) typing_extensions.List[T]¶
-
Filter a list to include only objects of a specific type.
- Parameters:
-
type_to_include – Type to filter for
input_list – List to filter
- Returns:
-
Filtered list containing only objects of the specified type
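A type filter of this kind can be sketched in a few lines. The version below assumes `isinstance` semantics, so instances of subclasses also match; the classes `Annotation`, `Classification`, and `Shape` are hypothetical placeholders rather than RoboKudo types:

```python
from typing import Any, List, Type, TypeVar

T = TypeVar("T")


class Annotation:
    """Hypothetical base class standing in for an annotation type."""


class Classification(Annotation):
    """Hypothetical annotation subtype."""


class Shape(Annotation):
    """Hypothetical annotation subtype."""


def filter_by_type(type_to_include: Type[T], input_list: List[Any]) -> List[T]:
    """Keep only the items that are instances of type_to_include (assumed isinstance check)."""
    return [item for item in input_list if isinstance(item, type_to_include)]


items = [Classification(), Shape(), Classification(), "not an annotation"]
classifications = filter_by_type(Classification, items)
all_annotations = filter_by_type(Annotation, items)  # subclasses match the base type
```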
- filter_annotations_by_type(type_to_include: typing_extensions.Type[T]) typing_extensions.List[T]¶
-
Filter annotations to include only those of a specific type.
- Parameters:
-
type_to_include – Type to filter for
- Returns:
-
A filtered list of annotations in CAS
- static _filter_objects(objects: typing_extensions.List[typing_extensions.Any], criteria: typing_extensions.Dict[str, typing_extensions.Tuple[str, typing_extensions.Any]]) typing_extensions.List[typing_extensions.Any]¶
-
Filters a list of objects based on specified criteria.
- Parameters:
-
objects – List of objects to be filtered.
criteria – A dictionary where keys are attribute names and values are tuples containing the comparison operator as a string (“==”, “>”, “<”, “>=”, “<=”) and the value to compare against.
- Returns:
-
Filtered list of objects matching all criteria
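The criteria format described above maps naturally onto the standard `operator` module. The sketch below is one possible way to evaluate such a dictionary, not the RoboKudo implementation; the `Detection` class, its fields, and the function name `filter_objects` are hypothetical, and every object is assumed to have all attributes named in the criteria:

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple

# Map each supported operator string to its comparison function.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}


def filter_objects(objects: List[Any],
                   criteria: Dict[str, Tuple[str, Any]]) -> List[Any]:
    """Return the objects whose attributes satisfy every (operator, value) criterion."""
    return [
        obj for obj in objects
        if all(OPERATORS[op](getattr(obj, attr), value)
               for attr, (op, value) in criteria.items())
    ]


@dataclass
class Detection:
    """Hypothetical object type used only for this example."""
    label: str
    confidence: float


detections = [Detection("cup", 0.9), Detection("cup", 0.4), Detection("bowl", 0.95)]
confident_cups = filter_objects(
    detections,
    {"label": ("==", "cup"), "confidence": (">=", 0.8)},
)
```

Since the criteria are keyed by attribute name, each attribute can carry only one comparison per call; a range check such as a lower and an upper bound on the same attribute needs two passes.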
- filter_by_type_and_criteria(type_to_include: typing_extensions.Type, input_list: typing_extensions.List[typing_extensions.Any], criteria: typing_extensions.Dict[str, typing_extensions.Tuple[str, typing_extensions.Any]]) typing_extensions.List[typing_extensions.Any]¶
-
Filters a list of objects by type and by the specified criteria. Returned objects must be of type ‘type_to_include’.
- Parameters:
-
type_to_include – All the returned objects must be of this type.
input_list – List of objects to be filtered.
criteria – A dictionary where keys are attribute names and values are tuples containing the comparison operator as a string (“==”, “>”, “<”, “>=”, “<=”) and the value to compare against.
- Returns:
-
A list of objects of type ‘type_to_include’ that match all specified attribute values.