wip
Some checks failed
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m23s
Check code formatting / Check-C-Format (push) Successful in 7s
Check code formatting / Check-Python-Flake8 (push) Failing after 8s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 4s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s

This commit is contained in:
2025-08-18 18:54:35 +02:00
parent 6331b27b42
commit c635844aa9
17 changed files with 434 additions and 23 deletions

3
.gitignore vendored
View File

@@ -3,6 +3,9 @@ hardware/tonberry-pico/tonberry-pico-backups/
*.kicad_sch-bak
*.kicad_sch.lck
software/build
software/typings
compile_commands.json
.dir-locals.el
.cache
\#*#
__pycache__

11
DEVELOP.md Normal file
View File

@@ -0,0 +1,11 @@
# Developer notes
## How to setup python environment for mypy and pytest
```bash
cd software
python -m venv test-venv
. test-venv/bin/activate
pip install -r tests/requirements.txt
pip install -U micropython-rp2-pico_w-stubs --target typings
```

9
software/mypy.ini Normal file
View File

@@ -0,0 +1,9 @@
[mypy]
platform = linux
mypy_path = $MYPY_CONFIG_FILE_DIR:tests/mocks:$MYPY_CONFIG_FILE_DIR/typings
custom_typeshed_dir = $MYPY_CONFIG_FILE_DIR/typings
follow_imports = silent
exclude = "typings[\\/].*"
follow_imports_for_stubs = true
no_site_packages = true
check_untyped_defs = true

3
software/pytest.ini Normal file
View File

@@ -0,0 +1,3 @@
[pytest]
pythonpath = tests/mocks src
testpaths = tests

View File

@@ -2,12 +2,11 @@
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
from collections import namedtuple
import os
import time
from utils import TimerManager
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons'))
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb'))
# Should be ~ 6dB steps
VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
@@ -20,6 +19,7 @@ class PlayerApp:
self.timer_manager = TimerManager()
self.player = deps.mp3player(self)
self.nfc = deps.nfcreader(self)
self.playlist_db = deps.playlistdb(self)
self.buttons = deps.buttons(self) if deps.buttons is not None else None
self.mp3file = None
self.volume_pos = 3
@@ -39,17 +39,8 @@ class PlayerApp:
if new_tag is not None:
self.current_tag_time = time.ticks_ms()
self.current_tag = new_tag
uid_str = ''.join('{:02x}'.format(x) for x in new_tag)
try:
testfiles = [f'/sd/{uid_str}/'.encode() + name for name in os.listdir(f'/sd/{uid_str}'.encode())
if name.endswith(b'mp3')]
except OSError as ex:
print(f'Could not get playlist for tag {uid_str}: {ex}')
self.current_tag = None
self.player.stop()
return
testfiles.sort()
self._set_playlist(testfiles)
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
self._set_playlist(uid_str)
else:
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
@@ -60,6 +51,7 @@ class PlayerApp:
self.player.stop()
def onButtonPressed(self, what):
assert self.buttons is not None
if what == self.buttons.VOLUP:
self.volume_pos = min(self.volume_pos + 1, len(VOLUME_CURVE) - 1)
self.player.set_volume(VOLUME_CURVE[self.volume_pos])
@@ -70,19 +62,19 @@ class PlayerApp:
self._play_next()
def onPlaybackDone(self):
assert self.mp3file is not None
self.mp3file.close()
self.mp3file = None
self._play_next()
def _set_playlist(self, files: list[bytes]):
self.playlist_pos = 0
self.playlist = files
self._play(self.playlist[self.playlist_pos])
def _set_playlist(self, tag: bytes):
self.playlist = self.playlist_db.getPlaylistForTag(tag)
self._play(self.playlist.getCurrentPath())
def _play_next(self):
if self.playlist_pos + 1 < len(self.playlist):
self.playlist_pos += 1
self._play(self.playlist[self.playlist_pos])
filename = self.playlist.getNextPath()
if filename is not None:
self._play(filename)
def _play(self, filename: bytes):
if self.mp3file is not None:

View File

@@ -78,7 +78,14 @@ def run():
asyncio.get_event_loop().run_forever()
def mount():
ctx = SDContext(mosi=Pin(3), miso=Pin(4), sck=Pin(2), ss=Pin(5), baudrate=15000000)
ctx.__enter__()
if __name__ == '__main__':
if machine.Pin(17, machine.Pin.IN, machine.Pin.PULL_UP).value() != 0:
time.sleep(5)
time.sleep(1)
run()
else:
mount()

View File

@@ -3,7 +3,8 @@
from utils.buttons import Buttons
from utils.mbrpartition import MBRPartition
from utils.playlistdb import BTreeDB
from utils.sdcontext import SDContext
from utils.timer import TimerManager
__all__ = ["Buttons", "MBRPartition", "SDContext", "TimerManager"]
__all__ = ["BTreeDB", "Buttons", "MBRPartition", "SDContext", "TimerManager"]

View File

@@ -17,7 +17,7 @@ if TYPE_CHECKING:
class Buttons:
def __init__(self, cb: ButtonCallback, pin_volup=17, pin_voldown=19, pin_next=18):
def __init__(self, cb: "ButtonCallback", pin_volup=17, pin_voldown=19, pin_next=18):
self.VOLUP = micropython.const(1)
self.VOLDOWN = micropython.const(2)
self.NEXT = micropython.const(3)

View File

@@ -0,0 +1,125 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import btree
try:
import typing
from typing import TYPE_CHECKING # type: ignore
except ImportError:
TYPE_CHECKING = False
if TYPE_CHECKING:
import typing
class IPlaylist(typing.Protocol):
def getPaths(self) -> list[bytes]: ...
def getCurrentPath(self) -> bytes: ...
def getNextPath(self) -> bytes: ...
class IPlaylistDB(typing.Protocol):
def getPlaylistForTag(self, tag: bytes) -> IPlaylist: ...
else:
class IPlaylistDB(object):
...
class BTreeDB(IPlaylistDB):
class Playlist:
def __init__(self, parent, tag, entries, pos):
self.parent = parent
self.tag = tag
self.entries = entries
self.pos = pos
def getPaths(self):
"""
Get entire playlist in storage order
"""
return self.entries
def getCurrentPath(self):
"""
Get path of file that should be played.
"""
return self.entries[self.pos]
def getNextPath(self):
"""
Select next track and return path.
"""
self.pos += 1
if self.pos >= len(self.entries):
self.pos = 0
self.parent._setPlaylistPos(self.tag, self.pos)
if self.pos == 0:
return None
return self.getCurrentPath()
def __init__(self, db: btree.BTree):
self.db = db
@staticmethod
def _keyPlaylistPos(tag):
return b''.join([tag, b'/playlistpos'])
@staticmethod
def _keyPlaylistEntry(tag, pos):
return b''.join([tag, b'/playlist/', str(pos).encode()])
def _setPlaylistPos(self, tag, pos, flush=True):
self.db[self._keyPlaylistPos(tag)] = str(pos).encode()
if flush:
self.db.flush()
def _savePlaylist(self, tag, entries, pos, flush=True):
self._deletePlaylist(tag, False)
for idx, entry in enumerate(entries):
self.db[self._keyPlaylistEntry(tag, idx)] = entry
self._setPlaylistPos(tag, pos, False)
if flush:
self.db.flush()
def _deletePlaylist(self, tag, flush=True):
start_key = b''.join([tag, b'/playlist/'])
end_key = b''.join([tag, b'/playlist0'])
keys = [k for k in self.db.keys(start_key, end_key)]
for k in keys:
del self.db[k]
del self.db[self._keyPlaylistPos(tag)]
if flush:
self.db.flush()
def getPlaylistForTag(self, tag: bytes):
start_key = b''.join([tag, b'/playlist/'])
end_key = b''.join([tag, b'/playlist0'])
entries = [val for val in self.db.values(start_key, end_key)]
if len(entries) == 0:
return None
pos = int(self.db.get(self._keyPlaylistPos(tag), '0'))
if pos < 0 or pos >= len(entries):
pos = 0
return self.Playlist(self, tag, entries, pos)
def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes]):
self._savePlaylist(tag, entries, 0)
return self.getPlaylistForTag(tag)
class BTreeFileManager:
def __init__(self, db_path: str):
self.db_path = db_path
def __enter__(self):
try:
self.db_file = open(self.db_path, 'r+b')
except OSError:
self.db_file = open(self.db_path, 'w+b')
try:
self.db = btree.open(self.db_file)
return BTreeDB(self.db)
except Exception:
self.db_file.close()
raise
def __exit__(self):
self.db.close()
self.db_file.close()

View File

@@ -0,0 +1,21 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
from typing import Iterable
class BTree:
def close(self): ...
def values(self, start_key: str | bytes, end_key: str | bytes | None = None, flags=None) -> Iterable[str | bytes]:
pass
def __setitem__(self, key: str | bytes, val: str | bytes): ...
def flush(self): ...
def get(self, key: str | bytes, default: str | bytes | None = None) -> str | bytes: ...
def open(dbfile) -> BTree:
pass

View File

@@ -0,0 +1,3 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>

View File

@@ -0,0 +1,3 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>

View File

@@ -0,0 +1,5 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
class SDCard():
pass

View File

@@ -0,0 +1 @@
pytest

View File

@@ -0,0 +1,126 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import app
import builtins
import pytest
import time
import utils
@pytest.fixture
def micropythonify():
def time_ticks_ms():
return time.time_ns() // 1000000
time.ticks_ms = time_ticks_ms
yield
del time.ticks_ms
class FakeMp3Player:
def __init__(self):
self.volume = None
self.track = None
def set_volume(self, vol):
self.volume = vol
def play(self, track):
self.track = track
class FakeTimerManager:
def __init__(self): pass
def cancel(self, timer): pass
class FakeNfcReader:
def __init__(self): pass
class FakeButtons:
def __init__(self): pass
class FakePlaylistDb:
class FakePlaylist:
def __init__(self, parent, pos=0):
self.parent = parent
self.pos = 0
def getCurrentPath(self):
return self.parent.tracklist[self.pos]
def getNextPath(self):
self.pos += 1
if self.pos >= len(self.parent.tracklist):
return None
return self.parent.tracklist[self.pos]
def __init__(self, tracklist=[b'test/path.mp3']):
self.tracklist = tracklist
def getPlaylistForTag(self, tag: bytes):
return self.FakePlaylist(self)
class FakeFile:
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.closed = False
def close(self):
self.closed = True
def fake_open(filename, mode):
return FakeFile(filename, mode)
@pytest.fixture
def faketimermanager(monkeypatch):
monkeypatch.setattr(utils.timer.TimerManager, '_instance', FakeTimerManager())
def test_construct_app(micropythonify, faketimermanager):
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: FakePlaylistDb())
_ = app.PlayerApp(deps)
assert fake_mp3.volume is not None
def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch):
fake_db = FakePlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track.filename == b'test/path.mp3'
assert "r" in fake_mp3.track.mode
assert "b" in fake_mp3.track.mode
def test_playlist_seq(micropythonify, faketimermanager, monkeypatch):
fake_db = FakePlaylistDb([b'track1.mp3', b'track2.mp3', b'track3.mp3'])
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track.filename == b'track1.mp3'
dut.onPlaybackDone()
assert fake_mp3.track.filename == b'track2.mp3'

View File

View File

@@ -0,0 +1,101 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
from utils import BTreeDB
class FakeDB:
def __init__(self, contents):
self.contents = contents
self.saved_contents = dict(contents)
def flush(self):
self.saved_contents = dict(self.contents)
def values(self, start_key, end_key=None, flags=None):
res = []
for key in sorted(self.contents):
if start_key >= key:
continue
if end_key is not None and end_key < key:
break
res.append(self.contents[key])
return res
def keys(self, start_key, end_key=None, flags=None):
res = []
for key in sorted(self.contents):
if start_key >= key:
continue
if end_key is not None and end_key < key:
break
res.append(key)
return res
def get(self, key, default=None):
return self.contents.get(key, default)
def __getitem__(self, key):
return self.contents[key]
def __setitem__(self, key, val):
self.contents[key] = val
def __delitem__(self, key):
del self.contents[key]
def test_playlist_load():
contents = {b'foo/part': b'no',
b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlisttt': b'no'
}
uut = BTreeDB(FakeDB(contents))
pl = uut.getPlaylistForTag(b'foo')
assert pl.getPaths() == [b'track1', b'track2']
assert pl.getCurrentPath() == b'track1'
def test_playlist_nextpath():
contents = FakeDB({b'foo/part': b'no',
b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlisttt': b'no'
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
assert pl.getNextPath() == b'track2'
assert contents.saved_contents[b'foo/playlistpos'] == b'1'
def test_playlist_nextpath_last():
contents = FakeDB({b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlistpos': b'1'
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
assert pl.getNextPath() is None
assert contents.saved_contents[b'foo/playlistpos'] == b'0'
def test_playlist_create():
contents = FakeDB({b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlistpos': b'1'
})
newplaylist = [b'never gonna give you up.mp3', b'durch den monsun.mp3']
uut = BTreeDB(contents)
new_pl = uut.createPlaylistForTag(b'foo', newplaylist)
assert new_pl.getPaths() == newplaylist
assert new_pl.getCurrentPath() == newplaylist[0]
def test_playlist_load_notexist():
contents = FakeDB({b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlistpos': b'1'
})
uut = BTreeDB(contents)
assert uut.getPlaylistForTag(b'notfound') is None