Merge pull request '28-configurable-tag-handling' (#48) from 28-configurable-tag-handling into main
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m21s
Check code formatting / Check-C-Format (push) Successful in 8s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 11s

Reviewed-on: #48
Reviewed-by: Stefan Kratochwil <kratochwil-la@gmx.de>
This commit was merged in pull request #48.
This commit is contained in:
2025-11-04 18:54:54 +00:00
3 changed files with 166 additions and 31 deletions

View File

@@ -13,13 +13,40 @@ VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
class PlayerApp:
class TagStateMachine:
def __init__(self, parent, timer_manager):
self.parent = parent
self.timer_manager = timer_manager
self.current_tag = None
self.current_tag_time = time.ticks_ms()
def onTagChange(self, new_tag):
if new_tag is not None:
self.timer_manager.cancel(self.onTagRemoveDelay)
if new_tag == self.current_tag:
return
# Change playlist on new tag
if new_tag is not None:
self.current_tag_time = time.ticks_ms()
self.current_tag = new_tag
self.parent.onNewTag(new_tag)
else:
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
def onTagRemoveDelay(self):
if self.current_tag is not None:
self.current_tag = None
self.parent.onTagRemoved()
def __init__(self, deps: Dependencies):
self.current_tag = None
self.current_tag_time = time.ticks_ms()
self.timer_manager = TimerManager()
self.tag_state_machine = self.TagStateMachine(self, self.timer_manager)
self.player = deps.mp3player(self)
self.nfc = deps.nfcreader(self)
self.nfc = deps.nfcreader(self.tag_state_machine)
self.playlist_db = deps.playlistdb(self)
self.tag_mode = self.playlist_db.getSetting('tagmode')
self.playing_tag = None
self.playlist = None
self.buttons = deps.buttons(self) if deps.buttons is not None else None
self.mp3file = None
self.volume_pos = 3
@@ -30,28 +57,26 @@ class PlayerApp:
self.mp3file.close()
self.mp3file = None
def onTagChange(self, new_tag):
if new_tag is not None:
self.timer_manager.cancel(self.onTagRemoveDelay)
if new_tag == self.current_tag:
return
# Change playlist on new tag
if new_tag is not None:
self.current_tag_time = time.ticks_ms()
self.current_tag = new_tag
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
def onNewTag(self, new_tag):
"""
Callback (typically called by TagStateMachine) to signal that a new tag has been presented.
"""
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
if self.tag_mode == 'tagremains' or (self.tag_mode == 'tagstartstop' and new_tag != self.playing_tag):
self._set_playlist(uid_str)
else:
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
self.playing_tag = new_tag
elif self.tag_mode == 'tagstartstop':
print('Tag presented again, stopping playback')
self._unset_playlist()
self.playing_tag = None
def onTagRemoveDelay(self):
if self.current_tag is not None:
def onTagRemoved(self):
"""
Callback (typically called by TagStateMachine) to signal that a tag has been removed.
"""
if self.tag_mode == 'tagremains':
print('Tag gone, stopping playback')
self.current_tag = None
if self.playlist is not None:
pos = self.player.stop()
if pos is not None:
self.playlist.setPlaybackOffset(pos)
self._unset_playlist()
def onButtonPressed(self, what):
assert self.buttons is not None
@@ -71,10 +96,18 @@ class PlayerApp:
self._play_next()
def _set_playlist(self, tag: bytes):
self._unset_playlist()
self.playlist = self.playlist_db.getPlaylistForTag(tag)
self._play(self.playlist.getCurrentPath() if self.playlist is not None else None,
self.playlist.getPlaybackOffset() if self.playlist is not None else 0)
def _unset_playlist(self):
if self.playlist is not None:
pos = self.player.stop()
if pos is not None:
self.playlist.setPlaybackOffset(pos)
self.playlist = None
def _play_next(self):
if self.playlist is None:
return
@@ -82,6 +115,7 @@ class PlayerApp:
self._play(filename)
if filename is None:
self.playlist = None
self.playing_tag = None
def _play(self, filename: bytes | None, offset=0):
if self.mp3file is not None:

View File

@@ -31,6 +31,9 @@ class BTreeDB(IPlaylistDB):
PERSIST_NO = b'no'
PERSIST_TRACK = b'track'
PERSIST_OFFSET = b'offset'
DEFAULT_SETTINGS = {
b'tagmode': b'tagremains'
}
class Playlist(IPlaylist):
def __init__(self, parent: "BTreeDB", tag: bytes, pos: int, persist, shuffle):
@@ -269,6 +272,11 @@ class BTreeDB(IPlaylistDB):
self._savePlaylist(tag, entries, persist, shuffle)
return self.getPlaylistForTag(tag)
def getSetting(self, key: bytes | str) -> str:
if key is str:
key = key.encode()
return self.db.get(b'settings/' + key, self.DEFAULT_SETTINGS[key]).decode()
def validate(self, dump=False):
"""
Validate the structure of the playlist database.
@@ -286,6 +294,11 @@ class BTreeDB(IPlaylistDB):
fields = k.split(b'/')
if len(fields) <= 1:
fail(f'Malformed key {k!r}')
continue
if fields[0] == b'settings':
val = self.db[k].decode()
print(f'Setting {fields[1].decode()} = {val}')
continue
if last_tag != fields[0]:
last_tag = fields[0]
last_pos = None

View File

@@ -38,14 +38,23 @@ class FakeMp3Player:
def play(self, track: FakeFile, offset: int):
self.track = track
def stop(self):
self.track = None
class FakeTimerManager:
def __init__(self): pass
def cancel(self, timer): pass
def schedule(self, when, what):
what()
class FakeNfcReader:
def __init__(self): pass
tag_callback = None
def __init__(self, tag_callback=None):
FakeNfcReader.tag_callback = tag_callback
class FakeButtons:
@@ -76,6 +85,11 @@ class FakePlaylistDb:
def getPlaylistForTag(self, tag: bytes):
return self.FakePlaylist(self)
def getSetting(self, key: bytes | str):
if key == 'tagmode':
return 'tagremains'
return None
def fake_open(filename, mode):
return FakeFile(filename, mode)
@@ -100,13 +114,13 @@ def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch):
fake_db = FakePlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
assert "r" in fake_mp3.track.mode
@@ -117,13 +131,13 @@ def test_playlist_seq(micropythonify, faketimermanager, monkeypatch):
fake_db = FakePlaylistDb([b'track1.mp3', b'track2.mp3', b'track3.mp3'])
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'track1.mp3'
@@ -147,14 +161,88 @@ def test_playlist_unknown_tag(micropythonify, faketimermanager, monkeypatch):
def getPlaylistForTag(self, tag):
return None
def getSetting(self, key: bytes | str):
return None
fake_db = FakeNoPlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda _: FakeNfcReader(),
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
dut = app.PlayerApp(deps)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
dut.onTagChange([23, 42, 1, 2, 3])
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is None
def test_tagmode_startstop(micropythonify, faketimermanager, monkeypatch):
class MyFakePlaylistDb(FakePlaylistDb):
def __init__(self, tracklist=[b'test/path.mp3']):
super().__init__(tracklist)
def getSetting(self, key: bytes | str):
if key == 'tagmode':
return 'tagstartstop'
return None
fake_db = MyFakePlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
# Present tag to start playback
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
# Removing tag should not stop playback
FakeNfcReader.tag_callback.onTagChange(None)
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
# Presenting tag should stop playback
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is None
# Nothing should change here
FakeNfcReader.tag_callback.onTagChange(None)
assert fake_mp3.track is None
# Presenting tag again should start playback again
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
def test_tagmode_remains(micropythonify, faketimermanager, monkeypatch):
class MyFakePlaylistDb(FakePlaylistDb):
def __init__(self, tracklist=[b'test/path.mp3']):
super().__init__(tracklist)
def getSetting(self, key: bytes | str):
if key == 'tagmode':
return 'tagremains'
return None
fake_db = MyFakePlaylistDb()
fake_mp3 = FakeMp3Player()
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
nfcreader=lambda x: FakeNfcReader(x),
buttons=lambda _: FakeButtons(),
playlistdb=lambda _: fake_db)
app.PlayerApp(deps)
with monkeypatch.context() as m:
m.setattr(builtins, 'open', fake_open)
# Present tag to start playback
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'
# Remove tag to stop playback
FakeNfcReader.tag_callback.onTagChange(None)
assert fake_mp3.track is None
# Presenting tag again should start playback again
FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3])
assert fake_mp3.track is not None
assert fake_mp3.track.filename == b'test/path.mp3'