Merge pull request 'Add playlist database' (#39) from 23-add-playlist-db into main
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m23s
Check code formatting / Check-C-Format (push) Successful in 7s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 11s
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m23s
Check code formatting / Check-C-Format (push) Successful in 7s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 11s
Reviewed-on: #39 Reviewed-by: Stefan Kratochwil <kratochwil-la@gmx.de>
This commit was merged in pull request #39.
This commit is contained in:
@@ -2,12 +2,11 @@
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from collections import namedtuple
|
||||
import os
|
||||
import time
|
||||
from utils import TimerManager
|
||||
|
||||
|
||||
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons'))
|
||||
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb'))
|
||||
|
||||
# Should be ~ 6dB steps
|
||||
VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
|
||||
@@ -20,6 +19,7 @@ class PlayerApp:
|
||||
self.timer_manager = TimerManager()
|
||||
self.player = deps.mp3player(self)
|
||||
self.nfc = deps.nfcreader(self)
|
||||
self.playlist_db = deps.playlistdb(self)
|
||||
self.buttons = deps.buttons(self) if deps.buttons is not None else None
|
||||
self.mp3file = None
|
||||
self.volume_pos = 3
|
||||
@@ -39,17 +39,8 @@ class PlayerApp:
|
||||
if new_tag is not None:
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
self.current_tag = new_tag
|
||||
uid_str = ''.join('{:02x}'.format(x) for x in new_tag)
|
||||
try:
|
||||
testfiles = [f'/sd/{uid_str}/'.encode() + name for name in os.listdir(f'/sd/{uid_str}'.encode())
|
||||
if name.endswith(b'mp3')]
|
||||
except OSError as ex:
|
||||
print(f'Could not get playlist for tag {uid_str}: {ex}')
|
||||
self.current_tag = None
|
||||
self.player.stop()
|
||||
return
|
||||
testfiles.sort()
|
||||
self._set_playlist(testfiles)
|
||||
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
|
||||
self._set_playlist(uid_str)
|
||||
else:
|
||||
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
|
||||
|
||||
@@ -60,6 +51,7 @@ class PlayerApp:
|
||||
self.player.stop()
|
||||
|
||||
def onButtonPressed(self, what):
|
||||
assert self.buttons is not None
|
||||
if what == self.buttons.VOLUP:
|
||||
self.volume_pos = min(self.volume_pos + 1, len(VOLUME_CURVE) - 1)
|
||||
self.player.set_volume(VOLUME_CURVE[self.volume_pos])
|
||||
@@ -70,24 +62,29 @@ class PlayerApp:
|
||||
self._play_next()
|
||||
|
||||
def onPlaybackDone(self):
|
||||
assert self.mp3file is not None
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self._play_next()
|
||||
|
||||
def _set_playlist(self, files: list[bytes]):
|
||||
self.playlist_pos = 0
|
||||
self.playlist = files
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
def _set_playlist(self, tag: bytes):
|
||||
self.playlist = self.playlist_db.getPlaylistForTag(tag)
|
||||
self._play(self.playlist.getCurrentPath() if self.playlist is not None else None)
|
||||
|
||||
def _play_next(self):
|
||||
if self.playlist_pos + 1 < len(self.playlist):
|
||||
self.playlist_pos += 1
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
if self.playlist is None:
|
||||
return
|
||||
filename = self.playlist.getNextPath()
|
||||
self._play(filename)
|
||||
if filename is None:
|
||||
self.playlist = None
|
||||
|
||||
def _play(self, filename: bytes):
|
||||
def _play(self, filename: bytes | None):
|
||||
if self.mp3file is not None:
|
||||
self.player.stop()
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self.mp3file = open(filename, 'rb')
|
||||
self.player.play(self.mp3file)
|
||||
if filename is not None:
|
||||
print(f'Playing {filename!r}')
|
||||
self.mp3file = open(filename, 'rb')
|
||||
self.player.play(self.mp3file)
|
||||
|
||||
@@ -15,7 +15,7 @@ from mfrc522 import MFRC522
|
||||
from mp3player import MP3Player
|
||||
from nfc import Nfc
|
||||
from rp2_neopixel import NeoPixel
|
||||
from utils import Buttons, SDContext, TimerManager
|
||||
from utils import BTreeFileManager, Buttons, SDContext, TimerManager
|
||||
|
||||
try:
|
||||
import hwconfig
|
||||
@@ -63,6 +63,7 @@ def run():
|
||||
# Setup MP3 player
|
||||
with SDContext(mosi=hwconfig.SD_DI, miso=hwconfig.SD_DO, sck=hwconfig.SD_SCK, ss=hwconfig.SD_CS,
|
||||
baudrate=15000000), \
|
||||
BTreeFileManager('/sd/tonberry.db') as playlistdb, \
|
||||
AudioContext(hwconfig.I2S_DIN, hwconfig.I2S_DCLK, hwconfig.I2S_LRCLK) as audioctx:
|
||||
|
||||
# Setup NFC
|
||||
@@ -74,7 +75,8 @@ def run():
|
||||
nfcreader=lambda the_app: Nfc(reader, the_app),
|
||||
buttons=lambda the_app: Buttons(the_app, pin_volup=hwconfig.BUTTON_VOLUP,
|
||||
pin_voldown=hwconfig.BUTTON_VOLDOWN,
|
||||
pin_next=hwconfig.BUTTON_NEXT))
|
||||
pin_next=hwconfig.BUTTON_NEXT),
|
||||
playlistdb=lambda _: playlistdb)
|
||||
the_app = app.PlayerApp(deps)
|
||||
|
||||
# Start
|
||||
@@ -83,7 +85,24 @@ def run():
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
|
||||
def builddb():
|
||||
"""
|
||||
For testing, build a playlist db based on the previous tag directory format.
|
||||
Can be removed once uploading files / playlist via the web api is possible.
|
||||
"""
|
||||
import os
|
||||
|
||||
os.unlink('/sd/tonberry.db')
|
||||
with BTreeFileManager('/sd/tonberry.db') as db:
|
||||
for name, type_, _, _ in os.ilistdir(b'/sd'):
|
||||
if type_ != 0x4000:
|
||||
continue
|
||||
fl = [b'/sd/' + name + b'/' + x for x in os.listdir(b'/sd/' + name) if x.endswith(b'.mp3')]
|
||||
db.createPlaylistForTag(name, fl)
|
||||
os.sync()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if machine.Pin(17, machine.Pin.IN, machine.Pin.PULL_UP).value() != 0:
|
||||
time.sleep(1)
|
||||
time.sleep(1)
|
||||
if machine.Pin(hwconfig.BUTTON_VOLUP, machine.Pin.IN, machine.Pin.PULL_UP).value() != 0:
|
||||
run()
|
||||
|
||||
@@ -4,7 +4,8 @@
|
||||
from utils.buttons import Buttons
|
||||
from utils.mbrpartition import MBRPartition
|
||||
from utils.pinindex import get_pin_index
|
||||
from utils.playlistdb import BTreeDB, BTreeFileManager
|
||||
from utils.sdcontext import SDContext
|
||||
from utils.timer import TimerManager
|
||||
|
||||
__all__ = ["Buttons", "get_pin_index", "MBRPartition", "SDContext", "TimerManager"]
|
||||
__all__ = ["BTreeDB", "BTreeFileManager", "Buttons", "get_pin_index", "MBRPartition", "SDContext", "TimerManager"]
|
||||
|
||||
237
software/src/utils/playlistdb.py
Normal file
237
software/src/utils/playlistdb.py
Normal file
@@ -0,0 +1,237 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import btree
|
||||
try:
|
||||
import typing
|
||||
from typing import TYPE_CHECKING, Iterable # type: ignore
|
||||
except ImportError:
|
||||
TYPE_CHECKING = False
|
||||
if TYPE_CHECKING:
|
||||
class IPlaylist(typing.Protocol):
|
||||
def getPaths(self) -> Iterable[bytes]: ...
|
||||
def getCurrentPath(self) -> bytes: ...
|
||||
def getNextPath(self) -> bytes | None: ...
|
||||
|
||||
class IPlaylistDB(typing.Protocol):
|
||||
def getPlaylistForTag(self, tag: bytes) -> IPlaylist: ...
|
||||
else:
|
||||
class IPlaylistDB(object):
|
||||
...
|
||||
|
||||
class IPlaylist(object):
|
||||
...
|
||||
|
||||
|
||||
class BTreeDB(IPlaylistDB):
|
||||
class Playlist(IPlaylist):
|
||||
def __init__(self, parent, tag, pos):
|
||||
self.parent = parent
|
||||
self.tag = tag
|
||||
self.pos = pos
|
||||
|
||||
def getPaths(self):
|
||||
"""
|
||||
Get entire playlist in storage order
|
||||
"""
|
||||
return self.parent._getPlaylistValueIterator(self.tag)
|
||||
|
||||
def getCurrentPath(self):
|
||||
"""
|
||||
Get path of file that should be played.
|
||||
"""
|
||||
return self.parent._getPlaylistEntry(self.tag, self.pos)
|
||||
|
||||
def getNextPath(self):
|
||||
"""
|
||||
Select next track and return path.
|
||||
"""
|
||||
try:
|
||||
self.pos = self.parent._getNextTrack(self.tag, self.pos)
|
||||
except StopIteration:
|
||||
self.pos = self.parent._getFirstTrack(self.tag)
|
||||
return None
|
||||
finally:
|
||||
self.parent._setPlaylistPos(self.tag, self.pos)
|
||||
return self.getCurrentPath()
|
||||
|
||||
def __init__(self, db: btree.BTree, flush_func: typing.Callable | None = None):
|
||||
self.db = db
|
||||
self.flush_func = flush_func
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistPos(tag):
|
||||
return b''.join([tag, b'/playlistpos'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistEntry(tag, pos):
|
||||
return b''.join([tag, b'/playlist/', '{:05}'.format(pos).encode()])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistStart(tag):
|
||||
return b''.join([tag, b'/playlist/'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistStartEnd(tag):
|
||||
return (b''.join([tag, b'/playlist/']),
|
||||
b''.join([tag, b'/playlist0']))
|
||||
|
||||
def _flush(self):
|
||||
"""
|
||||
Flush the database and call the flush_func if it was provided.
|
||||
"""
|
||||
self.db.flush()
|
||||
if self.flush_func is not None:
|
||||
self.flush_func()
|
||||
|
||||
def _getPlaylistValueIterator(self, tag):
|
||||
start, end = self._keyPlaylistStartEnd(tag)
|
||||
return self.db.values(start, end)
|
||||
|
||||
def _getPlaylistEntry(self, _, pos):
|
||||
return self.db[pos]
|
||||
|
||||
def _setPlaylistPos(self, tag, pos, flush=True):
|
||||
assert pos.startswith(self._keyPlaylistStart(tag))
|
||||
self.db[self._keyPlaylistPos(tag)] = pos[len(self._keyPlaylistStart(tag)):]
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _savePlaylist(self, tag, entries, flush=True):
|
||||
self._deletePlaylist(tag, False)
|
||||
for idx, entry in enumerate(entries):
|
||||
self.db[self._keyPlaylistEntry(tag, idx)] = entry
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _deletePlaylist(self, tag, flush=True):
|
||||
start_key, end_key = self._keyPlaylistStartEnd(tag)
|
||||
for k in self.db.keys(start_key, end_key):
|
||||
try:
|
||||
del self.db[k]
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
del self.db[self._keyPlaylistPos(tag)]
|
||||
except KeyError:
|
||||
pass
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _getFirstTrack(self, tag: bytes):
|
||||
start_key, end_key = self._keyPlaylistStartEnd(tag)
|
||||
return next(self.db.keys(start_key, end_key))
|
||||
|
||||
def _getNextTrack(self, tag, pos):
|
||||
_, end_key = self._keyPlaylistStartEnd(tag)
|
||||
it = self.db.keys(pos, end_key)
|
||||
next(it)
|
||||
return next(it)
|
||||
|
||||
def getPlaylistForTag(self, tag: bytes):
|
||||
"""
|
||||
Lookup the playlist for 'tag' and return the Playlist object. Return None if no playlist exists for the given
|
||||
tag.
|
||||
"""
|
||||
pos = self.db.get(self._keyPlaylistPos(tag))
|
||||
if pos is None:
|
||||
try:
|
||||
pos = self._getFirstTrack(tag)
|
||||
except StopIteration:
|
||||
# playist does not exist
|
||||
return None
|
||||
else:
|
||||
pos = self._keyPlaylistStart(tag) + pos
|
||||
return self.Playlist(self, tag, pos)
|
||||
|
||||
def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes]):
|
||||
"""
|
||||
Create and save a playlist for 'tag' and return the Playlist object. If a playlist already existed for 'tag' it
|
||||
is overwritten.
|
||||
"""
|
||||
self._savePlaylist(tag, entries)
|
||||
return self.getPlaylistForTag(tag)
|
||||
|
||||
def validate(self, dump=False):
|
||||
"""
|
||||
Validate the structure of the playlist database.
|
||||
"""
|
||||
result = True
|
||||
last_tag = None
|
||||
last_pos = None
|
||||
index_width = None
|
||||
for k in self.db.keys():
|
||||
fields = k.split(b'/')
|
||||
if len(fields) <= 1:
|
||||
print(f'Malformed key {k!r}')
|
||||
result = False
|
||||
if last_tag != fields[0]:
|
||||
last_tag = fields[0]
|
||||
last_pos = None
|
||||
if dump:
|
||||
print(f'Tag {fields[0]}')
|
||||
if fields[1] == b'playlist':
|
||||
if len(fields) != 3:
|
||||
print(f'Malformed playlist entry: {k!r}')
|
||||
result = False
|
||||
continue
|
||||
try:
|
||||
idx = int(fields[2])
|
||||
except ValueError:
|
||||
print(f'Malformed playlist entry: {k!r}')
|
||||
result = False
|
||||
continue
|
||||
if index_width is not None and len(fields[2]) != index_width:
|
||||
print(f'Inconsistent index width for {last_tag} at {idx}')
|
||||
result = False
|
||||
if (last_pos is not None and last_pos + 1 != idx) or \
|
||||
(last_pos is None and idx != 0):
|
||||
print(f'Bad playlist entry sequence for {last_tag} at {idx}')
|
||||
result = False
|
||||
last_pos = idx
|
||||
index_width = len(fields[2])
|
||||
if dump:
|
||||
print(f'\tTrack {idx}: {self.db[k]!r}')
|
||||
elif fields[1] == b'playlistpos':
|
||||
val = self.db[k]
|
||||
try:
|
||||
idx = int(val)
|
||||
except ValueError:
|
||||
print(f'Malformed playlist position: {val!r}')
|
||||
result = False
|
||||
continue
|
||||
if 0 > idx or idx > last_pos:
|
||||
print(f'Playlist position out of range for {last_tag}: {idx}')
|
||||
result = False
|
||||
if dump:
|
||||
print(f'\tPosition {idx}')
|
||||
else:
|
||||
print(f'Unknown key {k!r}')
|
||||
result = False
|
||||
return result
|
||||
|
||||
|
||||
class BTreeFileManager:
|
||||
"""
|
||||
Context manager for a BTreeDB playlist db backed by a file in the filesystem.
|
||||
"""
|
||||
def __init__(self, db_path: str | bytes):
|
||||
self.db_path = db_path
|
||||
|
||||
def __enter__(self):
|
||||
try:
|
||||
self.db_file = open(self.db_path, 'r+b')
|
||||
except OSError:
|
||||
self.db_file = open(self.db_path, 'w+b')
|
||||
try:
|
||||
self.db = btree.open(self.db_file, pagesize=512, cachesize=1024)
|
||||
btdb = BTreeDB(self.db, lambda: self.db_file.flush())
|
||||
btdb.validate(True) # while testing, validate and dump DB on startup
|
||||
return btdb
|
||||
except Exception:
|
||||
self.db_file.close()
|
||||
raise
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.db.close()
|
||||
self.db_file.close()
|
||||
21
software/tests/mocks/btree.py
Normal file
21
software/tests/mocks/btree.py
Normal file
@@ -0,0 +1,21 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
class BTree:
|
||||
def close(self): ...
|
||||
|
||||
def values(self, start_key: str | bytes, end_key: str | bytes | None = None, flags=None) -> Iterable[str | bytes]:
|
||||
pass
|
||||
|
||||
def __setitem__(self, key: str | bytes, val: str | bytes): ...
|
||||
|
||||
def flush(self): ...
|
||||
|
||||
def get(self, key: str | bytes, default: str | bytes | None = None) -> str | bytes: ...
|
||||
|
||||
|
||||
def open(dbfile) -> BTree:
|
||||
pass
|
||||
157
software/tests/test_playerapp.py
Normal file
157
software/tests/test_playerapp.py
Normal file
@@ -0,0 +1,157 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import app
|
||||
import builtins
|
||||
import pytest # type: ignore
|
||||
import time
|
||||
import utils
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def micropythonify():
|
||||
def time_ticks_ms():
|
||||
return time.time_ns() // 1000000
|
||||
time.ticks_ms = time_ticks_ms
|
||||
yield
|
||||
del time.ticks_ms
|
||||
|
||||
|
||||
class FakeFile:
|
||||
def __init__(self, filename, mode):
|
||||
self.filename = filename
|
||||
self.mode = mode
|
||||
self.closed = False
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
|
||||
class FakeMp3Player:
|
||||
def __init__(self):
|
||||
self.volume: int | None = None
|
||||
self.track: FakeFile | None = None
|
||||
|
||||
def set_volume(self, vol: int):
|
||||
self.volume = vol
|
||||
|
||||
def play(self, track: FakeFile):
|
||||
self.track = track
|
||||
|
||||
|
||||
class FakeTimerManager:
|
||||
def __init__(self): pass
|
||||
def cancel(self, timer): pass
|
||||
|
||||
|
||||
class FakeNfcReader:
|
||||
def __init__(self): pass
|
||||
|
||||
|
||||
class FakeButtons:
|
||||
def __init__(self): pass
|
||||
|
||||
|
||||
class FakePlaylistDb:
|
||||
class FakePlaylist:
|
||||
def __init__(self, parent, pos=0):
|
||||
self.parent = parent
|
||||
self.pos = 0
|
||||
|
||||
def getCurrentPath(self):
|
||||
return self.parent.tracklist[self.pos]
|
||||
|
||||
def getNextPath(self):
|
||||
self.pos += 1
|
||||
if self.pos >= len(self.parent.tracklist):
|
||||
return None
|
||||
return self.parent.tracklist[self.pos]
|
||||
|
||||
def __init__(self, tracklist=[b'test/path.mp3']):
|
||||
self.tracklist = tracklist
|
||||
|
||||
def getPlaylistForTag(self, tag: bytes):
|
||||
return self.FakePlaylist(self)
|
||||
|
||||
|
||||
def fake_open(filename, mode):
|
||||
return FakeFile(filename, mode)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def faketimermanager(monkeypatch):
|
||||
monkeypatch.setattr(utils.timer.TimerManager, '_instance', FakeTimerManager())
|
||||
|
||||
|
||||
def test_construct_app(micropythonify, faketimermanager):
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: FakePlaylistDb())
|
||||
_ = app.PlayerApp(deps)
|
||||
assert fake_mp3.volume is not None
|
||||
|
||||
|
||||
def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch):
|
||||
fake_db = FakePlaylistDb()
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
dut = app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
dut.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
assert "r" in fake_mp3.track.mode
|
||||
assert "b" in fake_mp3.track.mode
|
||||
|
||||
|
||||
def test_playlist_seq(micropythonify, faketimermanager, monkeypatch):
|
||||
fake_db = FakePlaylistDb([b'track1.mp3', b'track2.mp3', b'track3.mp3'])
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
dut = app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
dut.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track1.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track2.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track3.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is None
|
||||
|
||||
|
||||
def test_playlist_unknown_tag(micropythonify, faketimermanager, monkeypatch):
|
||||
class FakeNoPlaylistDb:
|
||||
def getPlaylistForTag(self, tag):
|
||||
return None
|
||||
|
||||
fake_db = FakeNoPlaylistDb()
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
dut = app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
dut.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is None
|
||||
126
software/tests/utils_test/test_btreedb.py
Normal file
126
software/tests/utils_test/test_btreedb.py
Normal file
@@ -0,0 +1,126 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from utils import BTreeDB
|
||||
|
||||
|
||||
class FakeDB:
|
||||
def __init__(self, contents):
|
||||
self.contents = contents
|
||||
self.saved_contents = dict(contents)
|
||||
|
||||
def flush(self):
|
||||
self.saved_contents = dict(self.contents)
|
||||
|
||||
def values(self, start_key=None, end_key=None, flags=None):
|
||||
res = []
|
||||
for key in sorted(self.contents):
|
||||
if start_key is not None and start_key > key:
|
||||
continue
|
||||
if end_key is not None and end_key < key:
|
||||
break
|
||||
yield self.contents[key]
|
||||
res.append(self.contents[key])
|
||||
|
||||
def keys(self, start_key=None, end_key=None, flags=None):
|
||||
for key in sorted(self.contents):
|
||||
if start_key is not None and start_key > key:
|
||||
continue
|
||||
if end_key is not None and end_key < key:
|
||||
break
|
||||
yield key
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self.contents.get(key, default)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.contents[key]
|
||||
|
||||
def __setitem__(self, key, val):
|
||||
self.contents[key] = val
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.contents[key]
|
||||
|
||||
|
||||
def test_playlist_load():
|
||||
contents = {b'foo/part': b'no',
|
||||
b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlisttt': b'no'
|
||||
}
|
||||
uut = BTreeDB(FakeDB(contents))
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert list(pl.getPaths()) == [b'track1', b'track2']
|
||||
assert pl.getCurrentPath() == b'track1'
|
||||
|
||||
|
||||
def test_playlist_nextpath():
|
||||
contents = FakeDB({b'foo/part': b'no',
|
||||
b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlisttt': b'no'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getNextPath() == b'track2'
|
||||
assert contents.saved_contents[b'foo/playlistpos'] == b'1'
|
||||
|
||||
|
||||
def test_playlist_nextpath_last():
|
||||
contents = FakeDB({b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getNextPath() is None
|
||||
assert contents.saved_contents[b'foo/playlistpos'] == b'0'
|
||||
|
||||
|
||||
def test_playlist_create():
|
||||
contents = FakeDB({b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
newplaylist = [b'never gonna give you up.mp3', b'durch den monsun.mp3']
|
||||
uut = BTreeDB(contents)
|
||||
new_pl = uut.createPlaylistForTag(b'foo', newplaylist)
|
||||
assert list(new_pl.getPaths()) == newplaylist
|
||||
assert new_pl.getCurrentPath() == newplaylist[0]
|
||||
assert uut.validate(True)
|
||||
|
||||
|
||||
def test_playlist_load_notexist():
|
||||
contents = FakeDB({b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
assert uut.getPlaylistForTag(b'notfound') is None
|
||||
|
||||
|
||||
def test_playlist_remains_lexicographically_ordered_by_key():
|
||||
contents = FakeDB({b'foo/playlist/3': b'track3',
|
||||
b'foo/playlist/2': b'track2',
|
||||
b'foo/playlist/1': b'track1',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getCurrentPath() == b'track1'
|
||||
assert pl.getNextPath() == b'track2'
|
||||
assert pl.getNextPath() == b'track3'
|
||||
|
||||
|
||||
def test_playlist_remains_lexicographically_ordered_with_non_numeric_keys():
|
||||
contents = FakeDB({b'foo/playlist/k': b'trackk',
|
||||
b'foo/playlist/l': b'trackl',
|
||||
b'foo/playlist/i': b'tracki',
|
||||
b'foo/playlistpos': b'k'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getCurrentPath() == b'trackk'
|
||||
assert pl.getNextPath() == b'trackl'
|
||||
assert pl.getNextPath() is None
|
||||
Reference in New Issue
Block a user