Python Example
Python RPC Client – Getting Started
This section shows how to write a minimal Python client for cube-radio-rpc using pycapnp. We’ll build it up step by step:
- Create a virtual environment and install dependencies
- Connect to cube-radio-rpc and read device info
- Transmit a simple test frame
- Subscribe and receive frames
1. Setup: Virtual Environment and pycapnp
On your host machine, create and activate a Python virtual environment and install pycapnp:
    # create and enter project folder
    mkdir cube-rpc-python && cd cube-rpc-python

    # create venv
    python3 -m venv .venv

    # activate venv (bash/zsh)
    source .venv/bin/activate

    # install pycapnp
    pip install pycapnp

Make sure the Cube RPC Cap’n Proto schema (cube_rpc.capnp) is available in your source folder.
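Both prerequisites above (pycapnp installed in the active venv, schema file in the source folder) can be checked before running any of the examples. The following is a minimal sketch; `check_prerequisites` is a hypothetical helper name and not part of pycapnp or the Cube tooling.

```python
import importlib.util
from pathlib import Path


def check_prerequisites(schema_path="cube_rpc.capnp", module_name="capnp"):
    """Return a list of human-readable problems; an empty list means all is well."""
    problems = []
    # find_spec() locates a top-level module without importing it
    if importlib.util.find_spec(module_name) is None:
        problems.append(
            f"Python module '{module_name}' not found; "
            "run 'pip install pycapnp' inside the activated venv"
        )
    # The examples load the schema by relative path, so it must be a regular file
    if not Path(schema_path).is_file():
        problems.append(f"schema file '{schema_path}' not found")
    return problems


if __name__ == "__main__":
    for problem in check_prerequisites():
        print("MISSING:", problem)
```

Running the script from the project folder prints nothing when both prerequisites are met.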
2. Minimal Example – Connect and Identify
Create a file identify.py:
    import asyncio
    import capnp

    # Make sure Cap'n Proto does not try to auto-import modules
    capnp.remove_import_hook()

    # Load the Cube RPC schema
    cube_rpc = capnp.load(
        'cube_rpc.capnp',
        'Cube RPC',
        ['/usr/include/', '/usr/local/include/'])

    async def main():
        host = '10.8.0.101'  # IP of your cube:evk
        port = 23057         # default cube-radio-rpc port

        print(f"Connecting to {host}:{port}...")
        # Open TCP connection
        stream = await capnp.AsyncIoStream.create_connection(host=host, port=port)

        # Create RPC client and get LinkLayer interface
        client = capnp.TwoPartyClient(stream)
        link_layer = client.bootstrap().cast_as(cube_rpc.LinkLayer)

        # Call identify()
        result = await link_layer.identify()
        print("Connected to LinkLayer:")
        print(f" id: {result.id}")
        print(f" version: {result.version}")
        print(f" info: {result.info}")

    if __name__ == '__main__':
        asyncio.run(capnp.run(main()))

Run it:
    python identify.py

You should see output similar to:

    Connecting to 10.8.0.101:23057...
    Connected to LinkLayer:
     id: 0
     version: 1
     info: cube-radio=dsrc

3. Sending a Simple Test Frame
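Because a frame goes out over whichever radio is currently active, it can help to inspect the info field returned by identify() before transmitting. The sketch below splits a string such as the cube-radio=dsrc value from the sample output into key/value pairs. `parse_info` is a hypothetical helper, and the token format (key=value pairs separated by whitespace, commas, or semicolons) is an assumption based only on that one sample value.

```python
def parse_info(info):
    """Split an info string such as 'cube-radio=dsrc' into a dict.

    Assumption: the string holds key=value tokens separated by whitespace,
    commas, or semicolons. Only the single-token form is known from the
    sample output.
    """
    fields = {}
    for token in info.replace(",", " ").replace(";", " ").split():
        key, sep, value = token.partition("=")
        if sep:  # skip tokens that contain no '='
            fields[key] = value
    return fields


radio = parse_info("cube-radio=dsrc").get("cube-radio")
print(radio)  # dsrc
```

In a client, the argument would be the `info` value from the identify() result, and the returned radio name could decide which transmit parameters to build.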
Next, we add a transmit step: send a small payload over the currently active radio. Create tx_simple.py:
    import asyncio
    import capnp

    capnp.remove_import_hook()
    cube_rpc = capnp.load(
        'cube_rpc.capnp',
        'Cube RPC',
        ['/usr/include/', '/usr/local/include/'])

    async def main():
        host = '10.8.0.101'
        port = 23057

        print(f"Connecting to {host}:{port}...")
        stream = await capnp.AsyncIoStream.create_connection(host=host, port=port)
        client = capnp.TwoPartyClient(stream)
        link_layer = client.bootstrap().cast_as(cube_rpc.LinkLayer)

        # Show device info
        ident = await link_layer.identify()
        print(f"Connected: id={ident.id}, version={ident.version}, info={ident.info}")

        # Prepare a simple broadcast frame
        frame = {
            "sourceAddress": bytes.fromhex("001122334455"),
            "destinationAddress": bytes.fromhex("FFFFFFFFFFFF"),  # broadcast
            "payload": b"Hello from Python RPC!"
        }

        # Example for ITS-G5 / WLAN parameters:
        tx_params = {
            "wlan": {
                "priority": 3,         # user priority 0–7
                "power": int(10 * 8),  # 10 dBm, scaled by 8
                "datarate": 6 * 2      # 6 Mbps, scaled by 2 (500 kbps steps)
            }
        }

        print("Transmitting frame...")
        result = await link_layer.transmitData(frame=frame, txParams=tx_params)
        print(" error: ", result.error)
        print(" message:", result.message)

    if __name__ == '__main__':
        asyncio.run(capnp.run(main()))

Run it:
    python tx_simple.py

If everything is configured correctly, the frame will be sent over the active radio interface. You may use another cube:evk with its cube-radio-receive tool to verify this over-the-air transmission.
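The transmit parameters in tx_simple.py use scaled integers: power in dBm multiplied by 8, and data rate in 500 kbps steps. The sketch below wraps those conversions so the scaling lives in one place. All function names here are hypothetical helpers, not part of the Cube RPC schema; the resulting dictionary has the same shape as the tx_params literal used above.

```python
def dbm_to_wire(power_dbm):
    """Encode a power in dBm as the scaled integer (dBm x 8, 0.125 dB steps)."""
    return round(power_dbm * 8)


def wire_to_dbm(value):
    """Decode a scaled power value back to dBm."""
    return value / 8.0


def mbps_to_wire(rate_mbps):
    """Encode a data rate in Mbps as a count of 500 kbps steps."""
    return round(rate_mbps * 2)


def wire_to_mbps(value):
    """Decode a count of 500 kbps steps back to Mbps."""
    return value * 0.5


def mac_to_bytes(mac):
    """Convert '00:11:22:33:44:55' or '001122334455' to 6 raw bytes."""
    raw = bytes.fromhex(mac.replace(":", "").replace("-", ""))
    if len(raw) != 6:
        raise ValueError(f"expected a 6-byte MAC address, got {len(raw)} bytes")
    return raw


def make_wlan_tx_params(power_dbm, rate_mbps, priority=3):
    """Build a dict with the same shape as tx_params in tx_simple.py."""
    if not 0 <= priority <= 7:
        raise ValueError("user priority must be in the range 0 to 7")
    return {
        "wlan": {
            "priority": priority,
            "power": dbm_to_wire(power_dbm),
            "datarate": mbps_to_wire(rate_mbps),
        }
    }


print(make_wlan_tx_params(10, 6))
# {'wlan': {'priority': 3, 'power': 80, 'datarate': 12}}
```

The decode functions mirror the conversions a receiver applies to the reported power and data rate.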
4. Receiving Frames (Subscribe)
To receive frames from the radio, we need to implement a DataListener and subscribe to it. Create rx_simple.py:
    import asyncio
    import capnp

    capnp.remove_import_hook()
    cube_rpc = capnp.load(
        'cube_rpc.capnp',
        'Cube RPC',
        ['/usr/include/', '/usr/local/include/'])

    class DataListenerImpl(cube_rpc.LinkLayer.DataListener.Server):
        async def onDataIndication(self, frame, rxParams, **kwargs):
            """Called whenever a frame is received."""
            payload = bytes(frame.payload)
            print("== Received frame ==")
            print(f" src: {frame.sourceAddress.hex()}")
            print(f" dest: {frame.destinationAddress.hex()}")
            print(f" len: {len(payload)} bytes")
            if rxParams.which() == 'wlan':
                power_dbm = rxParams.wlan.power / 8.0  # Convert from dBm × 8
                print(f" power: {power_dbm:.1f} dBm")
                print(f" datarate: {rxParams.wlan.datarate * 0.5} Mbps")
            elif rxParams.which() == 'cv2x':
                power_dbm = rxParams.cv2x.power / 8.0  # Convert from dBm × 8
                print(f" power: {power_dbm:.1f} dBm")
                print(f" priority (PPPP): {rxParams.cv2x.priority}")
            else:
                print(" power: (unspecified)")

            try:
                text = payload.decode('utf-8', errors='replace')
                print(f" payload (utf-8): {text}")
            except Exception:
                print(" payload not decodable as UTF-8")

            print()

    async def main():
        host = '10.8.0.101'
        port = 23057

        print(f"Connecting to {host}:{port}...")
        stream = await capnp.AsyncIoStream.create_connection(host=host, port=port)
        client = capnp.TwoPartyClient(stream)
        link_layer = client.bootstrap().cast_as(cube_rpc.LinkLayer)

        ident = await link_layer.identify()
        print(f"Connected: id={ident.id}, version={ident.version}, info={ident.info}")

        # Create listener and subscribe
        listener = DataListenerImpl()
        await link_layer.subscribeData(listener=listener)
        print("Subscribed to data indications. Waiting for frames...")

        # Keep the program running
        await asyncio.Event().wait()

    if __name__ == '__main__':
        asyncio.run(capnp.run(main()))

Run it:
    python rx_simple.py

Then run a transmitting tool (e.g. your tx_simple.py or cube-radio-transmit) on a second device to send frames. Every received frame will be printed with source, destination, and payload.
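Frames on the air are often binary rather than text, in which case printing the payload as UTF-8 with replacement characters is hard to read. As one possible refinement, the sketch below formats addresses with colons and falls back to a short hex preview when the payload is not printable text. `format_mac` and `describe_payload` are hypothetical helpers that could be called from the listener's onDataIndication method; they need Python 3.8 or newer for `bytes.hex(" ")`.

```python
def format_mac(addr):
    """Format raw address bytes as colon-separated hex, e.g. 00:11:22:33:44:55."""
    return ":".join(f"{b:02x}" for b in bytes(addr))


def describe_payload(payload, max_bytes=16):
    """Return printable text if the payload is valid UTF-8, else a hex preview."""
    data = bytes(payload)
    try:
        text = data.decode("utf-8")  # strict decoding: raises on invalid bytes
        if text.isprintable():
            return f"text {text!r}"
    except UnicodeDecodeError:
        pass
    # Binary or non-printable content: show the first max_bytes bytes as hex
    preview = data[:max_bytes].hex(" ")
    suffix = " ..." if len(data) > max_bytes else ""
    return f"hex {preview}{suffix}"


print(format_mac(bytes.fromhex("001122334455")))     # 00:11:22:33:44:55
print(describe_payload(b"Hello from Python RPC!"))   # text 'Hello from Python RPC!'
print(describe_payload(b"\xff\x00\x01"))             # hex ff 00 01
```

Both helpers accept any bytes-like object, so they can be passed the frame fields directly.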

