4 Commits

Author SHA1 Message Date
64716ea2d8 Merge branch 'fix-no-db' into mbl-next
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m42s
Check code formatting / Check-C-Format (push) Successful in 7s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 10s
2025-11-02 14:34:40 +01:00
f351fc94d3 fix(main): Make upgrade from tag directories more ergonomical
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m19s
Check code formatting / Check-C-Format (push) Successful in 8s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 9s
Run pytests / Check-Pytest (push) Successful in 11s
As we don't have an API to upload files and create playlists yet, we
used the convention "tag uid as directory name" to allow testing
playback. With the introduction of the database in #39 "Add playlist db"
a builddb() function to initialize the database from the tag directories
was added, but it is not automatically called. To make the developer
experience more ergonomical, add a check for that automatically runs
builddb() when no database exists.

Signed-off-by: Matthias Blankertz <matthias@blankertz.org>
2025-11-02 14:24:15 +01:00
7d65d077d5 feat(app): Implement tag handling modes according to #28.
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m22s
Check code formatting / Check-C-Format (push) Successful in 7s
Check code formatting / Check-Python-Flake8 (push) Successful in 9s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 12s
If the tagmode setting is 'tagremains' (the default): Play as long as the
tag is on the reader, with a 5 second timeout (i.e. playback stops when
tag is gone for more than 5 seconds).

If the tagmode setting is 'tagstartstop': Playback starts when a tag is
seen. Presenting a different tag causes the playback to stop and
playback for the new tag to start. Re-presenting the same tag used to
start with a no-tag-time of 5 seconds or more in between stops playback.

Signed-off-by: Matthias Blankertz <matthias@blankertz.org>
2025-10-31 18:28:49 +01:00
6c37f5f3cc refactor(app): Refactor tag handling
Split the tag handling in PlayerApp into two parts: The low level
handling of changed/removed tags from the Nfc reader, including the
timeout processing is moved into the nested TagStateMachine class. The
main PlayerApp has two new (internal) callbacks "onNewTag" and
"onTagRemoved" which are called when a new tag is presented or a tag was
actually removed. This will hopefully make the implementation of #28
"Configurable Tag Handling" cleaner.

Signed-off-by: Matthias Blankertz <matthias@blankertz.org>
2025-10-31 16:59:33 +01:00
4 changed files with 203 additions and 53 deletions

View File

@@ -13,13 +13,40 @@ VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
class PlayerApp:
class TagStateMachine:
def __init__(self, parent, timer_manager):
self.parent = parent
self.timer_manager = timer_manager
self.current_tag = None
self.current_tag_time = time.ticks_ms()
def onTagChange(self, new_tag):
if new_tag is not None:
self.timer_manager.cancel(self.onTagRemoveDelay)
if new_tag == self.current_tag:
return
# Change playlist on new tag
if new_tag is not None:
self.current_tag_time = time.ticks_ms()
self.current_tag = new_tag
self.parent.onNewTag(new_tag)
else:
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
def onTagRemoveDelay(self):
if self.current_tag is not None:
self.current_tag = None
self.parent.onTagRemoved()
def __init__(self, deps: Dependencies):
self.current_tag = None
self.current_tag_time = time.ticks_ms()
self.timer_manager = TimerManager()
self.tag_state_machine = self.TagStateMachine(self, self.timer_manager)
self.player = deps.mp3player(self)
self.nfc = deps.nfcreader(self)
self.nfc = deps.nfcreader(self.tag_state_machine)
self.playlist_db = deps.playlistdb(self)
self.tag_mode = self.playlist_db.getSetting('tagmode')
self.playing_tag = None
self.playlist = None
self.buttons = deps.buttons(self) if deps.buttons is not None else None
self.mp3file = None
self.volume_pos = 3
@@ -30,28 +57,26 @@ class PlayerApp:
self.mp3file.close()
self.mp3file = None
def onTagChange(self, new_tag):
if new_tag is not None:
self.timer_manager.cancel(self.onTagRemoveDelay)
if new_tag == self.current_tag:
return
# Change playlist on new tag
if new_tag is not None:
self.current_tag_time = time.ticks_ms()
self.current_tag = new_tag
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
def onNewTag(self, new_tag):
"""
Callback (typically called by TagStateMachine) to signal that a new tag has been presented.
"""
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
if self.tag_mode == 'tagremains' or (self.tag_mode == 'tagstartstop' and new_tag != self.playing_tag):
self._set_playlist(uid_str)
else:
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
self.playing_tag = new_tag
elif self.tag_mode == 'tagstartstop':
print('Tag presented again, stopping playback')
self._unset_playlist()
self.playing_tag = None
def onTagRemoveDelay(self):
if self.current_tag is not None:
def onTagRemoved(self):
"""
Callback (typically called by TagStateMachine) to signal that a tag has been removed.
"""
if self.tag_mode == 'tagremains':
print('Tag gone, stopping playback')
self.current_tag = None
if self.playlist is not None:
pos = self.player.stop()
if pos is not None:
self.playlist.setPlaybackOffset(pos)
self._unset_playlist()
def onButtonPressed(self, what):
assert self.buttons is not None
@@ -71,10 +96,18 @@ class PlayerApp:
self._play_next()
def _set_playlist(self, tag: bytes):
self._unset_playlist()
self.playlist = self.playlist_db.getPlaylistForTag(tag)
self._play(self.playlist.getCurrentPath() if self.playlist is not None else None,
self.playlist.getPlaybackOffset() if self.playlist is not None else 0)
def _unset_playlist(self):
if self.playlist is not None:
pos = self.player.stop()
if pos is not None:
self.playlist.setPlaybackOffset(pos)
self.playlist = None
def _play_next(self):
if self.playlist is None:
return
@@ -82,6 +115,7 @@ class PlayerApp:
self._play(filename)
if filename is None:
self.playlist = None
self.playing_tag = None
def _play(self, filename: bytes | None, offset=0):
if self.mp3file is not None:

View File

@@ -3,8 +3,10 @@
import aiorepl # type: ignore
import asyncio
from errno import ENOENT
import machine
import micropython
import os
import time
from math import pi, sin, pow
@@ -54,6 +56,9 @@ async def rainbow(np, period=10):
machine.mem32[0x40030000 + 0x00] = 0x10
DB_PATH = '/sd/tonberry.db'
def run():
asyncio.new_event_loop()
# Setup LEDs
@@ -62,27 +67,36 @@ def run():
# Setup MP3 player
with SDContext(mosi=hwconfig.SD_DI, miso=hwconfig.SD_DO, sck=hwconfig.SD_SCK, ss=hwconfig.SD_CS,
baudrate=hwconfig.SD_CLOCKRATE), \
BTreeFileManager('/sd/tonberry.db') as playlistdb, \
AudioContext(hwconfig.I2S_DIN, hwconfig.I2S_DCLK, hwconfig.I2S_LRCLK) as audioctx:
baudrate=hwconfig.SD_CLOCKRATE):
# Temporary hack: build database from folders if no database exists
# Can be removed once playlists can be created via API
try:
_ = os.stat(DB_PATH)
except OSError as ex:
if ex.errno == ENOENT:
print("No playlist DB found, trying to build DB from tag dirs")
builddb()
# Setup NFC
reader = MFRC522(spi_id=hwconfig.RC522_SPIID, sck=hwconfig.RC522_SCK, miso=hwconfig.RC522_MISO,
mosi=hwconfig.RC522_MOSI, cs=hwconfig.RC522_SS, rst=hwconfig.RC522_RST, tocard_retries=20)
with BTreeFileManager(DB_PATH) as playlistdb, \
AudioContext(hwconfig.I2S_DIN, hwconfig.I2S_DCLK, hwconfig.I2S_LRCLK) as audioctx:
# Setup app
deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app),
nfcreader=lambda the_app: Nfc(reader, the_app),
buttons=lambda the_app: Buttons(the_app, pin_volup=hwconfig.BUTTON_VOLUP,
pin_voldown=hwconfig.BUTTON_VOLDOWN,
pin_next=hwconfig.BUTTON_NEXT),
playlistdb=lambda _: playlistdb)
the_app = app.PlayerApp(deps)
# Setup NFC
reader = MFRC522(spi_id=hwconfig.RC522_SPIID, sck=hwconfig.RC522_SCK, miso=hwconfig.RC522_MISO,
mosi=hwconfig.RC522_MOSI, cs=hwconfig.RC522_SS, rst=hwconfig.RC522_RST, tocard_retries=20)
# Start
asyncio.create_task(aiorepl.task({'timer_manager': TimerManager(),
'app': the_app}))
asyncio.get_event_loop().run_forever()
# Setup app
deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app),
nfcreader=lambda the_app: Nfc(reader, the_app),
buttons=lambda the_app: Buttons(the_app, pin_volup=hwconfig.BUTTON_VOLUP,
pin_voldown=hwconfig.BUTTON_VOLDOWN,
pin_next=hwconfig.BUTTON_NEXT),
playlistdb=lambda _: playlistdb)
the_app = app.PlayerApp(deps)
# Start
asyncio.create_task(aiorepl.task({'timer_manager': TimerManager(),
'app': the_app}))
asyncio.get_event_loop().run_forever()
def builddb():
@@ -90,10 +104,11 @@ def builddb():
For testing, build a playlist db based on the previous tag directory format.
Can be removed once uploading files / playlist via the web api is possible.
"""
import os
os.unlink('/sd/tonberry.db')
with BTreeFileManager('/sd/tonberry.db') as db:
try:
os.unlink(DB_PATH)
except OSError:
pass
with BTreeFileManager(DB_PATH) as db:
for name, type_, _, _ in os.ilistdir(b'/sd'):
if type_ != 0x4000:
continue

View File

@@ -31,6 +31,9 @@ class BTreeDB(IPlaylistDB):
PERSIST_NO = b'no'
PERSIST_TRACK = b'track'
PERSIST_OFFSET = b'offset'
DEFAULT_SETTINGS = {
b'tagmode': b'tagremains'
}
class Playlist(IPlaylist):
def __init__(self, parent: "BTreeDB", tag: bytes, pos: int, persist, shuffle):
@@ -269,6 +272,11 @@ class BTreeDB(IPlaylistDB):
self._savePlaylist(tag, entries, persist, shuffle)
return self.getPlaylistForTag(tag)
def getSetting(self, key: bytes | str) -> str:
if key is str:
key = key.encode()
return self.db.get(b'settings/' + key, self.DEFAULT_SETTINGS[key]).decode()
def validate(self, dump=False):
"""
Validate the structure of the playlist database.
@@ -286,6 +294,11 @@ class BTreeDB(IPlaylistDB):
fields = k.split(b'/')
if len(fields) <= 1:
fail(f'Malformed key {k!r}')
continue
if fields[0] == b'settings':
val = self.db[k].decode()
print(f'Setting {fields[1].decode()} = {val}')
continue
if last_tag != fields[0]:
last_tag = fields[0]
last_pos = None

View File

@@ -38,14 +38,23 @@ class FakeMp3Player:
def play(self, track: FakeFile, offset: int):
self.track = track
def stop(self):
self.track = None
class FakeTimerManager:
def __init__(self): pass
def cancel(self, timer): pass
def schedule(self, when, what):
what()
class FakeNfcReader:
def __init__(self): pass
tag_callback = None
def __init__(self, tag_callback=None):
FakeNfcReader.tag_callback = tag_callback
class FakeButtons:
@@ -76,6 +85,11 @@ class FakePlaylistDb:
def getPlaylistForTag(self, tag: bytes):
return self.FakePlaylist(self)
def getSetting(self, key: bytes | str):
if key == 'tagmode':
return 'tagremains'
return None
def fake_open(filename, mode):
return FakeFile(filename, mode)
@@ -100,13 +114,13 @@ def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch):
fake_db = FakePlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
assert "r" in fake_mp3.track.mode
@@ -117,13 +131,13 @@ def test_playlist_seq(micropythonify, faketimermanager, monkeypatch):
fake_db = FakePlaylistDb([b'track1.mp3', b'track2.mp3', b'track3.mp3'])
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'track1.mp3'
@@ -147,14 +161,88 @@ def test_playlist_unknown_tag(micropythonify, faketimermanager, monkeypatch):
def getPlaylistForTag(self, tag):
return None
def getSetting(self, key: bytes | str):
return None
fake_db = FakeNoPlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is None
def test_tagmode_startstop(micropythonify, faketimermanager, monkeypatch):
class MyFakePlaylistDb(FakePlaylistDb):
def __init__(self, tracklist=[b'test/path.mp3']):
super().__init__(tracklist)
def getSetting(self, key: bytes | str):
if key == 'tagmode':
return 'tagstartstop'
return None
fake_db = MyFakePlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
# Present tag to start playback
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
# Removing tag should not stop playback
FakeNfcReader.tag_callback.onTagChange(None)
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
# Presenting tag should stop playback
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is None
# Nothing should change here
FakeNfcReader.tag_callback.onTagChange(None)
assert fake_mp3.track is None
# Presenting tag again should start playback again
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
def test_tagmode_remains(micropythonify, faketimermanager, monkeypatch):
class MyFakePlaylistDb(FakePlaylistDb):
def __init__(self, tracklist=[b'test/path.mp3']):
super().__init__(tracklist)
def getSetting(self, key: bytes | str):
if key == 'tagmode':
return 'tagremains'
return None
fake_db = MyFakePlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
# Present tag to start playback
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
# Remove tag to stop playback
FakeNfcReader.tag_callback.onTagChange(None)
assert fake_mp3.track is None
# Presenting tag again should start playback again
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'