robokudo.vis.multiprocessed_o3d_visualizer

Open3D-based visualization for RoboKudo pipelines.

This module provides 3D visualization capabilities for RoboKudo pipelines using Open3D. It handles:

  • 3D geometry visualization

  • Point cloud rendering

  • Camera control

  • Coordinate frame display

  • Window management

Classes

O3DVisualizer

Open3D-based visualizer for 3D geometry data.

MemoryMap

A base memory map for shared memory.

ObjectMemoryMap

A memory map for an object in shared memory.

ArrayMemoryMap

A memory map for a numpy array in shared memory.

Geometry3DMemoryMap

A memory map for a geometry in shared memory.

PointCloudMemoryMap

A memory map for a geometry in shared memory.

LineSetMemoryMap

A memory map for a geometry in shared memory.

MeshBaseMemoryMap

A memory map for a geometry in shared memory.

TriangleMeshMemoryMap

A memory map for a geometry in shared memory.

OrientedBoundingBoxMemoryMap

A memory map for a geometry in shared memory.

AxisAlignedBoundingBoxMemoryMap

A memory map for a geometry in shared memory.

TetraMeshMemoryMap

A memory map for a geometry in shared memory.

HalfEdgeMemoryMap

A memory map for an object in shared memory.

HalfEdgeTriangleMeshMemoryMap

A memory map for a geometry in shared memory.

VoxelGrid3DMemoryMap

A memory map for a geometry in shared memory.

Octree3DMemoryMap

A memory map for a geometry in shared memory.

ObjectMemoryMapFactory

A factory class for creating geometry memory maps from open3d geometry objects.

Geometry3DMemoryMapFactory

A factory class for creating geometry memory maps from open3d geometry3d objects.

MemoryMapTransport

A message containing geometry data for visualization.

SharedMemoryManager

A manager for geometries in shared memory.

MultiprocessedViewer3DClient

MultiprocessedViewer3D

A wrapper class for the Viewer3D class to run it in a separate process.

Module Contents

class robokudo.vis.multiprocessed_o3d_visualizer.O3DVisualizer(*args: typing_extensions.Any, **kwargs: typing_extensions.Any)

Bases: robokudo.vis.visualizer.Visualizer, robokudo.vis.visualizer.Visualizer.Observer

Open3D-based visualizer for 3D geometry data.

This class provides visualization of 3D geometry data from pipeline annotators using Open3D windows. It supports:

  • 3D geometry visualization

  • Point cloud rendering

  • Camera control

  • Coordinate frame display

  • Shared visualization state

Note

This Visualizer works with a shared state and needs notifications

viewer3d: typing_extensions.Optional[MultiprocessedViewer3D] = None

Open3D viewer instance

notify(observable: robokudo.vis.visualizer.Visualizer.Observable, *args: typing_extensions.Any, **kwargs: typing_extensions.Any) None

Handle notification of state changes.

Parameters:

observable – The object that sent the notification

tick() None

Update the visualization display.

This method:

  • Initializes viewer if needed

  • Gets current annotator outputs

  • Updates display if needed

  • Handles viewer lifecycle

Returns:

False if visualization should terminate, True otherwise

window_title() str

Get the window title for this visualizer.

class robokudo.vis.multiprocessed_o3d_visualizer.MemoryMap

Bases: object

A base memory map for shared memory.

byte_size: int

Size of the underlying data in bytes.

class robokudo.vis.multiprocessed_o3d_visualizer.ObjectMemoryMap

Bases: MemoryMap

A memory map for an object in shared memory.

classmethod from_object(obj: typing_extensions.Any) ObjectMemoryMap

Create a new memory map for the given object.

write_object(write_buf: memoryview, write_idx: int, obj: typing_extensions.Any) int

Write the given object to the shared memory using the memory map.

Parameters:
  • write_buf – The memoryview to write to.

  • write_idx – The index to start writing at.

  • obj – The object to write.

Returns:

The new write index.

read_object(read_buf: memoryview, read_idx: int) typing_extensions.Tuple[typing_extensions.Any, int]

Read an object from the shared memory using the memory map.

Parameters:
  • read_buf – The memoryview to read from.

  • read_idx – The index to start reading at.

Returns:

The object and the new read index.

class robokudo.vis.multiprocessed_o3d_visualizer.ArrayMemoryMap

Bases: MemoryMap

A memory map for a numpy array in shared memory.

shape: typing_extensions.Tuple

Shape of the underlying array.

dtype: str

Datatype of the underlying data as a string.

classmethod from_numpy_array(array: numpy.typing.NDArray) ArrayMemoryMap

Create a new memory map for the given numpy array.

class robokudo.vis.multiprocessed_o3d_visualizer.Geometry3DMemoryMap

Bases: MemoryMap

A memory map for a geometry in shared memory.

name: str

Name of the underlying geometry.

type: typing_extensions.Type
material: typing_extensions.Optional[open3d.visualization.rendering.MaterialRecord] = None
group: typing_extensions.Optional[str] = None
time: typing_extensions.Optional[float] = None
is_visible: typing_extensions.Optional[bool] = None
mapped_attributes = []

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

classmethod from_geometry(name: str, geometry: open3d.geometry.Geometry3D, material: typing_extensions.Optional[open3d.visualization.rendering.MaterialRecord] = None, group: typing_extensions.Optional[str] = None, time: typing_extensions.Optional[float] = None, is_visible: typing_extensions.Optional[bool] = None) Geometry3DMemoryMap

Create a new memory memory map for the given geometry.

classmethod from_geometry_dict(geometry: typing_extensions.Dict) Geometry3DMemoryMap

Create a new memory map from a geometry dictionary.

as_geometry_dict(shm: multiprocessing.shared_memory.SharedMemory, read_idx: int) typing_extensions.Tuple[typing_extensions.Dict, int]

Create an open3d geometry dict from the memory map.

_write_attribute(write_buf: memoryview, write_idx: int, attribute_map: typing_extensions.Any, geometry_attribute: typing_extensions.Any) int
write_geometry(shm: multiprocessing.shared_memory.SharedMemory, write_idx: int, geometry: open3d.geometry.Geometry3D) int

Write the given geometry to the shared memory using the memory map.

to_geometry(shm: multiprocessing.shared_memory.SharedMemory, read_idx: int) typing_extensions.Tuple[open3d.geometry.PointCloud, int]

Read the geometry from the shared memory using the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.PointCloudMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

points: ArrayMemoryMap

Memory map of the point clouds points.

normals: ArrayMemoryMap

Memory map of the point clouds point normals.

colors: ArrayMemoryMap

Memory map of the point clouds point colors.

covariances: ArrayMemoryMap

Memory map of the point clouds point covariances.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.LineSetMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

colors: ArrayMemoryMap

Memory map of the line set colors.

lines: ArrayMemoryMap

Memory map of the line set lines.

points: ArrayMemoryMap

Memory map of the line set points.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.MeshBaseMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

vertices: ArrayMemoryMap

Memory map of the mesh vertices.

vertex_normals: ArrayMemoryMap

Memory map of the vertex normals.

vertex_colors: ArrayMemoryMap

Memory map of the vertex colors.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.TriangleMeshMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

vertices: ArrayMemoryMap

Memory map of the mesh vertices.

vertex_normals: ArrayMemoryMap

Memory map of the vertex normals.

vertex_colors: ArrayMemoryMap

Memory map of the vertex colors.

triangles: ArrayMemoryMap

Memory map of the mesh triangles.

triangle_normals: ArrayMemoryMap

Memory map of the mesh triangle normals.

triangle_uvs: ArrayMemoryMap

Memory map of the mesh triangle uvs.

triangle_material_ids: ArrayMemoryMap

Memory map of the mesh triangle material ids.

textures: typing_extensions.List[ArrayMemoryMap]

Memory map of the mesh textures.

adjacency_list: typing_extensions.List[ArrayMemoryMap]

Memory map of the mesh adjacency list.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.OrientedBoundingBoxMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

center: ArrayMemoryMap

Memory map of the oriented bounding box center.

color: ArrayMemoryMap

Memory map of the oriented bounding box color.

extent: ArrayMemoryMap

Memory map of the oriented bounding box extent.

R: ArrayMemoryMap

Memory map of the oriented bounding box extent.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.AxisAlignedBoundingBoxMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

color: ArrayMemoryMap

Memory map of the axis aligned bounding box color.

max_bound: ArrayMemoryMap

Memory map of the axis aligned bounding box maximum bound.

min_bound: ArrayMemoryMap

Memory map of the axis aligned bounding box maximum bound.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.TetraMeshMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

tetras: ArrayMemoryMap

Memory map of the tetra mesh tetras.

vertex_colors: ArrayMemoryMap

Memory map of the tetra mesh vertex colors.

vertex_normals: ArrayMemoryMap

Memory map of the tetra mesh vertex normals.

vertices: ArrayMemoryMap

Memory map of the tetra mesh vertices.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.HalfEdgeMemoryMap

Bases: ObjectMemoryMap

A memory map for an object in shared memory.

data: ArrayMemoryMap

Memory map containing next, triangle_index, twin and vertex_indices.

classmethod from_object(obj: open3d.geometry.HalfEdge) HalfEdgeMemoryMap

Create a new memory map for the given object.

write_object(write_buf: memoryview, write_idx: int, obj: open3d.geometry.HalfEdge) int

Write the given object to the shared memory using the memory map.

Parameters:
  • write_buf – The memoryview to write to.

  • write_idx – The index to start writing at.

  • obj – The object to write.

Returns:

The new write index.

read_object(read_buf: memoryview, read_idx: int) typing_extensions.Tuple[open3d.geometry.HalfEdge, int]

Read an object from the shared memory using the memory map.

Parameters:
  • read_buf – The memoryview to read from.

  • read_idx – The index to start reading at.

Returns:

The object and the new read index.

class robokudo.vis.multiprocessed_o3d_visualizer.HalfEdgeTriangleMeshMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

half_edges: typing_extensions.List[HalfEdgeMemoryMap]

Memory map of the half edge mesh half edges.

ordered_half_edge_from_vertex: typing_extensions.List[ArrayMemoryMap]

Memory map of the half edge mesh ordered half edge from vertex.

triangle_normals: ArrayMemoryMap

Memory map of the half edge mesh triangle normals.

triangles: ArrayMemoryMap

Memory map of the half edge mesh triangles.

vertex_colors: ArrayMemoryMap

Memory map of the half edge mesh vertex colors.

vertex_normals: ArrayMemoryMap

Memory map of the half edge mesh vertex normals.

vertices: ArrayMemoryMap

Memory map of the half edge mesh vertices.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.VoxelGrid3DMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

origin: ArrayMemoryMap

Memory map of the voxel grid origin.

voxel_size: ArrayMemoryMap

Memory map of the voxel grid voxel size.

mapped_attributes

A list of (attribute name, attribute type) for the open3d attributes mapped by the memory map.

class robokudo.vis.multiprocessed_o3d_visualizer.Octree3DMemoryMap

Bases: Geometry3DMemoryMap

A memory map for a geometry in shared memory.

max_depth: int

Maximum depth of the octree.

origin: ArrayMemoryMap

Memory map of the octree origin.

root_node: open3d.geometry.OctreeNode

Memory map of the octree root node.

size: float

Memory map of the octree size.

class robokudo.vis.multiprocessed_o3d_visualizer.ObjectMemoryMapFactory

A factory class for creating geometry memory maps from open3d geometry objects.

proxies: typing_extensions.Dict[typing_extensions.Type, typing_extensions.Type[ObjectMemoryMap]]

Map of open3d geometry types to their corresponding memory map types.

classmethod has_proxy(obj: typing_extensions.Any) bool
classmethod from_object(obj: typing_extensions.Any) ObjectMemoryMap

Create a geometry proxy from a geometry3d object.

class robokudo.vis.multiprocessed_o3d_visualizer.Geometry3DMemoryMapFactory

A factory class for creating geometry memory maps from open3d geometry3d objects.

proxies: typing_extensions.Dict[typing_extensions.Type, typing_extensions.Type[Geometry3DMemoryMap]]

Map of open3d geometry types to their corresponding memory map types.

classmethod has_proxy(obj: typing_extensions.Any) bool
classmethod from_geometry(name: str, geometry: open3d.geometry.Geometry3D) Geometry3DMemoryMap

Create a geometry proxy from a geometry3d object.

classmethod from_geometry_dict(geometry: typing_extensions.Dict) Geometry3DMemoryMap

Create a geometry proxy from a geometry3d object.

class robokudo.vis.multiprocessed_o3d_visualizer.MemoryMapTransport

Bases: object

A message containing geometry data for visualization.

shm_name: str

The shared memory to read from.

memory_maps: typing_extensions.List[Geometry3DMemoryMap] = []

The memory mappings for the geometries.

class robokudo.vis.multiprocessed_o3d_visualizer.SharedMemoryManager

Bases: object

A manager for geometries in shared memory.

memory_maps: typing_extensions.List[Geometry3DMemoryMap] = []

A list of all memory maps managed by this object.

write_cursor: int = 0

The current end byte index of the shared memory (sum of all memory map sizes).

read_cursor: int = 0

The current start byte index of the shared memory (sum of all memory map sizes).

append(memory_map: Geometry3DMemoryMap) int

Add a memory map to the shared memory manager.

Parameters:

memory_map – MemoryMap to add to the shared memory manager.

Returns:

The byte index to start writing to for the appended memory map

extend(memory_maps: typing_extensions.List[Geometry3DMemoryMap]) typing_extensions.List[int]

Add a list of memory maps to the shared memory manager.

Returns:

The byte indices to start writing to for each of the appended memory maps

read() typing_extensions.Iterator[typing_extensions.Tuple[int, Geometry3DMemoryMap]]

Read all memory maps from the shared memory manager.

reset() None

Reset the shared memory manager to its initial state.

class robokudo.vis.multiprocessed_o3d_visualizer.MultiprocessedViewer3DClient(title: str, cmd_conn: multiprocessing.connection.Connection)

Bases: object

rk_logger: logging.Logger = None

Logger instance

viewer3d

Viewer3D instance for visualization.

cmd_conn

Communication connection for sending and receiving commands from the main process.

name_to_shm: typing_extensions.Dict[str, multiprocessing.shared_memory.SharedMemory]

Mapping of shared memory names to shared memory instances.

name_to_shm_manager: typing_extensions.Dict[str, SharedMemoryManager]

Mapping of shared memory names to shared memory instances.

visualized_geometries: typing_extensions.List[str] = []

List of the names of the currently visualized geometries

geometries_lock: threading.Lock

Lock for synchronizing access to the geometries list.

geometries: typing_extensions.List[typing_extensions.Union[typing_extensions.Dict, open3d.geometry.Geometry3D]] = []

List of the geometries to use for updating the viewer.

receiver_thread

A thread for listening to commands from the main process.

get_shm(shm_name: str) multiprocessing.shared_memory.SharedMemory

Get the shared memory instance for the given shared memory name.

Parameters:

shm_name – The name of the shared memory to get.

get_shm_manager(shm_name: str) SharedMemoryManager

Get the current shared memory manager.

Parameters:

shm_name – The name of the shared memory to get the manager for.

Returns:

The shared memory manager for the given shared memory name.

run() None

Run the visualization client.

close() None

Close the visualization client and clean up resources.

update_geometry() None

Update the geometry in the viewer.

listen()

Listen for commands from the main process and handle them accordingly.

class robokudo.vis.multiprocessed_o3d_visualizer.MultiprocessedViewer3D(title: str, shm_size: int = 5000000000)

Bases: object

A wrapper class for the Viewer3D class to run it in a separate process.

rk_logger: logging.Logger = None

Logger instance

draw_queue: multiprocessing.Queue

Multiprocessing queue for triggering drawing events.

buffer_count = 2

Number of buffers to use for communication.

buffer_write_cursor = 0

Index of the shm to write to.

buffer_read_cursor = 1

Index of the shm to read to.

shms

Shared memory instances for communicating with the viewer process.

shm_names

Names of the shared memory instances.

memory_manager

A manager for underlying data in shared memory.

parent_cmd_conn: multiprocessing.connection.Connection

Pipe connection for sending and receiving commands from the main process.

child_cmd_conn: multiprocessing.connection.Connection

Pipe connection for sending and receiving commands on the visualizer process.

visualizer_process: multiprocessing.Process

A process running a viewer3d instance.

static run_visualizer(title: str, cmd_conn: multiprocessing.connection.Connection) None

Run the viewer3d instance in a separate process.

Parameters:
  • title – Window title for the viewer.

  • cmd_conn – Connection for sending and receiving commands from the main process.

property _read_shm: multiprocessing.shared_memory.SharedMemory

The shared memory that is currently readable.

property _write_shm: multiprocessing.shared_memory.SharedMemory

The shared memory that is currently writeable.

property _read_manager: SharedMemoryManager

The manager for the shared memory that is currently readable.

property _write_manager: SharedMemoryManager

The manager for the shared memory that is currently writeable.

_swap() typing_extensions.Tuple[int, int]

Rotate the read and write buffers.

Returns:

The indices of the new buffers in format (read_buffer, write_buffer)

tick() typing_extensions.Any

Update the viewer display.

Returns:

False if visualization should terminate, True otherwise

update_cloud(geometries: typing_extensions.Optional[typing_extensions.Union[open3d.geometry.Geometry, typing_extensions.Dict, typing_extensions.List]]) None

Update the displayed geometries.

This method updates the Open3D visualizer based on the outputs of the annotators. For the first update, it also sets up the camera and coordinate frame.

Parameters:

geometries – Geometries to display. Can be:

Note

The dict format follows Open3D’s draw() convention. See: https://github.com/isl-org/Open3D/blob/master/examples/python/visualization/draw.py

close() None

Clean up shared memory and process resources.