Knowledge Base

Everything you need to know and understand to develop V2X applications.

denm-generator.py
msg = self.generate_denm()
future = self.send_request(msg)
future.add_done_callback(self.request_completed)

Python Example

Python RPC Client – Getting Started

This section shows how to write a minimal Python client for cube-radio-rpc using pycapnp. We’ll build it up step by step:

  1. Create a virtual environment and install dependencies
  2. Connect to cube-radio-rpc and read device info
  3. Transmit a simple test frame
  4. Subscribe and receive frames

1. Setup: Virtual Environment and pycapnp

On your host machine, create and activate a Python virtual environment and install pycapnp:

# create and enter project folder
mkdir cube-rpc-python && cd cube-rpc-python
# create venv
python3 -m venv .venv
# activate venv (bash/zsh)
source .venv/bin/activate
# install pycapnp
pip install pycapnp

Make sure the Cube RPC Cap’n Proto schema (cube_rpc.capnp) is available in your source folder.


2. Minimal Example – Connect and Identify

Create a file identify.py:

import asyncio
import capnp
# Make sure Cap'n Proto does not try to auto-import modules
capnp.remove_import_hook()
# Load the Cube RPC schema
cube_rpc = capnp.load(
'cube_rpc.capnp',
'Cube RPC',
['/usr/include/', '/usr/local/include/']
)
async def main():
host = '10.8.0.101' # IP of your cube:evk
port = 23057 # default cube-radio-rpc port
print(f"Connecting to {host}:{port}...")
# Open TCP connection
stream = await capnp.AsyncIoStream.create_connection(host=host, port=port)
# Create RPC client and get LinkLayer interface
client = capnp.TwoPartyClient(stream)
link_layer = client.bootstrap().cast_as(cube_rpc.LinkLayer)
# Call identify()
result = await link_layer.identify()
print("Connected to LinkLayer:")
print(f" id: {result.id}")
print(f" version: {result.version}")
print(f" info: {result.info}")
if __name__ == '__main__':
asyncio.run(capnp.run(main()))

Run it:

python identify.py

You should see output similar to:

Connecting to 10.8.0.101:23057...
Connected to LinkLayer:
id: 0
version: 1
info: cube-radio=dsrc

3. Sending a Simple Test Frame

Next, we add a transmit step: send a small payload over the currently active radio. Create tx_simple.py:

import asyncio
import capnp
capnp.remove_import_hook()
cube_rpc = capnp.load(
'cube_rpc.capnp',
'Cube RPC',
['/usr/include/', '/usr/local/include/']
)
async def main():
host = '10.8.0.101'
port = 23057
print(f"Connecting to {host}:{port}...")
stream = await capnp.AsyncIoStream.create_connection(host=host, port=port)
client = capnp.TwoPartyClient(stream)
link_layer = client.bootstrap().cast_as(cube_rpc.LinkLayer)
# Show device info
ident = await link_layer.identify()
print(f"Connected: id={ident.id}, version={ident.version}, info={ident.info}")
# Prepare a simple broadcast frame
frame = {
"sourceAddress": bytes.fromhex("001122334455"),
"destinationAddress": bytes.fromhex("FFFFFFFFFFFF"), # broadcast
"payload": b"Hello from Python RPC!"
}
# Example for ITS-G5 / WLAN parameters:
tx_params = {
"wlan": {
"priority": 3, # user priority 0–7
"power": int(10 * 8), # 10 dBm, scaled by 8
"datarate": 6 * 2 # 6 Mbps, scaled by 2 (500 kbps steps)
}
}
print("Transmitting frame...")
result = await link_layer.transmitData(frame=frame, txParams=tx_params)
print(" error: ", result.error)
print(" message:", result.message)
if __name__ == '__main__':
asyncio.run(capnp.run(main()))

Run it:

python tx_simple.py

If everything is configured correctly, the frame will be sent over the active radio interface. You may use another cube:evk with its cube-radio-receive tool to verify this over-the-air transmission.


4. Receiving Frames (Subscribe)

To receive frames from the radio, we need to implement a DataListener and subscribe to it. Create rx_simple.py:

import asyncio
import capnp
capnp.remove_import_hook()
cube_rpc = capnp.load(
'cube_rpc.capnp',
'Cube RPC',
['/usr/include/', '/usr/local/include/']
)
class DataListenerImpl(cube_rpc.LinkLayer.DataListener.Server):
async def onDataIndication(self, frame, rxParams, **kwargs):
"""Called whenever a frame is received."""
payload = bytes(frame.payload)
print("== Received frame ==")
print(f" src: {frame.sourceAddress.hex()}")
print(f" dest: {frame.destinationAddress.hex()}")
print(f" len: {len(payload)} bytes")
if rxParams.which() == 'wlan':
power_dbm = rxParams.wlan.power / 8.0 # Convert from dBm × 8
print(f" power: {power_dbm:.1f} dBm")
print(f" datarate: {rxParams.wlan.datarate * 0.5} Mbps")
elif rxParams.which() == 'cv2x':
power_dbm = rxParams.cv2x.power / 8.0 # Convert from dBm × 8
print(f" power: {power_dbm:.1f} dBm")
print(f" priority (PPPP): {rxParams.cv2x.priority}")
else:
print(" power: (unspecified)")
try:
text = payload.decode('utf-8', errors='replace')
print(f" payload (utf-8): {text}")
except Exception:
print(" payload not decodable as UTF-8")
print()
async def main():
host = '10.8.0.101'
port = 23057
print(f"Connecting to {host}:{port}...")
stream = await capnp.AsyncIoStream.create_connection(host=host, port=port)
client = capnp.TwoPartyClient(stream)
link_layer = client.bootstrap().cast_as(cube_rpc.LinkLayer)
ident = await link_layer.identify()
print(f"Connected: id={ident.id}, version={ident.version}, info={ident.info}")
# Create listener and subscribe
listener = DataListenerImpl()
await link_layer.subscribeData(listener=listener)
print("Subscribed to data indications. Waiting for frames...")
# Keep the program running
await asyncio.Event().wait()
if __name__ == '__main__':
asyncio.run(capnp.run(main()))

Run it:

python rx_simple.py

Then run a transmitting tool (e.g. your tx_simple.py or cube-radio-transmit) on second device to send frames. Every received frame will be printed with source, destination, and payload.

Previous
The Contract