tests/multi_pyb_can: Add multitests for pyboard CAN controller.
Currently only classic CAN, but tests run on both the stm32 classic CAN controller and the FD-CAN controller with the same results. Signed-off-by: Angus Gratton <angus@redyak.com.au>
This commit is contained in:
committed by
Damien George
parent
9db2398009
commit
dbda43b9e1
98
tests/multi_pyb_can/rx_callback.py
Normal file
98
tests/multi_pyb_can/rx_callback.py
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
from pyb import CAN
|
||||||
|
import time
|
||||||
|
import errno
|
||||||
|
|
||||||
|
# Test the various receive IRQs, including overflow
|
||||||
|
|
||||||
|
rx_overflow = False
|
||||||
|
|
||||||
|
REASONS = ["received", "full", "overflow"]
|
||||||
|
|
||||||
|
# CAN IDs
|
||||||
|
ID_SPAM = 0x345 # messages spammed into the receive FIFO
|
||||||
|
ID_ACK_OFLOW = 0x055 # message the receiver sends after it's seen an overflow
|
||||||
|
ID_AFTER = 0x100 # message the sender sends after the ACK
|
||||||
|
|
||||||
|
|
||||||
|
def cb0(bus, reason):
|
||||||
|
global rx_overflow
|
||||||
|
if reason != 0 and not rx_overflow:
|
||||||
|
# exact timing of 'received' callbacks depends on controller type,
|
||||||
|
# so only log the other two
|
||||||
|
print("rx0 reason", REASONS[reason])
|
||||||
|
if reason == 2:
|
||||||
|
rx_overflow = True
|
||||||
|
|
||||||
|
|
||||||
|
# Accept all standard IDs on FIFO 0
|
||||||
|
def _enable_accept_all():
|
||||||
|
if hasattr(CAN, "MASK"): # FD-CAN controller
|
||||||
|
can.setfilter(0, CAN.RANGE, 0, (0x0, 0x7FF), extframe=False)
|
||||||
|
else:
|
||||||
|
can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0), extframe=False)
|
||||||
|
|
||||||
|
|
||||||
|
# Receiver
|
||||||
|
def instance0():
|
||||||
|
_enable_accept_all()
|
||||||
|
can.rxcallback(0, cb0)
|
||||||
|
|
||||||
|
multitest.next()
|
||||||
|
multitest.wait("sender ready")
|
||||||
|
multitest.broadcast("receiver ready")
|
||||||
|
|
||||||
|
while not rx_overflow:
|
||||||
|
pass # Resume ASAP after FIFO0 overflows
|
||||||
|
|
||||||
|
can.send(b"overflow", ID_ACK_OFLOW)
|
||||||
|
|
||||||
|
# drain the receive FIFO, making sure we read at least on ID_SPAM message
|
||||||
|
rxed_spam = False
|
||||||
|
while can.any(0):
|
||||||
|
msg = can.recv(0, timeout=0)
|
||||||
|
assert msg[0] == ID_SPAM
|
||||||
|
rxed_spam = True
|
||||||
|
print("rxed_spam", rxed_spam)
|
||||||
|
|
||||||
|
# This should be the one message with ID_AFTER, there may be one or two spam messages as well
|
||||||
|
for _ in range(10):
|
||||||
|
msg = can.recv(0, timeout=500)
|
||||||
|
if msg[0] == ID_AFTER:
|
||||||
|
print(msg)
|
||||||
|
break
|
||||||
|
|
||||||
|
# RX FIFO should be empty now
|
||||||
|
print("any", can.any(0))
|
||||||
|
|
||||||
|
|
||||||
|
# Sender
|
||||||
|
def instance1():
|
||||||
|
_enable_accept_all()
|
||||||
|
multitest.next()
|
||||||
|
multitest.broadcast("sender ready")
|
||||||
|
multitest.wait("receiver ready")
|
||||||
|
|
||||||
|
# Spam out messages until the receiver tells us its RX FIFO is full.
|
||||||
|
#
|
||||||
|
# The RX FIFO on the receiver can vary from 3 deep (BXCAN) to 25 deep (STM32H7),
|
||||||
|
# so we keep sending to it until we see a CAN message on ID_ACK_OFLOW indicating
|
||||||
|
# the receiver's FIFO has overflowed
|
||||||
|
for i in range(255):
|
||||||
|
can.send(bytes([i] * 8), ID_SPAM, timeout=25)
|
||||||
|
if can.any(0):
|
||||||
|
print(can.recv(0)) # should be ID_ACK_OFLOW
|
||||||
|
break
|
||||||
|
# on boards like STM32H7 the TX FIFO is really deep, so don't fill it too quickly...
|
||||||
|
time.sleep_ms(1)
|
||||||
|
|
||||||
|
# give the receiver some time to make space in the FIFO
|
||||||
|
time.sleep_ms(200)
|
||||||
|
|
||||||
|
# send the final message, the receiver should get this one
|
||||||
|
can.send(b"aaaaa", ID_AFTER)
|
||||||
|
|
||||||
|
# Sender's RX FIFO should also be empty at this point
|
||||||
|
print("any", can.any(0))
|
||||||
|
|
||||||
|
|
||||||
|
can = CAN(1, CAN.NORMAL, baudrate=500_000, sample_point=75)
|
||||||
9
tests/multi_pyb_can/rx_callback.py.exp
Normal file
9
tests/multi_pyb_can/rx_callback.py.exp
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
--- instance0 ---
|
||||||
|
rx0 reason full
|
||||||
|
rx0 reason overflow
|
||||||
|
rxed_spam True
|
||||||
|
(256, False, False, 0, b'aaaaa')
|
||||||
|
any False
|
||||||
|
--- instance1 ---
|
||||||
|
(85, False, False, 0, b'overflow')
|
||||||
|
any False
|
||||||
50
tests/multi_pyb_can/rx_filters.py
Normal file
50
tests/multi_pyb_can/rx_filters.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
from pyb import CAN
|
||||||
|
import time
|
||||||
|
import errno
|
||||||
|
|
||||||
|
# Test for the filtering capabilities for RX FIFO 0 and 1.
|
||||||
|
|
||||||
|
|
||||||
|
# Receiver
|
||||||
|
def instance0():
|
||||||
|
# Configure to receive standard frames (in a range) on FIFO 0
|
||||||
|
# and extended frames (in a range) on FIFO 1.
|
||||||
|
if hasattr(CAN, "MASK"):
|
||||||
|
# FD-CAN has independent filter banks for standard and extended IDs
|
||||||
|
can.setfilter(0, CAN.MASK, 0, (0x300, 0x700), extframe=False)
|
||||||
|
can.setfilter(0, CAN.MASK, 1, (0x3000, 0x7000), extframe=True)
|
||||||
|
else:
|
||||||
|
# pyb.CAN only supports MASK32 for extended ids
|
||||||
|
can.setfilter(0, CAN.MASK16, 0, (0x300, 0x700, 0x300, 0x700), extframe=False)
|
||||||
|
can.setfilter(1, CAN.MASK32, 1, (0x3000, 0x7000), extframe=True)
|
||||||
|
|
||||||
|
multitest.next()
|
||||||
|
multitest.wait("sender ready")
|
||||||
|
multitest.broadcast("receiver ready")
|
||||||
|
for i in range(3):
|
||||||
|
print(i)
|
||||||
|
print("fifo0", can.recv(0, timeout=200))
|
||||||
|
print("fifo1", can.recv(1, timeout=200))
|
||||||
|
|
||||||
|
try:
|
||||||
|
can.recv(0, timeout=100) # should time out
|
||||||
|
except OSError as e:
|
||||||
|
assert e.errno == errno.ETIMEDOUT
|
||||||
|
print("Timed out as expected")
|
||||||
|
|
||||||
|
|
||||||
|
# Sender
|
||||||
|
def instance1():
|
||||||
|
multitest.next()
|
||||||
|
multitest.broadcast("sender ready")
|
||||||
|
multitest.wait("receiver ready")
|
||||||
|
|
||||||
|
for i in range(3):
|
||||||
|
print(i)
|
||||||
|
can.send(bytes([i, 3] * i), 0x345)
|
||||||
|
can.send(bytes([0xEE] * i), 0x3700 + i, extframe=True)
|
||||||
|
can.send(b"abcdef", 0x123) # matches no filter, expect ACKed but not received
|
||||||
|
time.sleep_ms(5) # avoid flooding either our or the receiver's FIFO
|
||||||
|
|
||||||
|
|
||||||
|
can = CAN(1, CAN.NORMAL, baudrate=500_000, sample_point=75)
|
||||||
15
tests/multi_pyb_can/rx_filters.py.exp
Normal file
15
tests/multi_pyb_can/rx_filters.py.exp
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
--- instance0 ---
|
||||||
|
0
|
||||||
|
fifo0 (837, False, False, 0, b'')
|
||||||
|
fifo1 (14080, True, False, 0, b'')
|
||||||
|
1
|
||||||
|
fifo0 (837, False, False, 0, b'\x01\x03')
|
||||||
|
fifo1 (14081, True, False, 0, b'\xee')
|
||||||
|
2
|
||||||
|
fifo0 (837, False, False, 0, b'\x02\x03\x02\x03')
|
||||||
|
fifo1 (14082, True, False, 0, b'\xee\xee')
|
||||||
|
Timed out as expected
|
||||||
|
--- instance1 ---
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
Reference in New Issue
Block a user