Moving forward. Cleaned up some partially finished implementations of 3wire, 2wire, LED. Did a huge reorg of the transactions. Refactored the LED mode into function pointer arrays.
Starting on the update of the data response packet to provide async data access. I asked the LLM to describe some ways to handle the async packets in Python. Python isn’t my thing, and I don’t want to build something that ends up being unusable on the Python side.
Some ways to deal with async in python
BPIO Async Data - Python Client Implementation Patterns
Overview
BPIO supports asynchronous data from modes like UART and 2WIRE. Async packets carry an is_async=true flag in DataResponse, which distinguishes them from normal request/response transactions.
Recommended: Threading with Queue Pattern
Implementation
import serial
import threading
from queue import Queue, Empty
import time
class BusPirate:
    """Threaded BPIO client that separates async packets from responses.

    A background reader thread decodes every COBS frame arriving on the
    serial port. Packets whose ``is_async`` attribute is true are put on
    ``async_queue``; all other packets are put on ``response_queue``, where
    :meth:`transact` waits for them.
    """

    def __init__(self, port='/dev/ttyACM0', baudrate=115200):
        """Open the serial port and start the background reader thread.

        Args:
            port: Device name passed to ``serial.Serial``.
            baudrate: Baud rate passed to ``serial.Serial``.
        """
        # timeout=None: read() blocks until data arrives.
        self.serial = serial.Serial(port, baudrate=baudrate, timeout=None)
        self.async_queue = Queue()
        self.response_queue = Queue()
        self._running = True

        # Start background reader thread
        self.reader_thread = threading.Thread(
            target=self._reader_loop,
            daemon=True,
            name='BusPirate-Reader',
        )
        self.reader_thread.start()

    def _reader_loop(self):
        """Route incoming packets to the matching queue until stopped.

        The loop ends when ``close()`` clears ``_running`` or when reading
        or decoding raises. A failure is only reported while the client is
        still running, so that closing the port on purpose does not print a
        misleading error.
        """
        while self._running:
            try:
                packet = self._read_cobs_packet()  # Blocks on serial.read()
                if packet.is_async:
                    self.async_queue.put(packet)
                else:
                    self.response_queue.put(packet)
            except Exception as e:
                if self._running:
                    print(f"Reader error: {e}")
                break

    def _read_cobs_packet(self):
        """Read one COBS frame (terminated by 0x00) and return the parsed packet.

        Empty frames (two delimiters in a row) are skipped because they
        carry no payload to decode.
        """
        buffer = bytearray()
        while True:
            byte = self.serial.read(1)  # Blocking read
            if byte != b'\x00':
                buffer.extend(byte)
                continue
            if not buffer:
                continue
            # Decode COBS and parse FlatBuffer
            decoded = cobs_decode(buffer)
            return parse_flatbuffer(decoded)

    def transact(self, request, timeout=1.0):
        """Send *request* and wait for the next non-async packet.

        Args:
            request: Object handed to ``encode_request``.
            timeout: Seconds to wait for the response.

        Returns:
            The next packet placed on ``response_queue``.

        Raises:
            TimeoutError: If no response arrives within *timeout* seconds.
        """
        # Clear any stale responses; EAFP avoids an empty()/get race.
        while True:
            try:
                self.response_queue.get_nowait()
            except Empty:
                break

        # Send request
        self.serial.write(encode_request(request))

        # Wait for response
        try:
            return self.response_queue.get(timeout=timeout)
        except Empty:
            raise TimeoutError(f"No response within {timeout}s") from None

    def poll_async(self, timeout=0):
        """Return the next async packet, or ``None`` if none arrives in time.

        Args:
            timeout: Seconds to wait; the default of 0 returns immediately.
        """
        try:
            return self.async_queue.get(timeout=timeout)
        except Empty:
            return None

    def on_async(self, callback):
        """Run *callback(packet)* on a daemon thread for each async packet.

        The handler thread takes packets from the same ``async_queue`` that
        :meth:`poll_async` reads, so a packet is delivered to one consumer
        only. An exception raised by *callback* is reported and the thread
        keeps serving later packets.
        """
        def wrapper():
            while self._running:
                try:
                    # Short timeout so the thread notices close().
                    packet = self.async_queue.get(timeout=0.1)
                except Empty:
                    continue
                try:
                    callback(packet)
                except Exception as e:
                    print(f"Async callback error: {e}")

        threading.Thread(target=wrapper, daemon=True).start()

    def close(self):
        """Stop the background threads and close the serial port."""
        # Clear the flag first so the reader treats the failing read that
        # follows the port closing as a normal shutdown.
        self._running = False
        self.serial.close()
        self.reader_thread.join(timeout=1.0)
# Helper functions (to be implemented with actual COBS/FlatBuffer code)
def cobs_decode(data):
    """Decode one COBS frame (without its trailing 0x00 delimiter).

    Args:
        data: Bytes-like object holding the encoded frame.

    Returns:
        The decoded payload as ``bytes``.

    Raises:
        ValueError: If the frame contains a zero byte or a block runs past
            the end of the input.
    """
    encoded = bytes(data)
    total = len(encoded)
    decoded = bytearray()
    index = 0
    while index < total:
        code = encoded[index]
        if code == 0:
            raise ValueError("COBS frame must not contain 0x00")
        block_end = index + code
        if block_end > total:
            raise ValueError("COBS block runs past the end of the frame")
        block = encoded[index + 1:block_end]
        if 0 in block:
            raise ValueError("COBS frame must not contain 0x00")
        decoded += block
        index = block_end
        # A code below 0xFF stands for a zero byte after the block, except
        # for the implicit zero at the very end of the frame.
        if code < 0xFF and index < total:
            decoded.append(0)
    return bytes(decoded)
def encode_request(request):
    """Encode a FlatBuffer request with COBS and a 0x00 delimiter.

    Placeholder: the real encoder still has to be written. It raises
    instead of silently returning ``None`` so that the failure shows up
    here rather than later at the serial write.

    Raises:
        NotImplementedError: Always, until an encoder is supplied.
    """
    raise NotImplementedError(
        "encode_request is a placeholder; supply the FlatBuffer + COBS encoder"
    )
def parse_flatbuffer(data):
    """Parse a FlatBuffer DataResponse/ConfigurationResponse/etc.

    Placeholder: the real parser still has to be written. It raises
    instead of silently returning ``None`` so that callers do not fail
    later on attribute access of a missing packet.

    Raises:
        NotImplementedError: Always, until a parser is supplied.
    """
    raise NotImplementedError(
        "parse_flatbuffer is a placeholder; supply the FlatBuffer parser"
    )
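Usage Examples (the configure_mode, spi_write and uart_write calls below are assumed convenience wrappers around transact(); the BusPirate class above does not define them)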
Basic: Poll for Async Data
bp = BusPirate('/dev/ttyACM0')
try:
    # Configure UART mode
    bp.configure_mode('UART', baudrate=115200)

    # Normal SPI transaction
    response = bp.spi_write([0x01, 0x02, 0x03])
    print(f"SPI response: {response.data_read}")

    # Poll for UART async data. Waiting inside poll_async (10ms) replaces
    # the fixed sleep, so a burst of packets is drained without a pause
    # after each one.
    while True:
        async_data = bp.poll_async(timeout=0.01)
        if async_data is not None:
            print(f"UART RX: {bytes(async_data.data_read)}")
except KeyboardInterrupt:
    # Ctrl+C is the only way out of the endless poll loop.
    pass
finally:
    bp.close()
Advanced: Callback-Driven
bp = BusPirate('/dev/ttyACM0')


# Register async handler
def uart_handler(packet):
    """Print the size and a hex dump of one async packet."""
    data = bytes(packet.data_read)
    print(f"UART RX [{len(data)} bytes]: {data.hex()}")


try:
    bp.on_async(uart_handler)

    # Configure UART
    bp.configure_mode('UART', baudrate=9600)

    # Continue with normal operations - async data handled in background
    while True:
        response = bp.spi_write([0xAA, 0xBB])
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C is the only way out of the endless loop.
    pass
finally:
    bp.close()
Mixed: UART Transmit + Async Receive
bp = BusPirate('/dev/ttyACM0')
try:
    bp.configure_mode('UART', baudrate=115200)

    # Send UART data
    bp.uart_write(b"AT\r\n")

    # Wait for async response. A monotonic clock keeps the 5s budget
    # correct even if the wall clock is adjusted meanwhile.
    deadline = time.monotonic() + 5.0
    response_data = bytearray()
    while time.monotonic() < deadline:
        packet = bp.poll_async(timeout=0.1)
        if packet is None:
            continue
        response_data.extend(packet.data_read)
        if b'\r\n' in response_data:
            # errors='replace' keeps line noise from raising UnicodeDecodeError.
            print(f"Response: {response_data.decode(errors='replace')}")
            break
    else:
        # Loop ended without break: no complete line arrived in time.
        print(f"No complete response within 5s (got {bytes(response_data)!r})")
finally:
    bp.close()
Alternative Patterns
Option 2: asyncio (Modern Python 3.7+)
import asyncio
import serial_asyncio
class BusPirateAsync:
    """asyncio BPIO client built on a ``serial_asyncio`` connection."""

    def __init__(self):
        """Create an unconnected client; call :meth:`connect` before use."""
        self.transport = None
        self.protocol = None
        # Shared with the protocol object once connected.
        self.async_callbacks = []
        # Kept for compatibility; the in-flight future is stored on the
        # protocol object, not here.
        self.pending_response = None

    async def connect(self, port='/dev/ttyACM0', baudrate=115200):
        """Open the serial connection and attach the protocol.

        Args:
            port: Serial device name.
            baudrate: Baud rate for the connection.
        """
        loop = asyncio.get_running_loop()
        self.transport, self.protocol = await serial_asyncio.create_serial_connection(
            loop, BusPirateProtocol, port, baudrate=baudrate
        )
        # Hand the protocol the same list object so later on_async()
        # registrations are seen by it.
        self.protocol.async_callbacks = self.async_callbacks

    async def transact(self, request, timeout=1.0):
        """Send *request* and await the next non-async packet.

        Args:
            request: Object handed to ``encode_request``.
            timeout: Seconds to wait for the response.

        Returns:
            The packet the protocol resolves the pending future with.

        Raises:
            RuntimeError: If called before :meth:`connect` has completed.
            asyncio.TimeoutError: If no response arrives within *timeout*.
        """
        if self.protocol is None or self.transport is None:
            raise RuntimeError(
                "BusPirateAsync.connect() must be awaited before transact()"
            )
        pending = asyncio.get_running_loop().create_future()
        self.protocol.pending_response = pending
        self.transport.write(encode_request(request))
        return await asyncio.wait_for(pending, timeout=timeout)

    def on_async(self, callback):
        """Register *callback(packet)* to be called for each async packet."""
        self.async_callbacks.append(callback)
class BusPirateProtocol(asyncio.Protocol):
    """Split the incoming byte stream into COBS frames and dispatch packets."""

    def __init__(self):
        """Start with an empty receive buffer and no pending transaction."""
        self.buffer = bytearray()
        self.async_callbacks = []
        self.pending_response = None

    def data_received(self, data):
        """Buffer *data* and dispatch every complete 0x00-terminated frame.

        Async packets are passed to each registered callback. Any other
        packet resolves ``pending_response`` if that future exists and is
        not yet done; otherwise it is discarded. Bytes after the last
        delimiter stay in ``buffer`` for the next call.
        """
        self.buffer.extend(data)
        while True:
            idx = self.buffer.find(b'\x00')
            if idx < 0:
                break
            packet_bytes = bytes(self.buffer[:idx])
            del self.buffer[:idx + 1]
            if not packet_bytes:
                # Two delimiters in a row: nothing to decode.
                continue
            # Same decode path as the other clients in this file.
            packet = parse_flatbuffer(cobs_decode(packet_bytes))
            if packet.is_async:
                for callback in self.async_callbacks:
                    callback(packet)
            elif self.pending_response and not self.pending_response.done():
                self.pending_response.set_result(packet)
# Usage
async def main():
    """Connect, print async packets, and run one transaction."""
    bp = BusPirateAsync()
    await bp.connect('/dev/ttyACM0')
    try:
        bp.on_async(lambda p: print(f"Async: {bytes(p.data_read)}"))
        # NOTE: spi_request is a placeholder for a request built elsewhere.
        response = await bp.transact(spi_request)
        print(f"Response: {response.data_read}")
    finally:
        # Release the serial port even if the transaction fails.
        bp.transport.close()


asyncio.run(main())
Option 3: Event-Driven with select()
import serial
import select
import threading
from collections import deque
class BusPirateEventDriven:
    """BPIO client whose reader thread waits on the port with ``select()``.

    NOTE(review): ``select()`` on ``serial.fileno()`` presumably limits this
    variant to POSIX platforms - confirm before using it elsewhere.
    """

    def __init__(self, port='/dev/ttyACM0'):
        """Open *port* in non-blocking mode and start the event loop thread."""
        self.serial = serial.Serial(port, timeout=0)  # Non-blocking
        self.async_callbacks = []
        self.response_queue = deque()
        # Bytes received so far that do not yet form a complete frame.
        self._rx_buffer = bytearray()
        self._running = True
        threading.Thread(target=self._event_loop, daemon=True).start()

    def _event_loop(self):
        """Wait for readable data and dispatch every packet it completes."""
        while self._running:
            # Wait up to 100ms for data so the _running flag is rechecked.
            ready, _, _ = select.select([self.serial.fileno()], [], [], 0.1)
            if not ready:
                continue
            for packet in self._read_packets():
                if packet.is_async:
                    for callback in self.async_callbacks:
                        callback(packet)
                else:
                    self.response_queue.append(packet)

    def _read_packets(self):
        """Read the available bytes and return every completed packet.

        One readiness event may carry part of a frame or several frames, so
        the bytes are buffered and split on the 0x00 delimiter. Empty frames
        are skipped.
        """
        self._rx_buffer.extend(self.serial.read(4096))
        packets = []
        while True:
            end = self._rx_buffer.find(b'\x00')
            if end < 0:
                break
            frame = bytes(self._rx_buffer[:end])
            del self._rx_buffer[:end + 1]
            if frame:
                packets.append(parse_flatbuffer(cobs_decode(frame)))
        return packets

    def on_async(self, callback):
        """Register *callback(packet)* to be called for each async packet."""
        self.async_callbacks.append(callback)
Performance Notes
Why Threading Works Well
- COBS Framing: 0x00 delimiter means reader thread naturally blocks until complete packet
- OS-Level Efficiency:
serial.read() blocks in kernel - zero CPU usage while waiting
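- Queue Overhead: Minimal - queue.Queue is a thin pure-Python wrapper around a deque and a lock (only queue.SimpleQueue is implemented in C), and its cost is small next to serial I/O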
- GIL Impact: Irrelevant - reader thread blocks on I/O, not compute
Typical Latency
- Normal response: <1ms (request → response)
- Async delivery: <5ms (hardware RX → Python callback)
- Queue overhead: <100μs
Memory Usage
- Per packet: ~512 bytes (max BPIO packet size)
- Queue depth: Auto-grows, typical <10 packets buffered
- Total overhead: <50KB for typical usage
Debugging Tips
Enable Debug Logging
import logging
# Configure the root logger at DEBUG level so the messages below are shown.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('BusPirate')
class BusPirate:
    def _reader_loop(self):
        """Log one DEBUG line per received packet (illustrative fragment)."""
        while self._running:
            packet = self._read_cobs_packet()
            # Record the routing flag and the payload length of the packet.
            logger.debug(f"RX: is_async={packet.is_async}, len={len(packet.data_read)}")
            # ... rest of logic
Monitor Queue Depth
def transact(self, request, timeout=1.0):
    """Warn when more than 10 async packets are waiting (illustrative fragment)."""
    # qsize() is a snapshot; the count can change while another thread
    # is adding packets.
    queue_depth = self.async_queue.qsize()
    if queue_depth > 10:
        logger.warning(f"Async queue backlog: {queue_depth} packets")
    # ... rest of logic
Detect Missed Async Data
# If you're not polling fast enough, queue grows
# NOTE(review): this fragment uses ``self``, so it presumably belongs inside
# a BusPirate method - confirm where it is meant to be placed.
if self.async_queue.qsize() > 100:
    logger.error("Async queue overflow - data being dropped!")
    # Drain old data
    # Removes packets from the head of the queue until at most 10 remain;
    # this drain is the step that discards data.
    while self.async_queue.qsize() > 10:
        self.async_queue.get_nowait()
Migration from Old API
Before (Synchronous Only)
# Create the client with its default arguments.
bp = BusPirate()
# Blocking request/response exchange.
response = bp.transact(request)
After (With Async Support)
bp = BusPirate()

# Same synchronous API still works
response = bp.transact(request)

# NEW: Check for async data. poll_async() returns None when nothing is
# queued, so test for None explicitly rather than relying on truthiness.
async_data = bp.poll_async()
if async_data is not None:
    handle_uart_rx(async_data)
Backward compatible - existing code works unchanged, async is opt-in.
Complete Example: UART Echo Test
import time
from buspirate import BusPirate
bp = BusPirate('/dev/ttyACM0')
try:
    # Configure UART
    bp.configure_mode('UART', baudrate=115200)

    # Send test string
    test_data = b"Hello, World!\r\n"
    bp.uart_write(test_data)

    # Collect echoed response. A monotonic clock keeps the 2s budget
    # correct even if the wall clock is adjusted meanwhile.
    echo_buffer = bytearray()
    deadline = time.monotonic() + 2.0
    while time.monotonic() < deadline:
        packet = bp.poll_async(timeout=0.1)
        if packet is None:
            continue
        echo_buffer.extend(packet.data_read)
        if len(echo_buffer) >= len(test_data):
            break

    if echo_buffer == test_data:
        print("✓ UART echo successful")
    else:
        print(f"✗ Echo mismatch: sent {test_data}, got {echo_buffer}")
finally:
    # Release the port even if configuration or the write fails.
    bp.close()
Testing Async Performance
import time
bp = BusPirate()
try:
    bp.configure_mode('UART', baudrate=115200)

    # Measure async latency
    received_times = []

    def latency_test(packet):
        """Record the arrival time of one async packet."""
        received_times.append(time.perf_counter())

    bp.on_async(latency_test)

    # Send 100 bytes, measure when they arrive. perf_counter() is a
    # monotonic, high-resolution clock suited to interval measurement.
    start = time.perf_counter()
    bp.uart_write(bytes(range(100)))
    time.sleep(0.5)  # Wait for all async packets
finally:
    bp.close()

latencies = [t - start for t in received_times]
print(f"Packets received: {len(latencies)}")
if latencies:
    print(f"Avg latency: {sum(latencies)/len(latencies)*1000:.2f}ms")
    print(f"Max latency: {max(latencies)*1000:.2f}ms")
else:
    # Without this guard the average and max would raise on an empty list.
    print("No async packets received - nothing to measure")
Reference: Full Minimal Example
#!/usr/bin/env python3
"""Minimal BPIO async client example"""
import serial
import threading
from queue import Queue, Empty
class SimpleBP:
    """Minimal threaded BPIO client with separate async and response queues."""

    def __init__(self, port):
        """Open *port* and start the daemon reader thread."""
        self.ser = serial.Serial(port)
        self.async_q = Queue()
        self.resp_q = Queue()
        threading.Thread(target=self._read, daemon=True).start()

    def _read(self):
        """Read frames byte by byte and queue each decoded packet.

        Returns when a read yields no data, which is treated as the end of
        the stream. Empty frames (two delimiters in a row) are skipped.
        """
        buf = bytearray()
        while True:
            byte = self.ser.read(1)
            if not byte:
                # Nothing was read: stop instead of indexing an empty
                # buffer or spinning on a dead port.
                return
            if byte != b'\x00':  # COBS delimiter not reached yet
                buf += byte
                continue
            if not buf:
                continue
            # NOTE: decode() is a placeholder that this example does not define.
            pkt = decode(buf)
            (self.async_q if pkt.is_async else self.resp_q).put(pkt)
            buf.clear()

    def send(self, req):
        """Write *req* and return the next non-async packet.

        Raises:
            queue.Empty: If no response arrives within 1 second.
        """
        # NOTE: encode() is a placeholder that this example does not define.
        self.ser.write(encode(req))
        return self.resp_q.get(timeout=1)

    def poll(self):
        """Return the next queued async packet, or ``None`` if there is none."""
        try:
            return self.async_q.get_nowait()
        except Empty:
            return None
# Usage
bp = SimpleBP('/dev/ttyACM0')
# Blocking request: waits up to 1 second for the next non-async packet.
# NOTE: spi_request is a placeholder for a request built elsewhere.
resp = bp.send(spi_request)
# Non-blocking check: returns None when no async packet is queued.
async_data = bp.poll()
This is the complete async client in roughly 20 lines (the encode() and decode() helpers for COBS/FlatBuffer framing are not shown).