Issue #269: Feature: Please add BPIO2 FlatBuffers support for UART mode

Summary

UART mode is listed in modes_available from the status response, but attempting to configure UART via BPIO2 returns “Invalid mode name” because no BPIO handler exists for UART.

Current State

bpio_mode_handlers[] in src/binmode/bpio.c:75-88 only has entries for:

  • HW1WIRE: bpio_1wire_configure + bpio_hw1wire_transaction
  • HWI2C: bpio_hwi2c_configure + bpio_hwi2c_transaction
  • HWSPI: bpio_hwspi_configure + bpio_hwspi_transaction

Missing:

  • No HWUART entry exists in the handlers array
  • bpio_transactions.c has no bpio_hwuart_transaction() function
  • hwuart.h has no bpio_hwuart_configure() declaration

What’s Needed

  1. Add bpio_hwuart_configure() function in src/mode/hwuart.c (similar to bpio_hwspi_configure)
  2. Add bpio_hwuart_transaction() function in src/binmode/bpio_transactions.c
  3. Add [HWUART] entry to bpio_mode_handlers[] array in src/binmode/bpio.c

Configuration Parameters

Already defined in bpio.fbs schema:

| Parameter | Type | Default | Description |
|---|---|---|---|
| speed | uint32 | 20000 | Baud rate in Hz |
| data_bits | uint8 | 8 | Data bits (5-8) |
| parity | bool | false | false=None, true=Even |
| stop_bits | uint8 | 1 | Stop bits (1 or 2) |
| flow_control | bool | false | Hardware flow control |
| signal_inversion | bool | false | Invert TX/RX signals |
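
As a rough host-side sketch, the parameters above could be range-checked before a configuration request is sent. The `UartConfig` class and its `validate()` method are hypothetical names invented for this illustration; they are not part of pybpio or the firmware, and only the field names, defaults, and ranges come from the table.

```python
from dataclasses import dataclass, asdict


@dataclass
class UartConfig:
    """Hypothetical mirror of the UART fields in bpio.fbs (defaults from the table)."""
    speed: int = 20000
    data_bits: int = 8
    parity: bool = False          # False = none, True = even
    stop_bits: int = 1
    flow_control: bool = False
    signal_inversion: bool = False

    def validate(self):
        """Return a dict usable as mode_configuration, or raise ValueError."""
        if not 0 < self.speed <= 0xFFFFFFFF:   # must fit a uint32
            raise ValueError(f"speed out of range: {self.speed}")
        if self.data_bits not in (5, 6, 7, 8):
            raise ValueError(f"data_bits must be 5-8, got {self.data_bits}")
        if self.stop_bits not in (1, 2):
            raise ValueError(f"stop_bits must be 1 or 2, got {self.stop_bits}")
        return asdict(self)


config = UartConfig(speed=115200).validate()
print(config)
```

The resulting dict has the same keys as the `mode_configuration` argument used in the test case below.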

Test Case

from pybpio import BPIOClient

client = BPIOClient('/dev/cu.usbmodem6buspirate3', baudrate=3000000)

# Check available modes - UART is listed
status = client.status_request()
print(status['modes_available'])
# ['HiZ', '1WIRE', 'UART', 'HDUART', 'I2C', 'SPI', ...]

# This currently fails with "Configuration error: Invalid mode name"
result = client.configuration_request(
    mode='UART',
    mode_configuration={
        'speed': 115200,
        'data_bits': 8,
        'parity': False,
        'stop_bits': 1
    }
)

Expected Behavior

After implementation, switching to UART mode via BPIO2 should work the same as SPI/I2C:

# Should succeed and allow UART data transfers
client.configuration_request(mode='UART', mode_configuration={'speed': 115200})

# Write data to TX
client.data_request(data_write=[0x48, 0x65, 0x6C, 0x6C, 0x6F])  # "Hello"

# Read data from RX
response = client.data_request(bytes_read=10)

References

Issue opened by: tc-mgriff

Thank you for the very well researched and written issue :slight_smile:

The reason I delayed adding any asynchronous modes is that I’m not sure of the best way to handle them.

Writing data is obviously pretty straightforward, but what about reading data?

  • What happens if you request 10 bytes read and only 9 show up? Timeout? Hang and wait for more?
  • What happens when data arrives asynchronously? Store it until the next read? How big a buffer to use to store async data? What happens on buffer overflow?
  • Do we need flow control? How is that handled? Repeat the RTS/CTS from the USB CDC interface?
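
To make the trade-offs concrete, here is a small sketch of one possible set of answers, purely as an assumption for discussion: a fixed-capacity buffer that drops the oldest bytes on overflow (and counts them), and a read that returns whatever arrived when the timeout expires instead of hanging. `RxBuffer`, its capacity, and the drop-oldest policy are all invented for this example and do not describe what the firmware does.

```python
import threading
import time
from collections import deque


class RxBuffer:
    """Hypothetical bounded receive buffer with a drop-oldest overflow policy."""

    def __init__(self, capacity=512):
        self._data = deque()
        self._capacity = capacity
        self._cond = threading.Condition()
        self.dropped = 0  # number of bytes discarded because the buffer was full

    def feed(self, chunk):
        """Store bytes as they arrive asynchronously."""
        with self._cond:
            for byte in chunk:
                if len(self._data) >= self._capacity:
                    self._data.popleft()
                    self.dropped += 1
                self._data.append(byte)
            self._cond.notify_all()

    def read(self, count, timeout=1.0):
        """Return up to `count` bytes; a short result means the timeout expired."""
        deadline = time.monotonic() + timeout
        with self._cond:
            while len(self._data) < count:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                self._cond.wait(remaining)
            available = min(count, len(self._data))
            return bytes(self._data.popleft() for _ in range(available))


buf = RxBuffer(capacity=4)
buf.feed(b"abcdef")                  # two bytes more than the buffer holds
print(buf.dropped)                   # bytes lost to overflow
print(buf.read(10, timeout=0.01))    # asks for 10, gets what is there
```

With this policy the caller learns about a short read from the length of the returned data and about overflow from the `dropped` counter.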

In the forum we discussed adding a new “asynch” table to the schema. This would be a new table that carries data as it arrives.

However, in flatbuffers I guess it is always better to extend existing tables because empty fields don’t take up any bytes. Adding tables to the schema does make everything heavier. So perhaps a better approach is to add an async indication field to the existing data table.

A totally different approach would be to use the USB CDC flow control line to put BPIO UART mode into a transparent USB-to-serial bridge. Then drop that signal to send normal flatbuffer tables again.

Happy to discuss this more in the auto-post for this issue in forum (or here).

Bot-copy from GitHub comment by: DangerousPrototypes

Sorry for the account switch-a-roo, I was still logged in from work after a long week when I created the issues.

It’s hard to say the best option, but I’ll give you the use cases I was thinking of.

1. Automating iterating through unknown UART connection settings
2. Triggering glitching based on BP glitching settings
3. Triggering glitching settings, then interacting with the target after a successful glitch

It feels like a lot of work to implement everything that comes with this through flatbuffers, but I think using USB CDC for trying to iterate through different UART connection settings would be awkward?

Hey welcome!

  1. Will definitely require a way to clear the read, as wrong speed and settings often sends a heap of garbage data
  2. Currently there is no way to access high level terminal “commands” from BPIO. It’s possible but it needs some careful thought to expand it system wide. Generally the idea is to provide the low level bits so the PC software can basically duplicate what’s done in the command. There is this weird split now on low level stuff (bpio) and the fact that you can just dump a flash or eeprom from the terminal. This is a whole discussion I guess.
  3. So the bus pirate has multiple bin modes, and maybe some of the glitch specific stuff could go into a new purpose-built binmode? I know one tool to rule them all, but then you sacrifice easy and fast at the same time.
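
For the first use case (iterating through unknown UART settings), the garbage produced by wrong settings can itself be the signal. Below is a minimal sketch of a scoring step, assuming the host has already captured a few bytes at each candidate baud rate and cleared the read buffer between attempts. `printable_ratio`, `pick_baud`, and the 0.9 threshold are made up for this example, and the heuristic only helps when the target sends ASCII text.

```python
import string

# Byte values that count as "looks like text" (includes space, tab, CR, LF)
PRINTABLE = set(string.printable.encode("ascii"))


def printable_ratio(data):
    """Fraction of bytes in `data` that are printable ASCII."""
    if not data:
        return 0.0
    return sum(byte in PRINTABLE for byte in data) / len(data)


def pick_baud(samples, threshold=0.9):
    """samples maps baud rate -> bytes captured at that rate.

    Returns the best-scoring baud rate, or None if nothing looks like text.
    """
    best = max(samples, key=lambda baud: printable_ratio(samples[baud]), default=None)
    if best is None or printable_ratio(samples[best]) < threshold:
        return None
    return best


captures = {
    9600: b"\xff\xfe\x80\x00",     # framing garbage at the wrong speed
    115200: b"login: \r\n",        # readable text at the right speed
}
print(pick_baud(captures))
```

The same loop would also need to vary data bits, parity, and stop bits; only the scoring is shown here.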

This is definitely something we need to address, and I’m up for the conversation.

I plan to start prototyping something tomorrow or this weekend.

Moving forward. Cleaned up some partially finished implementations of 3wire, 2wire, LED. Did a huge reorg of the transactions. Refactored the LED mode into function pointer arrays.

Starting on the update of the data response packet to provide async data access. I asked the LLM to describe some ways to handle the async packets in python. Python isn’t my thing, and I don’t want to build something that ends up being unusable on the python side.

Some ways to deal with async in python

BPIO Async Data - Python Client Implementation Patterns

Overview

BPIO supports asynchronous data from modes like UART and 2WIRE. Async packets set the is_async=true flag in DataResponse to distinguish them from normal request/response transactions.


Recommended: Threading with Queue Pattern

Implementation

import serial
import threading
from queue import Queue, Empty
import time

class BusPirate:
    def __init__(self, port='/dev/ttyACM0', baudrate=115200):
        self.serial = serial.Serial(port, baudrate=baudrate, timeout=None)
        self.async_queue = Queue()
        self.response_queue = Queue()
        self._running = True
        
        # Start background reader thread
        self.reader_thread = threading.Thread(
            target=self._reader_loop, 
            daemon=True,
            name='BusPirate-Reader'
        )
        self.reader_thread.start()
    
    def _reader_loop(self):
        """Background thread continuously reads packets"""
        while self._running:
            try:
                packet = self._read_cobs_packet()  # Blocks on serial.read()
                
                if packet.is_async:
                    self.async_queue.put(packet)
                else:
                    self.response_queue.put(packet)
                    
            except Exception as e:
                print(f"Reader error: {e}")
                break
    
    def _read_cobs_packet(self):
        """Read COBS-encoded packet from serial (blocks until 0x00 delimiter)"""
        buffer = bytearray()
        while True:
            byte = self.serial.read(1)  # Efficient blocking read
            if byte == b'\x00':
                # Decode COBS and parse FlatBuffer
                decoded = cobs_decode(buffer)
                return parse_flatbuffer(decoded)
            buffer.extend(byte)
    
    def transact(self, request, timeout=1.0):
        """Send request and wait for response"""
        # Clear any stale responses
        while not self.response_queue.empty():
            self.response_queue.get_nowait()
        
        # Send request
        encoded = encode_request(request)
        self.serial.write(encoded)
        
        # Wait for response
        try:
            return self.response_queue.get(timeout=timeout)
        except Empty:
            raise TimeoutError(f"No response within {timeout}s")
    
    def poll_async(self, timeout=0):
        """Check for async data (non-blocking by default)"""
        try:
            return self.async_queue.get(timeout=timeout)
        except Empty:
            return None
    
    def on_async(self, callback):
        """Register callback for async data (advanced usage)"""
        def wrapper():
            while self._running:
                packet = self.async_queue.get()  # Blocks until data
                callback(packet)
        
        threading.Thread(target=wrapper, daemon=True).start()
    
    def close(self):
        """Cleanup"""
        self._running = False
        self.serial.close()
        self.reader_thread.join(timeout=1.0)

# Helper functions (to be implemented with actual COBS/FlatBuffer code)
def cobs_decode(data):
    """COBS decode implementation"""
    pass

def encode_request(request):
    """Encode FlatBuffer request with COBS + 0x00 delimiter"""
    pass

def parse_flatbuffer(data):
    """Parse FlatBuffer DataResponse/ConfigurationResponse/etc"""
    pass
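
The COBS placeholder above could be filled in along these lines. This is a sketch of textbook COBS (the 0x00 frame delimiter is handled separately, as in `_read_cobs_packet`), on the assumption that the firmware uses the standard encoding with no variations; `cobs_encode` is included only so the round trip can be checked.

```python
def cobs_encode(data):
    """Encode bytes so the output contains no 0x00 (delimiter not appended)."""
    out = bytearray()
    block = bytearray()
    full_block = False
    for byte in data:
        full_block = False
        if byte == 0:
            out.append(len(block) + 1)   # code byte: distance to the next zero
            out.extend(block)
            block.clear()
        else:
            block.append(byte)
            if len(block) == 254:        # longest run a single code byte can cover
                out.append(0xFF)
                out.extend(block)
                block.clear()
                full_block = True
    if not full_block:
        out.append(len(block) + 1)
        out.extend(block)
    return bytes(out)


def cobs_decode(encoded):
    """Decode one COBS frame (without its trailing 0x00 delimiter)."""
    out = bytearray()
    i = 0
    while i < len(encoded):
        code = encoded[i]
        if code == 0:
            raise ValueError("zero byte inside COBS frame")
        end = i + code
        if end > len(encoded):
            raise ValueError("truncated COBS frame")
        block = encoded[i + 1:end]
        if 0 in block:
            raise ValueError("zero byte inside COBS frame")
        out.extend(block)
        i = end
        # A code below 0xFF stands for an implicit zero, except at frame end
        if code < 0xFF and i < len(encoded):
            out.append(0)
    return bytes(out)


payload = bytes([0x11, 0x22, 0x00, 0x33])
print(cobs_encode(payload).hex())
print(cobs_decode(cobs_encode(payload)) == payload)
```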

Usage Examples

Basic: Poll for Async Data

bp = BusPirate('/dev/ttyACM0')

# Configure UART mode
bp.configure_mode('UART', baudrate=115200)

# Normal SPI transaction
response = bp.spi_write([0x01, 0x02, 0x03])
print(f"SPI response: {response.data_read}")

# Poll for UART async data
while True:
    async_data = bp.poll_async()
    if async_data:
        print(f"UART RX: {bytes(async_data.data_read)}")
    time.sleep(0.01)  # 10ms poll interval

Advanced: Callback-Driven

bp = BusPirate('/dev/ttyACM0')

# Register async handler
def uart_handler(packet):
    data = bytes(packet.data_read)
    print(f"UART RX [{len(data)} bytes]: {data.hex()}")

bp.on_async(uart_handler)

# Configure UART
bp.configure_mode('UART', baudrate=9600)

# Continue with normal operations - async data handled in background
while True:
    response = bp.spi_write([0xAA, 0xBB])
    time.sleep(1)

Mixed: UART Transmit + Async Receive

bp = BusPirate('/dev/ttyACM0')
bp.configure_mode('UART', baudrate=115200)

# Send UART data
bp.uart_write(b"AT\r\n")

# Wait for async response
timeout = time.time() + 5.0
response_data = bytearray()

while time.time() < timeout:
    packet = bp.poll_async(timeout=0.1)
    if packet:
        response_data.extend(packet.data_read)
        if b'\r\n' in response_data:
            print(f"Response: {response_data.decode()}")
            break

Alternative Patterns

Option 2: asyncio (Modern Python 3.7+)

import asyncio
import serial_asyncio

class BusPirateAsync:
    def __init__(self):
        self.transport = None
        self.protocol = None
        self.async_callbacks = []
        self.pending_response = None
    
    async def connect(self, port='/dev/ttyACM0'):
        loop = asyncio.get_running_loop()  # preferred inside a coroutine (3.7+)
        self.transport, self.protocol = await serial_asyncio.create_serial_connection(
            loop, BusPirateProtocol, port, baudrate=115200
        )
        self.protocol.async_callbacks = self.async_callbacks
    
    async def transact(self, request):
        self.protocol.pending_response = asyncio.Future()
        self.transport.write(encode_request(request))
        return await asyncio.wait_for(self.protocol.pending_response, timeout=1.0)
    
    def on_async(self, callback):
        self.async_callbacks.append(callback)

class BusPirateProtocol(asyncio.Protocol):
    def __init__(self):
        self.buffer = bytearray()
        self.async_callbacks = []
        self.pending_response = None
    
    def data_received(self, data):
        self.buffer.extend(data)
        
        while b'\x00' in self.buffer:
            idx = self.buffer.index(b'\x00')
            packet_bytes = bytes(self.buffer[:idx])
            self.buffer = self.buffer[idx+1:]
            
            packet = parse_cobs_flatbuffer(packet_bytes)
            
            if packet.is_async:
                for callback in self.async_callbacks:
                    callback(packet)
            else:
                if self.pending_response and not self.pending_response.done():
                    self.pending_response.set_result(packet)

# Usage
async def main():
    bp = BusPirateAsync()
    await bp.connect('/dev/ttyACM0')
    
    bp.on_async(lambda p: print(f"Async: {bytes(p.data_read)}"))
    
    response = await bp.transact(spi_request)
    print(f"Response: {response.data_read}")

asyncio.run(main())

Option 3: Event-Driven with select()

import serial
import select
import threading
from collections import deque

class BusPirateEventDriven:
    def __init__(self, port='/dev/ttyACM0'):
        self.serial = serial.Serial(port, timeout=0)  # Non-blocking
        self.async_callbacks = []
        self.response_queue = deque()
        self._running = True
        threading.Thread(target=self._event_loop, daemon=True).start()
    
    def _event_loop(self):
        """Efficient event loop using select()"""
        while self._running:
            # Wait for data (blocks efficiently, no CPU waste)
            ready, _, _ = select.select([self.serial.fileno()], [], [], 0.1)
            
            if ready:
                packet = self._read_packet()
                if packet.is_async:
                    for callback in self.async_callbacks:
                        callback(packet)
                else:
                    self.response_queue.append(packet)
    
    def on_async(self, callback):
        self.async_callbacks.append(callback)

Performance Notes

Why Threading Works Well

  1. COBS Framing: 0x00 delimiter means reader thread naturally blocks until complete packet
  2. OS-Level Efficiency: serial.read() blocks in kernel - zero CPU usage while waiting
  3. Queue Overhead: Minimal - Python’s Queue is implemented in C
  4. GIL Impact: Irrelevant - reader thread blocks on I/O, not compute

Typical Latency

  • Normal response: <1ms (request → response)
  • Async delivery: <5ms (hardware RX → Python callback)
  • Queue overhead: <100μs

Memory Usage

  • Per packet: ~512 bytes (max BPIO packet size)
  • Queue depth: Auto-grows, typical <10 packets buffered
  • Total overhead: <50KB for typical usage

Debugging Tips

Enable Debug Logging

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('BusPirate')

class BusPirate:
    def _reader_loop(self):
        while self._running:
            packet = self._read_cobs_packet()
            logger.debug(f"RX: is_async={packet.is_async}, len={len(packet.data_read)}")
            # ... rest of logic

Monitor Queue Depth

def transact(self, request, timeout=1.0):
    queue_depth = self.async_queue.qsize()
    if queue_depth > 10:
        logger.warning(f"Async queue backlog: {queue_depth} packets")
    # ... rest of logic

Detect Missed Async Data

# If you're not polling fast enough, queue grows
if self.async_queue.qsize() > 100:
    logger.error("Async queue overflow - data being dropped!")
    # Drain old data
    while self.async_queue.qsize() > 10:
        self.async_queue.get_nowait()

Migration from Old API

Before (Synchronous Only)

bp = BusPirate()
response = bp.transact(request)

After (With Async Support)

bp = BusPirate()

# Same synchronous API still works
response = bp.transact(request)

# NEW: Check for async data
async_data = bp.poll_async()
if async_data:
    handle_uart_rx(async_data)

Backward compatible - existing code works unchanged, async is opt-in.


Complete Example: UART Echo Test

import time
from buspirate import BusPirate

bp = BusPirate('/dev/ttyACM0')

# Configure UART
bp.configure_mode('UART', baudrate=115200)

# Send test string
test_data = b"Hello, World!\r\n"
bp.uart_write(test_data)

# Collect echoed response
echo_buffer = bytearray()
timeout = time.time() + 2.0

while time.time() < timeout:
    packet = bp.poll_async(timeout=0.1)
    if packet:
        echo_buffer.extend(packet.data_read)
        if len(echo_buffer) >= len(test_data):
            break

if echo_buffer == test_data:
    print("✓ UART echo successful")
else:
    print(f"✗ Echo mismatch: sent {test_data}, got {echo_buffer}")

bp.close()

Testing Async Performance

import time

bp = BusPirate()
bp.configure_mode('UART', baudrate=115200)

# Measure async latency
received_times = []

def latency_test(packet):
    received_times.append(time.time())

bp.on_async(latency_test)

# Send 100 bytes, measure when they arrive
start = time.time()
bp.uart_write(bytes(range(100)))

time.sleep(0.5)  # Wait for all async packets

latencies = [t - start for t in received_times]
print(f"Packets received: {len(latencies)}")
print(f"Avg latency: {sum(latencies)/len(latencies)*1000:.2f}ms")
print(f"Max latency: {max(latencies)*1000:.2f}ms")

Reference: Full Minimal Example

#!/usr/bin/env python3
"""Minimal BPIO async client example"""

import serial
import threading
from queue import Queue, Empty

class SimpleBP:
    def __init__(self, port):
        self.ser = serial.Serial(port)
        self.async_q = Queue()
        self.resp_q = Queue()
        threading.Thread(target=self._read, daemon=True).start()
    
    def _read(self):
        buf = bytearray()
        while True:
            buf.extend(self.ser.read(1))
            if buf[-1] == 0:  # COBS delimiter
                pkt = decode(buf[:-1])
                (self.async_q if pkt.is_async else self.resp_q).put(pkt)
                buf.clear()
    
    def send(self, req):
        self.ser.write(encode(req))
        return self.resp_q.get(timeout=1)
    
    def poll(self):
        try: return self.async_q.get_nowait()
        except Empty: return None

# Usage
bp = SimpleBP('/dev/ttyACM0')
resp = bp.send(spi_request)
async_data = bp.poll()

That is the whole async client in roughly 20 lines, apart from the decode() and encode() helpers, which are left undefined here.

bpio uart mode added on bpio2 experimental branch.

table DataResponse {
  error:string; // Error message if any.
  data_read:[ubyte]; // Data read from device
  is_async: bool = false;  // NEW 2.2: marks this as unsolicited async data
}

flatbuffer schema updated with is_async flag.

For async data:

  • Check whether any async data has arrived
  • If so, read it
  • Wait 200us for more data (up to 512 bytes)
  • When the wait times out, send what has been collected
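
The batching steps above can be modelled on the host for experimentation. The sketch below is a simulation, not the firmware code: it takes timestamped bytes and groups them into packets, assuming the 200us wait restarts after every received byte and that a packet is cut at 512 bytes. Both assumptions, and the `coalesce` name, are mine.

```python
def coalesce(events, idle_gap_us=200, max_len=512):
    """Group (timestamp_us, byte) events into packets.

    A packet is closed when the line has been idle for more than
    `idle_gap_us`, or when it already holds `max_len` bytes.
    """
    packet = bytearray()
    last_ts = None
    for ts, byte in events:
        if packet and (ts - last_ts > idle_gap_us or len(packet) >= max_len):
            yield bytes(packet)
            packet.clear()
        packet.append(byte)
        last_ts = ts
    if packet:
        yield bytes(packet)  # flush whatever is left at the end


# Three bytes close together, then one byte after a long pause
events = [(0, 0x41), (50, 0x42), (100, 0x43), (1000, 0x44)]
print(list(coalesce(events)))
```

At 115200 baud one byte takes roughly 87us on the wire, so a 200us gap would close a packet after about two idle byte times.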

For normal request/response with read request:

  • Attempt to read requested number of bytes with 1 second timeout
  • However this isn’t complete because there is no indicator of how many bytes we read before timeout. Needs more thought. I think flatbuffers itself can report the vector length. Will check when I write the python wrapper.

I worked on this a bit on and off. Had firmware support going pretty well. The issue is the Python support.

The Scenario:

  1. Host sends data request to Bus Pirate to write bytes to UART
  2. Host waits for data response confirmation (no data, we get it async)
  3. Async process in python/host waits for any async responses

At this point it might be clear what a terrible design this is :slight_smile: 2 & 3 are in a race condition to see who gets the data response confirmation packet (non async). The async process doesn’t know what to do with that packet, and the data request/response gets messed up if the async process consumes it first.

We probably need a higher level thread or async process to act as a “dispatcher” or router. This process receives all response packets and directs them into the correct queue for either sync or async operations.
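
A stripped-down sketch of such a dispatcher, with the serial port replaced by an in-memory queue so it runs without hardware. `Packet` and `Router` are stand-ins invented for this example; a real client would feed decoded FlatBuffer responses into `incoming` from its serial reader.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class Packet:
    """Stand-in for a decoded DataResponse."""
    data_read: bytes = b""
    is_async: bool = False


class Router:
    """Single consumer of all incoming packets; nothing else reads the port."""

    def __init__(self, incoming):
        self.incoming = incoming          # filled by the serial reader
        self.responses = queue.Queue()    # consumed by request/response code
        self.async_data = queue.Queue()   # consumed by async handlers
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        while True:
            packet = self.incoming.get()
            if packet is None:            # sentinel: stop routing
                break
            target = self.async_data if packet.is_async else self.responses
            target.put(packet)


incoming = queue.Queue()
router = Router(incoming)
incoming.put(Packet(b"rx bytes", is_async=True))   # unsolicited UART data
incoming.put(Packet(b"", is_async=False))          # confirmation of a write
incoming.put(None)
router.thread.join(timeout=1.0)
print(router.async_data.get_nowait())
print(router.responses.get_nowait())
```

Because only the router reads the incoming stream, the write confirmation can no longer be consumed by the async side, which is the race described above.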

So now we have threads and queues… To reinvent a USB to UART bridge with a couple extra features… Only with much more opportunity for disappointment.

My gut tells me this isn’t the right approach. At least for my skills as a desktop/python developer.

My previously working but bug-inducing UART bpio mode stopped working while I was trying to debug it. I can’t tell what changed. It may be flatbuffer tooling not all being exactly the same version, but I recompiled and the issue persists. I’m going to spend a bit more time trying to get it running again, then maybe a Python wizard will be able to suggest how to support it.

Against my better judgement I worked up a potential solution:

  • There is now a master router thread that dispatches flatbuffer packets to the right place based on the is_async flag.
  • There are two queues that the lower processes consume.

The UART library supports two ways of dealing with the async data.

Buffered Mode - Simple polling:

uart.configure(speed=115200, ...)
# Later...
data = uart.read_async()  # Returns all accumulated data

This is kind of a beginner mode. The bpio_uart.py is quietly receiving all the async packets and putting them into a buffer. You can call read_async to get the accumulated data and iterate over it yourself.

Callback Mode - Real-time event handling:

def my_handler(data):
    print(f"Received: {data}")

uart.configure(speed=115200, async_callback=my_handler)
# Handler called automatically when data arrives

Alternatively, you can define a callback function to handle async data yourself and pass it to uart.configure.

Full UART demo

import argparse
import sys
import time

# Import BPIO client and UART interface
from pybpio.bpio_client import BPIOClient
from pybpio.bpio_uart import BPIOUART

def uart_buffered_mode(client, speed=115200):
    """UART example with buffered async data (read at leisure)."""
    print("=== UART Buffered Mode Example ===\n")
    
    uart = BPIOUART(client)

    # Configure UART without callback - data accumulates in buffer
    print("Configuring UART interface (buffered mode)...\n")
    if uart.configure(speed=speed, data_bits=8, parity=False, stop_bits=1, 
                      psu_enable=True, psu_set_mv=3300, psu_set_ma=0):

        print(f"UART configured at {speed} baud\n")
        
        # Send test message
        print("Sending test message...")
        test_message = b"Hello UART!\r\n"
        response = uart.write(test_message)
        if response:
            print(f"TX: {test_message.hex()} ({test_message})")
        
        # Wait for async loopback data to accumulate
        print("Waiting for async loopback data (3 seconds)...\n")
        time.sleep(3)
        
        # Read buffered async data at our leisure
        async_data = uart.read_async()
        if async_data:
            print(f"Async RX (buffered): {async_data.hex()} ({async_data})")
        else:
            print("No async data received")
        
        print("\nBuffered mode test complete.")
    else:
        print("Failed to configure UART interface")
        return False
    
    return True

def uart_callback_mode(client, speed=115200):
    """UART example with callback for real-time async data processing."""
    print("=== UART Callback Mode Example ===\n")
    
    uart = BPIOUART(client)
    
    # Track received data
    received_chunks = []
    
    def async_data_handler(data):
        """Callback function called when async data arrives"""
        print(f"  Callback RX: {data.hex()} ({data})")
        received_chunks.append(data)
    
    # Configure UART with callback - data goes directly to handler
    print("Configuring UART interface (callback mode)...\n")
    if uart.configure(speed=speed, data_bits=8, parity=False, stop_bits=1,
                      psu_enable=True, psu_set_mv=3300, psu_set_ma=0,
                      async_callback=async_data_handler):
        
        print(f"UART configured at {speed} baud\n")
        
        # Send test message
        print("Sending test message...")
        test_message = b"Hello UART!\r\n"
        response = uart.write(test_message)
        if response:
            print(f"TX: {test_message.hex()} ({test_message})")
        
        # Wait for async loopback data (handler called automatically)
        print("\nWaiting for async loopback (callback will be called)...\n")
        time.sleep(3)
        
        # Show what was received via callback
        if received_chunks:
            total_data = b''.join(received_chunks)
            print(f"\nTotal received via callback: {total_data.hex()} ({total_data})")
            print(f"Received in {len(received_chunks)} chunks")
        else:
            print("\nNo async data received")
        
        print("\nCallback mode test complete.")
    else:
        print("Failed to configure UART interface")
        return False
    
    return True
        

# Example usage and testing
def main():
    """Example UART usage with async monitoring"""

    parser = argparse.ArgumentParser(
        description='BPIO2 UART Example - Demonstrates buffered and callback async modes',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    %(prog)s -p COM3                    # Run callback mode example (default)
    %(prog)s -p COM3 --mode buffered    # Run buffered mode example
    %(prog)s -p COM3 --speed 9600       # Use 9600 baud with callback mode

Async Modes:
    Callback Mode (default):
        - Provide a callback function to configure()
        - Callback is called immediately when async data arrives
        - Good for real-time processing and event-driven applications
        - Data is NOT buffered when using callback mode
        
    Buffered Mode:
        - Async data accumulates in an internal buffer
        - Read at your leisure with uart.read_async()
        - Good for polling-based applications
"""
    )
    parser.add_argument('-p', '--port', required=True,
                       help='Serial port (e.g., COM3, /dev/ttyUSB0)')
    parser.add_argument('--speed', type=int, default=115200,
                       help='UART speed in bps (default: 115200)')
    parser.add_argument('--mode', choices=['buffered', 'callback'], default='callback',
                       help='Which mode to demonstrate (default: callback)')
    
    args = parser.parse_args()
    
    try:
        client = BPIOClient(args.port)
        print(f"Connected to Bus Pirate on {args.port}\n")

        if args.mode == 'buffered':
            success = uart_buffered_mode(client, speed=args.speed)
        else:  # callback
            success = uart_callback_mode(client, speed=args.speed)
        
        return 0 if success else 1
    
    except Exception as e:
        print(f"Error: {e}")
        return 1

if __name__ == "__main__":
    sys.exit(main())

Thoughts? Criticism?

BPIO control of the UART is now in the latest firmware, python library, and docs.

This update includes a major change to the way Response packets from the Bus Pirate are handled. There’s a new is_async flag in the response packet indicating whether it is a response to a request or was generated asynchronously.

To support this, library developers need to have a main routing function that receives all the incoming packets and dispatches them to the right handler based on the async flag. See the Python library for an example.