tests/ports/rp2: Convert rp2.DMA test to a unittest.

This test is rather complicated and benefits from being a unittest.

Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
Damien George
2025-04-28 13:19:39 +10:00
parent 3f1df4bacb
commit ffd7e0e28d
2 changed files with 132 additions and 124 deletions

View File

@@ -4,103 +4,142 @@ import sys
import time
import machine
import rp2
import unittest
is_rp2350 = "RP2350" in sys.implementation._machine
src = bytes(i & 0xFF for i in range(16 * 1024))
print("# test basic usage")
dma = rp2.DMA()
# Test printing.
print(dma)
# Test pack_ctrl/unpack_ctrl.
ctrl_dict = rp2.DMA.unpack_ctrl(dma.pack_ctrl())
if is_rp2350:
for entry in ("inc_read_rev", "inc_write_rev"):
assert entry in ctrl_dict
del ctrl_dict[entry]
for key, value in sorted(ctrl_dict.items()):
print(key, value)
# Test register access.
dma.read = 0
dma.write = 0
dma.count = 0
dma.ctrl = dma.pack_ctrl()
print(dma.read, dma.write, dma.count, dma.ctrl & 0x01F, dma.channel, dma.registers)
dma.close()
# Test closing when already closed.
dma.close()
# Test using when closed.
try:
dma.active()
assert False
except ValueError:
print("ValueError")
# Test simple memory copy.
print("# test memory copy")
dest = bytearray(1024)
dma = rp2.DMA()
dma.config(read=src, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=False)
print(not any(dest))
dma.active(True)
while dma.active():
pass
print(dest[:8], dest[-8:])
dma.close()
SRC = bytes(i & 0xFF for i in range(16 * 1024))
# Test time taken for a large memory copy.
def run_and_time_dma(dma):
ticks_us = time.ticks_us
irq_state = machine.disable_irq()
t0 = ticks_us()
dma.active(True)
while dma.active():
pass
t1 = ticks_us()
machine.enable_irq(irq_state)
return time.ticks_diff(t1, t0)
class Test(unittest.TestCase):
def setUp(self):
self.dma = rp2.DMA()
def tearDown(self):
self.dma.close()
def test_printing(self):
dma = self.dma
self.assertEqual(str(dma), "DMA(0)")
def test_pack_unpack_ctrl(self):
dma = self.dma
ctrl_dict = rp2.DMA.unpack_ctrl(dma.pack_ctrl())
if is_rp2350:
self.assertEqual(len(ctrl_dict), 18)
self.assertTrue("inc_read_rev" in ctrl_dict)
self.assertTrue("inc_write_rev" in ctrl_dict)
else:
self.assertEqual(len(ctrl_dict), 16)
self.assertEqual(ctrl_dict["ahb_err"], 0)
self.assertEqual(ctrl_dict["bswap"], 0)
self.assertEqual(ctrl_dict["busy"], 0)
self.assertEqual(ctrl_dict["chain_to"], 0)
self.assertEqual(ctrl_dict["enable"], 1)
self.assertEqual(ctrl_dict["high_pri"], 0)
self.assertEqual(ctrl_dict["inc_read"], 1)
self.assertEqual(ctrl_dict["inc_write"], 1)
self.assertEqual(ctrl_dict["irq_quiet"], 1)
self.assertEqual(ctrl_dict["read_err"], 0)
self.assertEqual(ctrl_dict["ring_sel"], 0)
self.assertEqual(ctrl_dict["ring_size"], 0)
self.assertEqual(ctrl_dict["size"], 2)
self.assertEqual(ctrl_dict["sniff_en"], 0)
self.assertEqual(ctrl_dict["treq_sel"], 63)
self.assertEqual(ctrl_dict["write_err"], 0)
def test_register_access(self):
dma = self.dma
dma.read = 0
dma.write = 0
dma.count = 0
dma.ctrl = dma.pack_ctrl()
self.assertEqual(dma.read, 0)
self.assertEqual(dma.write, 0)
self.assertEqual(dma.count, 0)
self.assertEqual(dma.ctrl & 0x01F, 25)
self.assertEqual(dma.channel, 0)
self.assertIsInstance(dma.registers, memoryview)
def test_close(self):
dma = self.dma
dma.close()
# Test closing when already closed.
dma.close()
# Test using when closed.
with self.assertRaises(ValueError):
dma.active()
def test_simple_memory_copy(self):
dma = self.dma
dest = bytearray(1024)
dma.config(read=SRC, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=False)
self.assertFalse(any(dest))
dma.active(True)
while dma.active():
pass
self.assertEqual(dest[:8], SRC[:8])
self.assertEqual(dest[-8:], SRC[-8:])
def test_time_taken_for_large_memory_copy(self):
def run_and_time_dma(dma):
ticks_us = time.ticks_us
irq_state = machine.disable_irq()
t0 = ticks_us()
dma.active(True)
while dma.active():
pass
t1 = ticks_us()
machine.enable_irq(irq_state)
return time.ticks_diff(t1, t0)
dma = self.dma
dest = bytearray(16 * 1024)
dma.read = SRC
dma.write = dest
dma.count = len(dest) // 4
dma.ctrl = dma.pack_ctrl()
dt = run_and_time_dma(dma)
expected_dt_range = range(40, 70) if is_rp2350 else range(70, 125)
self.assertIn(dt, expected_dt_range)
self.assertEqual(dest[:8], SRC[:8])
self.assertEqual(dest[-8:], SRC[-8:])
def test_config_trigger(self):
# Test using .config(trigger=True) to start DMA immediately.
dma = self.dma
dest = bytearray(1024)
dma.config(read=SRC, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=True)
while dma.active():
pass
self.assertEqual(dest[:8], SRC[:8])
self.assertEqual(dest[-8:], SRC[-8:])
def test_irq(self):
def callback(dma):
nonlocal irq_flags
print("irq fired")
irq_flags = dma.irq().flags()
dma = self.dma
irq_flags = None
dest = bytearray(1024)
dma.irq(callback)
dma.config(
read=SRC,
write=dest,
count=len(dest) // 4,
ctrl=dma.pack_ctrl(irq_quiet=0),
trigger=True,
)
while dma.active():
pass
self.assertEqual(irq_flags, 1)
self.assertEqual(dest[:8], SRC[:8])
self.assertEqual(dest[-8:], SRC[-8:])
print("# test timing")
dest = bytearray(16 * 1024)
dma = rp2.DMA()
dma.read = src
dma.write = dest
dma.count = len(dest) // 4
dma.ctrl = dma.pack_ctrl()
dt = run_and_time_dma(dma)
expected_dt_range = range(40, 70) if is_rp2350 else range(70, 125)
print(dt in expected_dt_range)
print(dest[:8], dest[-8:])
dma.close()
# Test using .config(trigger=True).
print("# test immediate trigger")
dest = bytearray(1024)
dma = rp2.DMA()
dma.config(read=src, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(), trigger=True)
while dma.active():
pass
print(dest[:8], dest[-8:])
dma.close()
# Test the DMA.irq() method.
print("# test irq")
dest = bytearray(1024)
dma = rp2.DMA()
dma.irq(lambda dma: print("irq fired", dma.irq().flags()))
dma.config(
read=src, write=dest, count=len(dest) // 4, ctrl=dma.pack_ctrl(irq_quiet=0), trigger=True
)
while dma.active():
pass
print(dest[:8], dest[-8:])
dma.close()
if __name__ == "__main__":
unittest.main()

View File

@@ -1,31 +0,0 @@
# test basic usage
DMA(0)
ahb_err 0
bswap 0
busy 0
chain_to 0
enable 1
high_pri 0
inc_read 1
inc_write 1
irq_quiet 1
read_err 0
ring_sel 0
ring_size 0
size 2
sniff_en 0
treq_sel 63
write_err 0
0 0 0 25 0 <memoryview>
ValueError
# test memory copy
True
bytearray(b'\x00\x01\x02\x03\x04\x05\x06\x07') bytearray(b'\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff')
# test timing
True
bytearray(b'\x00\x01\x02\x03\x04\x05\x06\x07') bytearray(b'\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff')
# test immediate trigger
bytearray(b'\x00\x01\x02\x03\x04\x05\x06\x07') bytearray(b'\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff')
# test irq
irq fired 1
bytearray(b'\x00\x01\x02\x03\x04\x05\x06\x07') bytearray(b'\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff')