Testable Emulation¶
Styx allows you to assert runtime behaviors of emulation using existing tools like pytest.
Using the Styx Python bindings, we can create a processor, run a firmware and assert runtime behaviors before, during, and after emulation.
We will follow the example found in
styx/bindings/styx-py-api/examples/uart-integration-test/main.py
.
Test Design¶
We want to construct a test to run under pytest. Let’s create an outline of our test:
def test_uart():
proc = build_processor()
# nonblocking, uart client will stop processor when message is received
uart_recv = start_uart_client(proc)
# blocking, processor will stop when uart message received
proc.start()
# assert our received message is correct
assert uart_recv == bytearray(b"Hello world.\r\n")
Now let’s have to build out these components.
Building the Processor¶
To build our processor we first have to start by defining our test
binary. Since we are running through the pytest script which will be
located in venv/bin
, our path will be located relative to that.
# pytest is located in venv/bin so out target program is relative to that.
TARGET_PROGRAM = "../../data/test-binaries/arm/kinetis_21/bin/freertos_hello/freertos_hello_debug.bin"
def get_script_path() -> Path:
"""Get directory of this script."""
return Path(sys.argv[0]).resolve().parent
def target_program_path() -> Path:
"""Get absolute target firmware path"""
return get_script_path() / TARGET_PROGRAM
Next we design build_processor()
to easily construct our processor
in our tests. Our target is a Kinetis21 that runs ARM Cortex M4. The
options here are documented. See the Styx ProcessorBuilder
documentation for more information on the possible configuration.
def build_processor(backend: Backend):
"""
Builds Kinetis21 processor
LittleEndian
firmware: FreeRTOS "hello world" (RawLoader)
cpu: ArmCortexM4 variant on chosen backend
plugins:
- ProcessorTracingPlugin
"""
# builder pattern for configuring a new processor
builder = ProcessorBuilder()
# define the path to our firmware image
builder.target_program = str(target_program_path())
# select an open port for ipc
builder.ipc_port = 0
# firmware image is a raw memory dump, no mapping needed
builder.loader = RawLoader()
# default executor is okay
builder.executor = DefaultExecutor()
# set our chosen backend
builder.backend = backend
# plugin for capturing and printing logs (traces)
builder.add_plugin(ProcessorTracingPlugin())
# cpu info
builder.variant = ArmVariant.ArmCortexM4
builder.endian = ArchEndian.LittleEndian
return builder.build(Target.Kinetis21)
Now that we have a processor, we must build a UART client to send it data over UART.
UART Client¶
Below is the outline for start_uart_client()
. The idea is that we
create a UART client that communicates with the given processor and send
the client to a separate thread that monitors for incoming data and
appends to a buffer of all received UART data. The caller of
start_uart_client()
then receives a reference to the growing
bytearray of received UART data.
The caller can use this bytearrary to see incoming UART data or analyze it after emulation to check for correctness.
def start_uart_client(proc: Processor) -> bytearray:
"""
Connect UART client to processor and return bytearray with all received data.
The UART client connects to the UART server started by the processor. In this example, it
receives the "Hello World" message from the target after which it stops the processor.
A bytearray is created and sent to the UART thread as well as returned to the caller. The
bytearray will be updated with all received UART data.
"""
# create received bytes bytearray
# create uart client
# start uart monitor thread
# return received bytes bytearray
pass
Styx conveniently provides a UartClient
that allows us to send and
receive UART data to the target through the processor’s IPC mechanisms.
The UartClient
can be instantiated like so, where ipc_port
is
the port opened by the processor to facilitate ipc and
DEBUG_UART_PORT
is hardware specific, defined by the firmware.
from styx.peripherals import UartClient
client = UartClient(f"http://127.0.0.1:{ipc_port}", DEBUG_UART_PORT)
Assuming we have the UART monitoring logic in function uart_thread
,
the starting of the UART client looks like the following.
def start_uart_client(proc: Processor) -> bytearray:
"""
Connect UART client to processor and return bytearray with all received data.
The UART client connects to the UART server started by the processor. In this example, it
receives the "Hello World" message from the target after which it stops the processor.
A bytearray is created and sent to the UART thread as well as returned to the caller. The
bytearray will be updated with all received UART data.
"""
# create received bytes bytearray
recv_bytes = bytearray()
# create uart client
# defined by the firmware, uses uart port 5
DEBUG_UART_PORT = 5
# ipc port of processor to connect to
ipc_port = proc.resolved_ipc_port
client = UartClient(f"http://127.0.0.1:{ipc_port}", DEBUG_UART_PORT)
# start uart monitor thread
# daemon mode allows this thread to be killed if the main thread is killed
thread = threading.Thread(
target=uart_thread, args=(client, proc, recv_bytes), daemon=True
)
thread.start()
# return received bytes bytearray
return recv_bytes
Last thing to do is define the UART monitoring logic.
def uart_thread(client: UartClient, proc: Processor, total_recv_bytes: bytearray):
"""
Function for uart receive thread.
Repeatedly checks for received data using client.recv_nonblocking() and
stops the processor when the whole message has been received, indicated
by receiving a newline.
UART data received is added by mutating the total_recv_bytes.
"""
while True:
# check for new UART data
if uart_data:
# add to our list of received bytes
# newline indicates end of message
if uart_data == b"\n":
print("got newline, shutting down")
proc.shutdown()
break
else:
# wait in between checks for new UART data
time.sleep(0.01)
The UartClient’s recv_nonblocking(n)
method checks for n available
received bytes and returns None if they aren’t found.
Filling in the missing parts for uart_thread()
:
def uart_thread(client: UartClient, proc: Processor, total_recv_bytes: bytearray):
"""
Function for uart receive thread.
Repeatedly checks for received data using client.recv_nonblocking() and
stops the processor when the whole message has been received, indicated
by receiving a newline.
UART data received is added by mutating the total_recv_bytes.
"""
while True:
# check for new UART data
# bytes or None if no bytes are available
current_recv_bytes = client.recv_nonblocking(1)
if current_recv_bytes:
# we got a byte, add to our list of received bytes
total_recv_bytes.extend(current_recv_bytes)
# newline indicates end of message
if current_recv_bytes == b"\n":
print("got newline, shutting down")
print(f'received message: "{total_recv_bytes.decode().strip()}"')
proc.shutdown()
break
else:
# wait in between checks for new UART data
time.sleep(0.01)
Final Touches¶
Finally we can write our tests:
def test_uart():
proc = build_processor(backend)
uart_recv = start_uart_client(proc)
# give time for processor and uart to connect, otherwise a race occurs
# between sending uart data and receiving.
time.sleep(0.1)
start_timeout(proc, 5)
proc.start()
I added a small timeout feature to stop the processor after 5 seconds in case anything goes awry.
def start_timeout(proc: Processor, seconds: float):
"""Stop processor after seconds passed."""
def timeout(proc: Processor):
time.sleep(seconds)
proc.shutdown()
thread = threading.Thread(target=timeout, args=(proc,), daemon=True)
thread.start()
Additionally we can use pytest features to test more effectively. Here I parametrize on the cpu backends.
@pytest.mark.parametrize("backend", backends)
def test_uart(backend: Backend):
proc = build_processor(backend)
uart_recv = start_uart_client(proc)
# give time for processor and uart to connect, otherwise a race occurs
# between sending uart data and receiving.
time.sleep(0.1)
start_timeout(proc, 5)
proc.start()
assert uart_recv == bytearray(b"Hello world.\r\n")
Let’s also add another test for processor initialization.
@pytest.mark.parametrize("backend", backends)
def test_build_proc(backend: Backend):
proc = build_processor(backend)
ipc_port = proc.resolved_ipc_port
assert ipc_port != 0
assert proc.processor_state == ProcessorState.Initialized
Perfect! Now we can run using pytest.
$ pytest /path/to/your/main.py
=== test session starts ===
platform linux -- Python 3.13.1, pytest-8.3.4, pluggy-1.5.0
rootdir: /path/to/your/
configfile: pyproject.toml
collected 4 items
/path/to/your/main.py .... [100%]
=== 4 passed in 0.76s ===