Facilities in cube:its
cube:its provides a complete ROS-native implementation of ETSI C-ITS facilities on top of Vanetza. This means your ROS 2 application can send and receive CAM, DENM, CPM, VAM, or low-level BTP packets simply by publishing/subscribing to topics — without ever touching ASN.1, BTP, GeoNetworking, or any low-level radio APIs.
Each facility is exposed through a clean set of ROS 2 topics and services using message definitions from:
- vanetza_msgs
- etsi_its_cam_msgs, etsi_its_denm_msgs, etsi_its_cpm_ts_msgs, etsi_its_vam_ts_msgs
- cube_den_msgs (for the DEN service)
The following sections show simple, minimal examples you can use as templates.
Running the Example Nodes
All code snippets in this chapter are a modifed version taken from the v2x_apps ROS 2 package in our example repository. To run any of these examples on your development machine, you need to:
- Build and source the workspace (in your dev container or host):
host:~$ cd ~/cube-its/dev_wshost:~/cube-its/dev_ws$ colcon buildhost:~/cube-its/dev_ws$ source install/setup.bash- Make sure cube:its is running on the cube (and reachable via DDS), e.g.:
cube:~$ start-its --remote- Start an example node using
ros2 run:
host:~/cube-its/dev_ws$ ros2 run v2x_apps <python_node>Replace <python_node> with the actual node name from the examples, for instance:
host:~/cube-its/dev_ws$ ros2 run v2x_apps btp_listenerhost:~/cube-its/dev_ws$ ros2 run v2x_apps btp_senderhost:~/cube-its/dev_ws$ ros2 run v2x_apps ...Once the node is running, you should see log output in the terminal (e.g. received CAMs, DENMs, CPMs, etc.), and the corresponding /its/... or /vanetza/... topics will appear in ros2 topic list.
BTP – Basic Transport Protocol
BTP is the transport layer beneath CAM, DENM, CPM, VAM and all ETSI ITS facilities. cube:its exposes BTP directly so you can implement custom ETSI facilities or proprietary message types.
Listen for BTP Packets
Topic: /vanetza/btp_indication
Message: vanetza_msgs/msg/BtpDataIndication
import rclpyfrom rclpy.node import Nodefrom vanetza_msgs.msg import BtpDataIndication
class BtpListener(Node): def __init__(self): super().__init__("btp_listener") self.subscription = self.create_subscription( BtpDataIndication, "/vanetza/btp_indication", self.listener_callback, 10, )
def listener_callback(self, msg): self.get_logger().info( f"Received {len(msg.data)} bytes on BTP port {msg.destination_port}" )
def main(args=None): rclpy.init(args=args) rclpy.spin(BtpListener()) rclpy.shutdown()Send a BTP Packet
Service: /vanetza/btp_request
Service Type: vanetza_msgs/srv/BtpData
import rclpyfrom rclpy.node import Nodefrom vanetza_msgs.msg import TrafficClassfrom vanetza_msgs.srv import BtpData
class BtpSender(Node): def __init__(self): super().__init__("btp_sender") self.client = self.create_client(BtpData, "/vanetza/btp_request") self.create_timer(1.0, self.trigger)
def trigger(self): req = BtpData.Request() req.btp_type = BtpData.Request.BTP_TYPE_NON_INTERACTIVE req.transport_type = BtpData.Request.TRANSPORT_TYPE_SHB req.destination_port = 42 req.traffic_class.id = TrafficClass.TC_OTHER req.data = b"Your payload"
future = self.client.call_async(req) future.add_done_callback(self.done)
def done(self, future): result = future.result() status = "accepted" if result.confirm == BtpData.Response.CONFIRM_ACCEPTED else "failed" self.get_logger().info(f"BTP request {status}")
def main(args=None): rclpy.init(args=args) rclpy.spin(BtpSender()) rclpy.shutdown()CAM – Cooperative Awareness Messages
CAMs represent continuous situational awareness broadcasts that describe a station’s dynamic state (position, speed, heading, vehicle type, etc.). cube:its automatically handles the full CAM pipeline for you:
- Generates CAMs based on GNSS/IMU and motion data
- Encodes CAMs using ASN.1 UPER
Custom CAM Dissemination Rules (Plugins)
cube:its includes a modular Cooperative Awareness (CA) service, which you can extend through custom dissemination rule plugins. This allows developers to implement their own CAM triggering logic — beyond the standard ETSI rules — while still leveraging the full Vanetza + cube:its stack.
Typical use cases:
- Researching new CAM adaptation strategies
- Reducing channel load in dense traffic
- Domain-specific trigger rules for VRUs, robots, or special vehicles
We provide a reference plugin implementing ETSI-conformant CAM rate adaptation, available here:
👉 https://github.com/cubesys-GmbH/ros_cube_ca_plugins
Developers can use the reference implementation as a template and create custom logic with minimal boilerplate.
Receive CAMs
Topic: /its/cam_received
Message: etsi_its_cam_msgs/msg/CAM
import rclpyfrom rclpy.node import Nodefrom etsi_its_cam_msgs.msg import CAM
class CamListener(Node): def __init__(self): super().__init__("cam_listener") self.subscription = self.create_subscription( CAM, "/its/cam_received", self.callback, 10 )
def callback(self, msg): self.get_logger().info(f"Received CAM from StationId={msg.header.station_id.value}")
def main(args=None): rclpy.init(args=args) rclpy.spin(CamListener()) rclpy.shutdown()CPM – Collective Perception Messages
CPM enables sharing of detected objects between cooperative participants (vehicles, RSUs, robots). cube:its fully handles ASN.1 encoding, headers, and GeoNetworking.
You only need to publish CPM objects on ROS topics.
Provide CPMs
Publisher: /its/cpm_provided
Message: etsi_its_cpm_ts_msgs/msg/CollectivePerceptionMessage
Helper: /its/position_vector for ego GNSS
Example: Publish one perceived object every second.
import rclpyimport etsi_its_cpm_ts_msgs.msg as cpm_msgfrom rclpy.node import Nodefrom vanetza_msgs.msg import PositionVector
class CpmProvider(Node): def __init__(self): super().__init__("cpm_provider") self.position_vector = None self.create_subscription(PositionVector, "/its/position_vector", self.position_update, 1)
self.pub = self.create_publisher(cpm_msg.CollectivePerceptionMessage, "/its/cpm_provided", 1) self.create_timer(1.0, self.publish)
def position_update(self, msg): self.position_vector = msg
def generate_cpm(self): # minimal example with one object cpm = cpm_msg.CollectivePerceptionMessage() cpm.payload.management_container.reference_position.latitude.value = int(self.position_vector.latitude * 1e7) cpm.payload.management_container.reference_position.longitude.value = int(self.position_vector.longitude * 1e7) return cpm
def publish(self): if self.position_vector: self.pub.publish(self.generate_cpm()) self.get_logger().info("CPM provided")
def main(args=None): rclpy.init(args=args) rclpy.spin(CpmProvider()) rclpy.shutdown()DENM – Decentralized Environmental Notification Messages
DENMs are event-driven hazard/warning messages (accidents, road works, obstacles). cube:its provides a dedicated DEN service (/its/den_request) that:
- fills ITS PDU headers automatically
- assigns Action IDs
- handles updates & terminations
- performs all ASN.1 encoding
You only provide the DENM payload.
Send & Receive DENMs
Receive Topic: /its/denm_received
Transmit Service: /its/den_request
Service Type: cube_den_msgs/srv/Transmission
import rclpyimport etsi_its_denm_msgs.msg as denm_msgfrom cube_den_msgs.srv import Transmissionfrom vanetza_msgs.msg import PositionVector, GeoNetAreafrom rclpy.node import Node
class DenmNode(Node): def __init__(self): super().__init__("denm_node") self.position_vector = None self.create_subscription(PositionVector, "/its/position_vector", self.position_update, 1) self.create_subscription(denm_msg.DENM, "/its/denm_received", self.receive_callback, 1) self.client = self.create_client(Transmission, "/its/den_request") self.create_timer(1.0, self.transmit)
def receive_callback(self, msg): self.get_logger().info(f"Received DENM from StationId={msg.header.station_id.value}")
def position_update(self, msg): self.position_vector = msg
def transmit(self): if not self.position_vector: return
req = Transmission.Request() req.request = Transmission.Request.REQUEST_TRIGGER req.message = denm_msg.DENM() req.destination_area.type = GeoNetArea.TYPE_CIRCLE req.destination_area.latitude = self.position_vector.latitude req.destination_area.longitude = self.position_vector.longitude req.destination_area.distance_a = 500.0
future = self.client.call_async(req) future.add_done_callback(lambda f: self.get_logger().info("DENM sent"))
def main(args=None): rclpy.init(args=args) rclpy.spin(DenmNode()) rclpy.shutdown()VAM – Vulnerable Road User Awareness
VAMs are specialized messages for pedestrians, cyclists, e-scooters, etc. cube:its exposes a simple topic where VAMs can be provided, and automatically transmits them per ETSI rules.
Provide VAMs
Publisher: /its/vam_provided
Message: etsi_its_vam_ts_msgs/msg/VAM
import rclpyimport etsi_its_vam_ts_msgs.msg as vam_msgfrom rclpy.node import Nodefrom vanetza_msgs.msg import PositionVector
class VamProvider(Node): def __init__(self): super().__init__("vam_provider") self.pos = None self.create_subscription(PositionVector, "/its/position_vector", self.update_pos, 1)
self.pub = self.create_publisher(vam_msg.VAM, "/its/vam_provided", 1) self.create_timer(1.0, self.publish)
def update_pos(self, msg): self.pos = msg
def publish(self): if not self.pos: return vam = vam_msg.VAM() vam.vam.vam_parameters.basic_container.station_type.value = vam_msg.TrafficParticipantType.PEDESTRIAN vam.vam.vam_parameters.basic_container.reference_position.latitude.value = int(self.pos.latitude * 1e7) vam.vam.vam_parameters.basic_container.reference_position.longitude.value = int(self.pos.longitude * 1e7) self.pub.publish(vam) self.get_logger().info("VAM provided")
def main(args=None): rclpy.init(args=args) rclpy.spin(VamProvider()) rclpy.shutdown()
