rp_xcore.xcore_module_base module

class rp_xcore.xcore_module_base.ConnectionType(value)

Bases: Enum

Connection type defines data exchange policies between inputs and outputs.

  • DEFAULT: Default policy, first choice when no special requirements are defined.

  • LATCHING: Latches the last message, suggested for:
    • Outputs that are not updated frequently, but last message is expected to persist.

    • Inputs that are late joining (created after the message is published).

DEFAULT = 'default'
LATCHING = 'latching'
SENSOR_DATA = 'sensor_data'
class rp_xcore.xcore_module_base.IOConfig(msg_type, connection_type: ConnectionType = ConnectionType.DEFAULT)

Bases: object

Inputs and outputs configuration for a module.

This class encapsulates the configuration for inputs and outputs, including the message and connection types.

Parameters

msg_type :

Message type for the input/output.

connection_typeConnectionType, optional

Connection type for the input/output. Default is ConnectionType.DEFAULT.

property qos_profile: QoSProfile

Retrieve the QoS profile based on the connection type.

Returns

QoSProfile

The QoS profile corresponding to the connection type.

class rp_xcore.xcore_module_base.SyncType(value)

Bases: Enum

An enumeration.

AND = 'AND'
OR = 'OR'
class rp_xcore.xcore_module_base.XCoreModuleBase(node_name: str, node_params, sync_type: SyncType = SyncType.AND)

Bases: Node, ABC

class State

Bases: object

FAILURE = 'FAILURE'
INITIALIZING = 'INIT'
PENDING = 'IDLE'
PROCESSING = 'PROCESSING'
TERMINATED = 'TERMINATED'
on_input_var(input_var_name, callback)

Registers a callback for a non-blocking input variable.

abstract process_data(input_msgs)
publish_and_wait_for_ack(topic_name, msg, timeout: float = 0.01) None

Publishes a message and waits for acknowledgment.

Parameters

topic_namestr

The name of the topic to publish to.

msg :

The message to publish.

timeoutfloat, optional

The time to wait for acknowledgment in seconds. Default is 0.01.

Raises

ValueError

If the publisher for the specified topic is not found.

TimeoutError

If the acknowledgment is not received within the specified timeout.

publish_status()
set_state(new_state: NodeState)
setup_dependencies()

Sets up synchronized or asynchronous subscriptions based on ‘sync_type’.

setup_inputs_vars_subscribers_()

Initializes inputs_vars_subscribers_ with message_filters.Subscriber objects.

setup_publishers()

Sets up publishers based on the ‘outputs’ definitions.

rp_xcore.xcore_module_base.run_node(node_class, node_name, use_multithread=False, *args, **kwargs)

Run a ROS2 node with exception handling and optional MultiThreadedExecutor support.

This function initializes the ROS2 system, creates an instance of the specified node class, and manages its lifecycle, including spinning and handling shutdown events.

Parameters

node_classtype

The class of the node to instantiate and run.

node_namestr

The name of the node.

use_multithreadbool

If True, uses MultiThreadedExecutor; otherwise, uses SingleThreadedExecutor.

*argstuple

Additional positional arguments for the node’s constructor.

**kwargsdict

Additional keyword arguments for the node’s constructor.

Notes

  • Supports both SingleThreadedExecutor and MultiThreadedExecutor.

  • Handles KeyboardInterrupt gracefully by logging a warning.

  • Logs any unhandled exceptions with a fatal error message.

  • Ensures the node is destroyed and the ROS2 system is shut down.