Knowledge Base

Everything you need to know and understand to develop V2X applications.

denm-generator.py
msg = self.generate_denm()
future = self.send_request(msg)
future.add_done_callback(self.request_completed)

Facilities in cube:its

cube:its provides a complete ROS-native implementation of ETSI C-ITS facilities on top of Vanetza. This means your ROS 2 application can send and receive CAM, DENM, CPM, VAM, or low-level BTP packets simply by publishing/subscribing to topics — without ever touching ASN.1, BTP, GeoNetworking, or any low-level radio APIs.

Each facility is exposed through a clean set of ROS 2 topics and services using message definitions from:

  • vanetza_msgs
  • etsi_its_cam_msgs, etsi_its_denm_msgs, etsi_its_cpm_ts_msgs, etsi_its_vam_ts_msgs
  • cube_den_msgs (for the DEN service)

The following sections show simple, minimal examples you can use as templates.


Running the Example Nodes

All code snippets in this chapter are a modifed version taken from the v2x_apps ROS 2 package in our example repository. To run any of these examples on your development machine, you need to:

  1. Build and source the workspace (in your dev container or host):
host:~$ cd ~/cube-its/dev_ws
host:~/cube-its/dev_ws$ colcon build
host:~/cube-its/dev_ws$ source install/setup.bash
  1. Make sure cube:its is running on the cube (and reachable via DDS), e.g.:
cube:~$ start-its --remote
  1. Start an example node using ros2 run:
host:~/cube-its/dev_ws$ ros2 run v2x_apps <python_node>

Replace <python_node> with the actual node name from the examples, for instance:

host:~/cube-its/dev_ws$ ros2 run v2x_apps btp_listener
host:~/cube-its/dev_ws$ ros2 run v2x_apps btp_sender
host:~/cube-its/dev_ws$ ros2 run v2x_apps ...

Once the node is running, you should see log output in the terminal (e.g. received CAMs, DENMs, CPMs, etc.), and the corresponding /its/... or /vanetza/... topics will appear in ros2 topic list.


BTP – Basic Transport Protocol

BTP is the transport layer beneath CAM, DENM, CPM, VAM and all ETSI ITS facilities. cube:its exposes BTP directly so you can implement custom ETSI facilities or proprietary message types.


Listen for BTP Packets

Topic: /vanetza/btp_indication

Message: vanetza_msgs/msg/BtpDataIndication

import rclpy
from rclpy.node import Node
from vanetza_msgs.msg import BtpDataIndication
class BtpListener(Node):
def __init__(self):
super().__init__("btp_listener")
self.subscription = self.create_subscription(
BtpDataIndication,
"/vanetza/btp_indication",
self.listener_callback,
10,
)
def listener_callback(self, msg):
self.get_logger().info(
f"Received {len(msg.data)} bytes on BTP port {msg.destination_port}"
)
def main(args=None):
rclpy.init(args=args)
rclpy.spin(BtpListener())
rclpy.shutdown()

Send a BTP Packet

Service: /vanetza/btp_request

Service Type: vanetza_msgs/srv/BtpData

import rclpy
from rclpy.node import Node
from vanetza_msgs.msg import TrafficClass
from vanetza_msgs.srv import BtpData
class BtpSender(Node):
def __init__(self):
super().__init__("btp_sender")
self.client = self.create_client(BtpData, "/vanetza/btp_request")
self.create_timer(1.0, self.trigger)
def trigger(self):
req = BtpData.Request()
req.btp_type = BtpData.Request.BTP_TYPE_NON_INTERACTIVE
req.transport_type = BtpData.Request.TRANSPORT_TYPE_SHB
req.destination_port = 42
req.traffic_class.id = TrafficClass.TC_OTHER
req.data = b"Your payload"
future = self.client.call_async(req)
future.add_done_callback(self.done)
def done(self, future):
result = future.result()
status = "accepted" if result.confirm == BtpData.Response.CONFIRM_ACCEPTED else "failed"
self.get_logger().info(f"BTP request {status}")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(BtpSender())
rclpy.shutdown()

CAM – Cooperative Awareness Messages

CAMs represent continuous situational awareness broadcasts that describe a station’s dynamic state (position, speed, heading, vehicle type, etc.). cube:its automatically handles the full CAM pipeline for you:

  • Generates CAMs based on GNSS/IMU and motion data
  • Encodes CAMs using ASN.1 UPER

CAM Dissemination

cube:its includes a modular Cooperative Awareness (CA) service featuring a dissemination process for CAMs with two stages:

First, the CAM provider is a dedicated node that prepares CAMs at a fixed rate including optional containers such as the LowFrequencyContainer. These CAMs are published on the /its/cam_provided topic.

Second, the CA transmission node evaluates these provided CAMs and applies filtering, which affects the effective transmission rate and presence of optional message data.
For example, the default rules for vehicles adapt the rate between 1 to 10 Hz and the LowFrequencyContainer is only included every 500ms. The actually transmitted CAMs are published by the transmission node on the /its/cam_transmitted topic.

CAM Dissemination Rules (Plugins)

The default dissemination rules implemented in our cube_ca_plugins package follow the ETSI standard:

In a nutshell, VehicleDisseminationRules is the default profile except for RSUs. It adapts the transmission rate dynamically between 0.1 and 1.0 second depending on the station's recent changes in heading, speed and position. Furthermore, it includes LowFrequencyContainer and SpecialVehicleContainer only once every 0.5 seconds.

If you set the station type of cube:its to "road side unit", the RsuDisseminationRules apply automatically. These rules simply limit the transmission rate to 1 Hz.

Additionally, developers can implement their custom CAM triggering logic — beyond the standard ETSI rules — while still leveraging the full Vanetza + cube:its stack with this plugin mechanism.

Typical use cases:

  • Researching new CAM adaptation strategies
  • Reducing channel load in dense traffic
  • Domain-specific trigger rules for VRUs, robots, or special vehicles

For details, please consult our cube_ca_plugins package which contains the code of our ETSI-conformant CAM rate adaptation plugins. This package also includes sample code for a custom dissemination rules plugin.

👉 https://github.com/cubesys-GmbH/ros_cube_ca_plugins

Developers can use the reference implementation as a template and create custom logic with minimal boilerplate.


Receive CAMs

Topic: /its/cam_received

Message: etsi_its_cam_msgs/msg/CAM

import rclpy
from rclpy.node import Node
from etsi_its_cam_msgs.msg import CAM
class CamListener(Node):
def __init__(self):
super().__init__("cam_listener")
self.subscription = self.create_subscription(
CAM, "/its/cam_received", self.callback, 10
)
def callback(self, msg):
self.get_logger().info(f"Received CAM from StationId={msg.header.station_id.value}")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(CamListener())
rclpy.shutdown()

CPM – Collective Perception Messages

CPM enables sharing of detected objects between cooperative participants (vehicles, RSUs, robots). cube:its fully handles ASN.1 encoding, headers, and GeoNetworking.

You only need to publish CPM objects on ROS topics.


Provide CPMs

Publisher: /its/cpm_provided

Message: etsi_its_cpm_ts_msgs/msg/CollectivePerceptionMessage

Helper: /its/position_vector for ego GNSS

Example: Publish one perceived object every second.

import rclpy
import etsi_its_cpm_ts_msgs.msg as cpm_msg
from rclpy.node import Node
from vanetza_msgs.msg import PositionVector
class CpmProvider(Node):
def __init__(self):
super().__init__("cpm_provider")
self.position_vector = None
self.create_subscription(PositionVector, "/its/position_vector", self.position_update, 1)
self.pub = self.create_publisher(cpm_msg.CollectivePerceptionMessage, "/its/cpm_provided", 1)
self.create_timer(1.0, self.publish)
def position_update(self, msg):
self.position_vector = msg
def generate_cpm(self):
# minimal example with one object
cpm = cpm_msg.CollectivePerceptionMessage()
cpm.payload.management_container.reference_position.latitude.value = int(self.position_vector.latitude * 1e7)
cpm.payload.management_container.reference_position.longitude.value = int(self.position_vector.longitude * 1e7)
return cpm
def publish(self):
if self.position_vector:
self.pub.publish(self.generate_cpm())
self.get_logger().info("CPM provided")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(CpmProvider())
rclpy.shutdown()

DENM – Decentralized Environmental Notification Messages

DENMs are event-driven hazard/warning messages (accidents, road works, obstacles). cube:its provides a dedicated DEN service (/its/den_request) that:

  • fills ITS PDU headers automatically
  • assigns Action IDs
  • handles updates & terminations
  • performs all ASN.1 encoding

You only provide the DENM payload.


Send & Receive DENMs

Receive Topic: /its/denm_received

Transmit Service: /its/den_request

Service Type: cube_den_msgs/srv/Transmission

import rclpy
import etsi_its_denm_msgs.msg as denm_msg
from cube_den_msgs.srv import Transmission
from vanetza_msgs.msg import PositionVector, GeoNetArea
from rclpy.node import Node
class DenmNode(Node):
def __init__(self):
super().__init__("denm_node")
self.position_vector = None
self.create_subscription(PositionVector, "/its/position_vector", self.position_update, 1)
self.create_subscription(denm_msg.DENM, "/its/denm_received", self.receive_callback, 1)
self.client = self.create_client(Transmission, "/its/den_request")
self.create_timer(1.0, self.transmit)
def receive_callback(self, msg):
self.get_logger().info(f"Received DENM from StationId={msg.header.station_id.value}")
def position_update(self, msg):
self.position_vector = msg
def transmit(self):
if not self.position_vector:
return
req = Transmission.Request()
req.request = Transmission.Request.REQUEST_TRIGGER
req.message = denm_msg.DENM()
req.destination_area.type = GeoNetArea.TYPE_CIRCLE
req.destination_area.latitude = self.position_vector.latitude
req.destination_area.longitude = self.position_vector.longitude
req.destination_area.distance_a = 500.0
future = self.client.call_async(req)
future.add_done_callback(lambda f: self.get_logger().info("DENM sent"))
def main(args=None):
rclpy.init(args=args)
rclpy.spin(DenmNode())
rclpy.shutdown()

VAM – Vulnerable Road User Awareness

VAMs are specialized messages for pedestrians, cyclists, e-scooters, etc. cube:its exposes a simple topic where VAMs can be provided, and automatically transmits them per ETSI rules.


Provide VAMs

Publisher: /its/vam_provided

Message: etsi_its_vam_ts_msgs/msg/VAM

import rclpy
import etsi_its_vam_ts_msgs.msg as vam_msg
from rclpy.node import Node
from vanetza_msgs.msg import PositionVector
class VamProvider(Node):
def __init__(self):
super().__init__("vam_provider")
self.pos = None
self.create_subscription(PositionVector, "/its/position_vector", self.update_pos, 1)
self.pub = self.create_publisher(vam_msg.VAM, "/its/vam_provided", 1)
self.create_timer(1.0, self.publish)
def update_pos(self, msg):
self.pos = msg
def publish(self):
if not self.pos:
return
vam = vam_msg.VAM()
vam.vam.vam_parameters.basic_container.station_type.value = vam_msg.TrafficParticipantType.PEDESTRIAN
vam.vam.vam_parameters.basic_container.reference_position.latitude.value = int(self.pos.latitude * 1e7)
vam.vam.vam_parameters.basic_container.reference_position.longitude.value = int(self.pos.longitude * 1e7)
self.pub.publish(vam)
self.get_logger().info("VAM provided")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(VamProvider())
rclpy.shutdown()