Knowledge Base

Everything you need to know and understand to develop V2X applications.

denm-generator.py
msg = self.generate_denm()
future = self.send_request(msg)
future.add_done_callback(self.request_completed)

Facilities in cube:its

cube:its provides a complete ROS-native implementation of ETSI C-ITS facilities on top of Vanetza. This means your ROS 2 application can send and receive CAM, DENM, CPM, VAM, or low-level BTP packets simply by publishing/subscribing to topics — without ever touching ASN.1, BTP, GeoNetworking, or any low-level radio APIs.

Each facility is exposed through a clean set of ROS 2 topics and services using message definitions from:

  • vanetza_msgs
  • etsi_its_cam_msgs, etsi_its_denm_msgs, etsi_its_cpm_ts_msgs, etsi_its_vam_ts_msgs
  • cube_den_msgs (for the DEN service)

The following sections show simple, minimal examples you can use as templates.


Running the Example Nodes

All code snippets in this chapter are a modifed version taken from the v2x_apps ROS 2 package in our example repository. To run any of these examples on your development machine, you need to:

  1. Build and source the workspace (in your dev container or host):
host:~$ cd ~/cube-its/dev_ws
host:~/cube-its/dev_ws$ colcon build
host:~/cube-its/dev_ws$ source install/setup.bash
  1. Make sure cube:its is running on the cube (and reachable via DDS), e.g.:
cube:~$ start-its --remote
  1. Start an example node using ros2 run:
host:~/cube-its/dev_ws$ ros2 run v2x_apps <python_node>

Replace <python_node> with the actual node name from the examples, for instance:

host:~/cube-its/dev_ws$ ros2 run v2x_apps btp_listener
host:~/cube-its/dev_ws$ ros2 run v2x_apps btp_sender
host:~/cube-its/dev_ws$ ros2 run v2x_apps ...

Once the node is running, you should see log output in the terminal (e.g. received CAMs, DENMs, CPMs, etc.), and the corresponding /its/... or /vanetza/... topics will appear in ros2 topic list.


BTP – Basic Transport Protocol

BTP is the transport layer beneath CAM, DENM, CPM, VAM and all ETSI ITS facilities. cube:its exposes BTP directly so you can implement custom ETSI facilities or proprietary message types.


Listen for BTP Packets

Topic: /vanetza/btp_indication

Message: vanetza_msgs/msg/BtpDataIndication

import rclpy
from rclpy.node import Node
from vanetza_msgs.msg import BtpDataIndication
class BtpListener(Node):
def __init__(self):
super().__init__("btp_listener")
self.subscription = self.create_subscription(
BtpDataIndication,
"/vanetza/btp_indication",
self.listener_callback,
10,
)
def listener_callback(self, msg):
self.get_logger().info(
f"Received {len(msg.data)} bytes on BTP port {msg.destination_port}"
)
def main(args=None):
rclpy.init(args=args)
rclpy.spin(BtpListener())
rclpy.shutdown()

Send a BTP Packet

Service: /vanetza/btp_request

Service Type: vanetza_msgs/srv/BtpData

import rclpy
from rclpy.node import Node
from vanetza_msgs.msg import TrafficClass
from vanetza_msgs.srv import BtpData
class BtpSender(Node):
def __init__(self):
super().__init__("btp_sender")
self.client = self.create_client(BtpData, "/vanetza/btp_request")
self.create_timer(1.0, self.trigger)
def trigger(self):
req = BtpData.Request()
req.btp_type = BtpData.Request.BTP_TYPE_NON_INTERACTIVE
req.transport_type = BtpData.Request.TRANSPORT_TYPE_SHB
req.destination_port = 42
req.traffic_class.id = TrafficClass.TC_OTHER
req.data = b"Your payload"
future = self.client.call_async(req)
future.add_done_callback(self.done)
def done(self, future):
result = future.result()
status = "accepted" if result.confirm == BtpData.Response.CONFIRM_ACCEPTED else "failed"
self.get_logger().info(f"BTP request {status}")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(BtpSender())
rclpy.shutdown()

CAM – Cooperative Awareness Messages

CAMs represent continuous situational awareness broadcasts that describe a station’s dynamic state (position, speed, heading, vehicle type, etc.). cube:its automatically handles the full CAM pipeline for you:

  • Generates CAMs based on GNSS/IMU and motion data
  • Encodes CAMs using ASN.1 UPER

Custom CAM Dissemination Rules (Plugins)

cube:its includes a modular Cooperative Awareness (CA) service, which you can extend through custom dissemination rule plugins. This allows developers to implement their own CAM triggering logic — beyond the standard ETSI rules — while still leveraging the full Vanetza + cube:its stack.

Typical use cases:

  • Researching new CAM adaptation strategies
  • Reducing channel load in dense traffic
  • Domain-specific trigger rules for VRUs, robots, or special vehicles

We provide a reference plugin implementing ETSI-conformant CAM rate adaptation, available here:

👉 https://github.com/cubesys-GmbH/ros_cube_ca_plugins

Developers can use the reference implementation as a template and create custom logic with minimal boilerplate.


Receive CAMs

Topic: /its/cam_received

Message: etsi_its_cam_msgs/msg/CAM

import rclpy
from rclpy.node import Node
from etsi_its_cam_msgs.msg import CAM
class CamListener(Node):
def __init__(self):
super().__init__("cam_listener")
self.subscription = self.create_subscription(
CAM, "/its/cam_received", self.callback, 10
)
def callback(self, msg):
self.get_logger().info(f"Received CAM from StationId={msg.header.station_id.value}")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(CamListener())
rclpy.shutdown()

CPM – Collective Perception Messages

CPM enables sharing of detected objects between cooperative participants (vehicles, RSUs, robots). cube:its fully handles ASN.1 encoding, headers, and GeoNetworking.

You only need to publish CPM objects on ROS topics.


Provide CPMs

Publisher: /its/cpm_provided

Message: etsi_its_cpm_ts_msgs/msg/CollectivePerceptionMessage

Helper: /its/position_vector for ego GNSS

Example: Publish one perceived object every second.

import rclpy
import etsi_its_cpm_ts_msgs.msg as cpm_msg
from rclpy.node import Node
from vanetza_msgs.msg import PositionVector
class CpmProvider(Node):
def __init__(self):
super().__init__("cpm_provider")
self.position_vector = None
self.create_subscription(PositionVector, "/its/position_vector", self.position_update, 1)
self.pub = self.create_publisher(cpm_msg.CollectivePerceptionMessage, "/its/cpm_provided", 1)
self.create_timer(1.0, self.publish)
def position_update(self, msg):
self.position_vector = msg
def generate_cpm(self):
# minimal example with one object
cpm = cpm_msg.CollectivePerceptionMessage()
cpm.payload.management_container.reference_position.latitude.value = int(self.position_vector.latitude * 1e7)
cpm.payload.management_container.reference_position.longitude.value = int(self.position_vector.longitude * 1e7)
return cpm
def publish(self):
if self.position_vector:
self.pub.publish(self.generate_cpm())
self.get_logger().info("CPM provided")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(CpmProvider())
rclpy.shutdown()

DENM – Decentralized Environmental Notification Messages

DENMs are event-driven hazard/warning messages (accidents, road works, obstacles). cube:its provides a dedicated DEN service (/its/den_request) that:

  • fills ITS PDU headers automatically
  • assigns Action IDs
  • handles updates & terminations
  • performs all ASN.1 encoding

You only provide the DENM payload.


Send & Receive DENMs

Receive Topic: /its/denm_received

Transmit Service: /its/den_request

Service Type: cube_den_msgs/srv/Transmission

import rclpy
import etsi_its_denm_msgs.msg as denm_msg
from cube_den_msgs.srv import Transmission
from vanetza_msgs.msg import PositionVector, GeoNetArea
from rclpy.node import Node
class DenmNode(Node):
def __init__(self):
super().__init__("denm_node")
self.position_vector = None
self.create_subscription(PositionVector, "/its/position_vector", self.position_update, 1)
self.create_subscription(denm_msg.DENM, "/its/denm_received", self.receive_callback, 1)
self.client = self.create_client(Transmission, "/its/den_request")
self.create_timer(1.0, self.transmit)
def receive_callback(self, msg):
self.get_logger().info(f"Received DENM from StationId={msg.header.station_id.value}")
def position_update(self, msg):
self.position_vector = msg
def transmit(self):
if not self.position_vector:
return
req = Transmission.Request()
req.request = Transmission.Request.REQUEST_TRIGGER
req.message = denm_msg.DENM()
req.destination_area.type = GeoNetArea.TYPE_CIRCLE
req.destination_area.latitude = self.position_vector.latitude
req.destination_area.longitude = self.position_vector.longitude
req.destination_area.distance_a = 500.0
future = self.client.call_async(req)
future.add_done_callback(lambda f: self.get_logger().info("DENM sent"))
def main(args=None):
rclpy.init(args=args)
rclpy.spin(DenmNode())
rclpy.shutdown()

VAM – Vulnerable Road User Awareness

VAMs are specialized messages for pedestrians, cyclists, e-scooters, etc. cube:its exposes a simple topic where VAMs can be provided, and automatically transmits them per ETSI rules.


Provide VAMs

Publisher: /its/vam_provided

Message: etsi_its_vam_ts_msgs/msg/VAM

import rclpy
import etsi_its_vam_ts_msgs.msg as vam_msg
from rclpy.node import Node
from vanetza_msgs.msg import PositionVector
class VamProvider(Node):
def __init__(self):
super().__init__("vam_provider")
self.pos = None
self.create_subscription(PositionVector, "/its/position_vector", self.update_pos, 1)
self.pub = self.create_publisher(vam_msg.VAM, "/its/vam_provided", 1)
self.create_timer(1.0, self.publish)
def update_pos(self, msg):
self.pos = msg
def publish(self):
if not self.pos:
return
vam = vam_msg.VAM()
vam.vam.vam_parameters.basic_container.station_type.value = vam_msg.TrafficParticipantType.PEDESTRIAN
vam.vam.vam_parameters.basic_container.reference_position.latitude.value = int(self.pos.latitude * 1e7)
vam.vam.vam_parameters.basic_container.reference_position.longitude.value = int(self.pos.longitude * 1e7)
self.pub.publish(vam)
self.get_logger().info("VAM provided")
def main(args=None):
rclpy.init(args=args)
rclpy.spin(VamProvider())
rclpy.shutdown()