Merge pull request '24-playlist-modes' (#46) from 24-playlist-modes into main
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m23s
Check code formatting / Check-C-Format (push) Successful in 8s
Check code formatting / Check-Python-Flake8 (push) Successful in 11s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 9s
Run pytests / Check-Pytest (push) Successful in 11s

Reviewed-on: #46
Reviewed-by: Stefan Kratochwil <kratochwil-la@gmx.de>
This commit was merged in pull request #46.
This commit is contained in:
2025-11-04 18:23:57 +00:00
7 changed files with 362 additions and 92 deletions

View File

@@ -9,3 +9,46 @@ python -m venv test-venv
pip install -r tests/requirements.txt pip install -r tests/requirements.txt
pip install -U micropython-rp2-pico_w-stubs --target typings pip install -U micropython-rp2-pico_w-stubs --target typings
``` ```
## 'database' schema for btree db
### Playlist storage
The playlist data is stored in the btree database in a hierarchical schema. The hierarchy levels are
separated by the '/' character. Currently, the schema is as follows: The top level for a playlist
is the 'tag' id encoded as a hexadecimal string. Beneath this, the 'playlist' key contains the
elements in the playlist. The keys used for the playlist entries must be decimal integers,
left-padded with zeros so their length is 5 (e.g. format `{:05}`).
#### Playlist modes
The 'playlistshuffle' key located under the 'tag' key can be 'no' or 'yes' and specifies whether the
playlist is in shuffle mode. Should this key be absent the default value is 'no'.
The 'playlistpersist' key located under the 'tag' key can be 'no', 'track' or 'offset'. Should this
key be absent the default value is 'track'.
* When it is 'no', the playlist position is not saved when playback stops. If shuffle mode is
active, the shuffle random seed is also not saved.
* When it is 'track', the currently playing track is saved when playback stops. If shuffle mode is
active, the shuffle random seed is also saved. Should playback reach the last track (in shuffle
mode: the last track in the permutated order), the saved position is reset and playback is
stopped. The next time the playlist is started it will start from the first track and with a new
shuffle seed if applicable.
* When it is 'offset', the operation is basically the same as in 'track' mode. The difference is
that the offset in the currently playing track is also saved and playback will resume at that
position.
The 'playlistpos' key located under the 'tag' key stores the key of the current playlist
entry. The 'playlistshuffleseed' key stores the random seed used to shuffle the playlist.
The 'playlistposoffset' key stores the offset in the current playlist entry.
#### Example
For example, a playlist with two entries 'a.mp3' and 'b.mp3' for a tag with the id '00aa11bb22'
would be stored in the following key/value pairs in the btree db:
* 00aa11bb22/playlist/00000: a.mp3
* 00aa11bb22/playlist/00001: b.mp3
* 00aa11bb22/playlistpos: 00000

View File

@@ -48,7 +48,10 @@ class PlayerApp:
if self.current_tag is not None: if self.current_tag is not None:
print('Tag gone, stopping playback') print('Tag gone, stopping playback')
self.current_tag = None self.current_tag = None
self.player.stop() if self.playlist is not None:
pos = self.player.stop()
if pos is not None:
self.playlist.setPlaybackOffset(pos)
def onButtonPressed(self, what): def onButtonPressed(self, what):
assert self.buttons is not None assert self.buttons is not None
@@ -69,7 +72,8 @@ class PlayerApp:
def _set_playlist(self, tag: bytes): def _set_playlist(self, tag: bytes):
self.playlist = self.playlist_db.getPlaylistForTag(tag) self.playlist = self.playlist_db.getPlaylistForTag(tag)
self._play(self.playlist.getCurrentPath() if self.playlist is not None else None) self._play(self.playlist.getCurrentPath() if self.playlist is not None else None,
self.playlist.getPlaybackOffset() if self.playlist is not None else 0)
def _play_next(self): def _play_next(self):
if self.playlist is None: if self.playlist is None:
@@ -79,7 +83,7 @@ class PlayerApp:
if filename is None: if filename is None:
self.playlist = None self.playlist = None
def _play(self, filename: bytes | None): def _play(self, filename: bytes | None, offset=0):
if self.mp3file is not None: if self.mp3file is not None:
self.player.stop() self.player.stop()
self.mp3file.close() self.mp3file.close()
@@ -87,4 +91,4 @@ class PlayerApp:
if filename is not None: if filename is not None:
print(f'Playing {filename!r}') print(f'Playing {filename!r}')
self.mp3file = open(filename, 'rb') self.mp3file = open(filename, 'rb')
self.player.play(self.mp3file) self.player.play(self.mp3file, offset)

View File

@@ -21,14 +21,19 @@ class MP3Player:
self.mp3task = None self.mp3task = None
self.volume = 128 self.volume = 128
self.cb = cb self.cb = cb
self.pos = 0
def play(self, stream): def play(self, stream, offset=0):
""" """
Play from byte stream. Play from byte stream.
If offset > 0, discard the first offset bytes
""" """
if self.mp3task is not None: if self.mp3task is not None:
self.mp3task.cancel() self.mp3task.cancel()
self.mp3task = None self.mp3task = None
if offset > 0:
stream.seek(offset, 1)
self.pos = offset
self.mp3task = asyncio.create_task(self._play_task(stream)) self.mp3task = asyncio.create_task(self._play_task(stream))
def stop(self): def stop(self):
@@ -38,6 +43,8 @@ class MP3Player:
if self.mp3task is not None: if self.mp3task is not None:
self.mp3task.cancel() self.mp3task.cancel()
self.mp3task = None self.mp3task = None
return self.pos
return None
def set_volume(self, volume: int): def set_volume(self, volume: int):
""" """
@@ -60,6 +67,7 @@ class MP3Player:
# End of file # End of file
break break
_, _, underruns = await self.audiocore.async_put(data[:bytes_read]) _, _, underruns = await self.audiocore.async_put(data[:bytes_read])
self.pos += bytes_read
if underruns > known_underruns: if underruns > known_underruns:
print(f"{underruns:x}") print(f"{underruns:x}")
known_underruns = underruns known_underruns = underruns

View File

@@ -2,6 +2,8 @@
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org> # Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import btree import btree
import random
import time
try: try:
import typing import typing
from typing import TYPE_CHECKING, Iterable # type: ignore from typing import TYPE_CHECKING, Iterable # type: ignore
@@ -24,11 +26,53 @@ else:
class BTreeDB(IPlaylistDB): class BTreeDB(IPlaylistDB):
SHUFFLE_NO = b'no'
SHUFFLE_YES = b'yes'
PERSIST_NO = b'no'
PERSIST_TRACK = b'track'
PERSIST_OFFSET = b'offset'
class Playlist(IPlaylist): class Playlist(IPlaylist):
def __init__(self, parent, tag, pos): def __init__(self, parent: "BTreeDB", tag: bytes, pos: int, persist, shuffle):
self.parent = parent self.parent = parent
self.tag = tag self.tag = tag
self.pos = pos self.pos = pos
self.persist = persist
self.shuffle = shuffle
self.length = self.parent._getPlaylistLength(self.tag)
self._shuffle()
def _getPlaylistPos(self):
"""
Gets the position to pass to parent._getPlaylistEntry etc.
"""
if self.shuffle == BTreeDB.SHUFFLE_YES:
return self.shuffle_order[self.pos]
else:
return self.pos
def _shuffle(self, reshuffle=False):
if self.shuffle == BTreeDB.SHUFFLE_NO:
return
self.shuffle_seed = None
# Try to get seed from DB if persisted
if self.persist != BTreeDB.PERSIST_NO and not reshuffle:
self.shuffle_seed = self.parent._getPlaylistShuffleSeed(self.tag)
if self.shuffle_seed is None:
# Either not persisted or could not read from db
self.shuffle_seed = time.ticks_cpu()
if self.persist != BTreeDB.PERSIST_NO:
self.parent._setPlaylistShuffleSeed(self.tag, self.shuffle_seed)
# TODO: Find an algorithm for shuffling that does not use O(n) memory for playlist of length n
random.seed(self.shuffle_seed)
entries = list(range(0, self.length))
# We don't have random.shuffle in micropython, so emulate it with random.choice
self.shuffle_order = []
while len(entries) > 0:
chosen = random.choice(entries)
self.shuffle_order.append(chosen)
entries.remove(chosen)
def getPaths(self): def getPaths(self):
""" """
@@ -40,21 +84,42 @@ class BTreeDB(IPlaylistDB):
""" """
Get path of file that should be played. Get path of file that should be played.
""" """
return self.parent._getPlaylistEntry(self.tag, self.pos) return self.parent._getPlaylistEntry(self.tag, self._getPlaylistPos())
def getNextPath(self): def getNextPath(self):
""" """
Select next track and return path. Select next track and return path.
""" """
try: if self.pos + 1 >= self.length:
self.pos = self.parent._getNextTrack(self.tag, self.pos) self.pos = 0
except StopIteration: if self.persist != BTreeDB.PERSIST_NO:
self.pos = self.parent._getFirstTrack(self.tag) self.parent._setPlaylistPos(self.tag, self.pos)
self.setPlaybackOffset(0)
self._shuffle(True)
return None return None
finally:
self.pos += 1
if self.persist != BTreeDB.PERSIST_NO:
self.parent._setPlaylistPos(self.tag, self.pos) self.parent._setPlaylistPos(self.tag, self.pos)
self.setPlaybackOffset(0)
return self.getCurrentPath() return self.getCurrentPath()
def setPlaybackOffset(self, offset):
"""
Store the current position in the track for PERSIST_OFFSET mode
"""
if self.persist != BTreeDB.PERSIST_OFFSET:
return
self.parent._setPlaylistPosOffset(self.tag, offset)
def getPlaybackOffset(self):
"""
Get the current position in the track for PERSIST_OFFSET mode
"""
if self.persist != BTreeDB.PERSIST_OFFSET:
return 0
return self.parent._getPlaylistPosOffset(self.tag)
def __init__(self, db: btree.BTree, flush_func: typing.Callable | None = None): def __init__(self, db: btree.BTree, flush_func: typing.Callable | None = None):
self.db = db self.db = db
self.flush_func = flush_func self.flush_func = flush_func
@@ -63,6 +128,22 @@ class BTreeDB(IPlaylistDB):
def _keyPlaylistPos(tag): def _keyPlaylistPos(tag):
return b''.join([tag, b'/playlistpos']) return b''.join([tag, b'/playlistpos'])
@staticmethod
def _keyPlaylistPosOffset(tag):
return b''.join([tag, b'/playlistposoffset'])
@staticmethod
def _keyPlaylistShuffle(tag):
return b''.join([tag, b'/playlistshuffle'])
@staticmethod
def _keyPlaylistShuffleSeed(tag):
return b''.join([tag, b'/playlistshuffleseed'])
@staticmethod
def _keyPlaylistPersist(tag):
return b''.join([tag, b'/playlistpersist'])
@staticmethod @staticmethod
def _keyPlaylistEntry(tag, pos): def _keyPlaylistEntry(tag, pos):
return b''.join([tag, b'/playlist/', '{:05}'.format(pos).encode()]) return b''.join([tag, b'/playlist/', '{:05}'.format(pos).encode()])
@@ -84,23 +165,59 @@ class BTreeDB(IPlaylistDB):
if self.flush_func is not None: if self.flush_func is not None:
self.flush_func() self.flush_func()
def _getPlaylistValueIterator(self, tag): def _getPlaylistValueIterator(self, tag: bytes):
start, end = self._keyPlaylistStartEnd(tag) start, end = self._keyPlaylistStartEnd(tag)
return self.db.values(start, end) return self.db.values(start, end)
def _getPlaylistEntry(self, _, pos): def _getPlaylistEntry(self, tag: bytes, pos: int) -> bytes:
return self.db[pos] return self.db[self._keyPlaylistEntry(tag, pos)]
def _setPlaylistPos(self, tag, pos, flush=True): def _setPlaylistPos(self, tag: bytes, pos: int, flush=True):
assert pos.startswith(self._keyPlaylistStart(tag)) self.db[self._keyPlaylistPos(tag)] = str(pos).encode()
self.db[self._keyPlaylistPos(tag)] = pos[len(self._keyPlaylistStart(tag)):]
if flush: if flush:
self._flush() self._flush()
def _savePlaylist(self, tag, entries, flush=True): def _setPlaylistPosOffset(self, tag: bytes, offset: int, flush=True):
self.db[self._keyPlaylistPosOffset(tag)] = str(offset).encode()
if flush:
self._flush()
def _getPlaylistPosOffset(self, tag: bytes) -> int:
return int(self.db.get(self._keyPlaylistPosOffset(tag), b'0'))
def _getPlaylistShuffleSeed(self, tag: bytes) -> int | None:
try:
return int(self.db[self._keyPlaylistShuffleSeed(tag)])
except (ValueError, KeyError):
return None
def _setPlaylistShuffleSeed(self, tag, seed: int, flush=True):
self.db[self._keyPlaylistShuffleSeed(tag)] = str(seed).encode()
if flush:
self._flush()
def _getPlaylistLength(self, tag: bytes) -> int:
start, end = self._keyPlaylistStartEnd(tag)
for k in self.db.keys(end, start, btree.DESC):
# There is a bug in btreedb that causes an additional key after 'end' to be returned when iterating in
# descending order
# Check for this and skip it if needed
elements = k.split(b'/')
if len(elements) >= 2 and elements[1] == b'playlist':
last = k
break
print(last)
elements = last.split(b'/')
if len(elements) != 3:
raise RuntimeError("Malformed playlist key")
return int(elements[2])+1
def _savePlaylist(self, tag, entries, persist, shuffle, flush=True):
self._deletePlaylist(tag, False) self._deletePlaylist(tag, False)
for idx, entry in enumerate(entries): for idx, entry in enumerate(entries):
self.db[self._keyPlaylistEntry(tag, idx)] = entry self.db[self._keyPlaylistEntry(tag, idx)] = entry
self.db[self._keyPlaylistPersist(tag)] = persist
self.db[self._keyPlaylistShuffle(tag)] = shuffle
if flush: if flush:
self._flush() self._flush()
@@ -111,45 +228,45 @@ class BTreeDB(IPlaylistDB):
del self.db[k] del self.db[k]
except KeyError: except KeyError:
pass pass
try: for k in (self._keyPlaylistPos(tag), self._keyPlaylistPosOffset(tag),
del self.db[self._keyPlaylistPos(tag)] self._keyPlaylistPersist(tag), self._keyPlaylistShuffle(tag),
except KeyError: self._keyPlaylistShuffleSeed(tag)):
pass try:
del self.db[k]
except KeyError:
pass
if flush: if flush:
self._flush() self._flush()
def _getFirstTrack(self, tag: bytes):
start_key, end_key = self._keyPlaylistStartEnd(tag)
return next(self.db.keys(start_key, end_key))
def _getNextTrack(self, tag, pos):
_, end_key = self._keyPlaylistStartEnd(tag)
it = self.db.keys(pos, end_key)
next(it)
return next(it)
def getPlaylistForTag(self, tag: bytes): def getPlaylistForTag(self, tag: bytes):
""" """
Lookup the playlist for 'tag' and return the Playlist object. Return None if no playlist exists for the given Lookup the playlist for 'tag' and return the Playlist object. Return None if no playlist exists for the given
tag. tag.
""" """
pos = self.db.get(self._keyPlaylistPos(tag)) persist = self.db.get(self._keyPlaylistPersist(tag), self.PERSIST_TRACK)
if pos is None: pos = 0
if persist != self.PERSIST_NO and self._keyPlaylistPos(tag) in self.db:
try: try:
pos = self._getFirstTrack(tag) pos = int(self.db[self._keyPlaylistPos(tag)])
except StopIteration: except ValueError:
# playist does not exist pass
return None if self._keyPlaylistEntry(tag, 0) not in self.db:
else: # Empty playlist
pos = self._keyPlaylistStart(tag) + pos return None
return self.Playlist(self, tag, pos) if self._keyPlaylistEntry(tag, pos) not in self.db:
pos = 0
shuffle = self.db.get(self._keyPlaylistShuffle(tag), self.SHUFFLE_NO)
return self.Playlist(self, tag, pos, persist, shuffle)
def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes]): def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes], persist=PERSIST_TRACK,
shuffle=SHUFFLE_NO):
""" """
Create and save a playlist for 'tag' and return the Playlist object. If a playlist already existed for 'tag' it Create and save a playlist for 'tag' and return the Playlist object. If a playlist already existed for 'tag' it
is overwritten. is overwritten.
""" """
self._savePlaylist(tag, entries) assert persist in (self.PERSIST_NO, self.PERSIST_TRACK, self.PERSIST_OFFSET)
assert shuffle in (self.SHUFFLE_NO, self.SHUFFLE_YES)
self._savePlaylist(tag, entries, persist, shuffle)
return self.getPlaylistForTag(tag) return self.getPlaylistForTag(tag)
def validate(self, dump=False): def validate(self, dump=False):
@@ -157,14 +274,18 @@ class BTreeDB(IPlaylistDB):
Validate the structure of the playlist database. Validate the structure of the playlist database.
""" """
result = True result = True
def fail(msg):
nonlocal result
print(msg)
result = False
last_tag = None last_tag = None
last_pos = None last_pos = None
index_width = None
for k in self.db.keys(): for k in self.db.keys():
fields = k.split(b'/') fields = k.split(b'/')
if len(fields) <= 1: if len(fields) <= 1:
print(f'Malformed key {k!r}') fail(f'Malformed key {k!r}')
result = False
if last_tag != fields[0]: if last_tag != fields[0]:
last_tag = fields[0] last_tag = fields[0]
last_pos = None last_pos = None
@@ -172,24 +293,19 @@ class BTreeDB(IPlaylistDB):
print(f'Tag {fields[0]}') print(f'Tag {fields[0]}')
if fields[1] == b'playlist': if fields[1] == b'playlist':
if len(fields) != 3: if len(fields) != 3:
print(f'Malformed playlist entry: {k!r}') fail(f'Malformed playlist entry: {k!r}')
result = False
continue continue
try: try:
idx = int(fields[2]) idx = int(fields[2])
except ValueError: except ValueError:
print(f'Malformed playlist entry: {k!r}') fail(f'Malformed playlist entry: {k!r}')
result = False
continue continue
if index_width is not None and len(fields[2]) != index_width: if len(fields[2]) != 5:
print(f'Inconsistent index width for {last_tag} at {idx}') fail(f'Bad index width for {last_tag} at {idx}')
result = False
if (last_pos is not None and last_pos + 1 != idx) or \ if (last_pos is not None and last_pos + 1 != idx) or \
(last_pos is None and idx != 0): (last_pos is None and idx != 0):
print(f'Bad playlist entry sequence for {last_tag} at {idx}') fail(f'Bad playlist entry sequence for {last_tag} at {idx}')
result = False
last_pos = idx last_pos = idx
index_width = len(fields[2])
if dump: if dump:
print(f'\tTrack {idx}: {self.db[k]!r}') print(f'\tTrack {idx}: {self.db[k]!r}')
elif fields[1] == b'playlistpos': elif fields[1] == b'playlistpos':
@@ -197,17 +313,38 @@ class BTreeDB(IPlaylistDB):
try: try:
idx = int(val) idx = int(val)
except ValueError: except ValueError:
print(f'Malformed playlist position: {val!r}') fail(f'Malformed playlist position: {val!r}')
result = False
continue continue
if 0 > idx or idx > last_pos: if 0 > idx or idx > last_pos:
print(f'Playlist position out of range for {last_tag}: {idx}') fail(f'Playlist position out of range for {last_tag}: {idx}')
result = False elif dump:
if dump:
print(f'\tPosition {idx}') print(f'\tPosition {idx}')
elif fields[1] == b'playlistshuffle':
val = self.db[k]
if val not in (b'no', b'yes'):
fail(f'Bad playlistshuffle value for {last_tag}: {val!r}')
if dump and val == b'yes':
print('\tShuffle')
elif fields[1] == b'playlistpersist':
val = self.db[k]
if val not in (b'no', b'track', b'offset'):
fail(f'Bad playlistpersist value for {last_tag}: {val!r}')
elif dump:
print(f'\tPersist: {val.decode()}')
elif fields[1] == b'playlistshuffleseed':
val = self.db[k]
try:
_ = int(val)
except ValueError:
fail(f' Bad playlistshuffleseed value for {last_tag}: {val!r}')
elif fields[1] == b'playlistposoffset':
val = self.db[k]
try:
_ = int(val)
except ValueError:
fail(f' Bad playlistposoffset value for {last_tag}: {val!r}')
else: else:
print(f'Unknown key {k!r}') fail(f'Unknown key {k!r}')
result = False
return result return result

View File

@@ -19,3 +19,7 @@ class BTree:
def open(dbfile) -> BTree: def open(dbfile) -> BTree:
pass pass
DESC = 1
INCL = 2

View File

@@ -35,7 +35,7 @@ class FakeMp3Player:
def set_volume(self, vol: int): def set_volume(self, vol: int):
self.volume = vol self.volume = vol
def play(self, track: FakeFile): def play(self, track: FakeFile, offset: int):
self.track = track self.track = track
@@ -67,6 +67,9 @@ class FakePlaylistDb:
return None return None
return self.parent.tracklist[self.pos] return self.parent.tracklist[self.pos]
def getPlaybackOffset(self):
return 0
def __init__(self, tracklist=[b'test/path.mp3']): def __init__(self, tracklist=[b'test/path.mp3']):
self.tracklist = tracklist self.tracklist = tracklist

View File

@@ -1,9 +1,21 @@
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org> # Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import btree
import pytest
import time
from utils import BTreeDB from utils import BTreeDB
@pytest.fixture(autouse=True)
def micropythonify():
def time_ticks_cpu():
return time.time_ns()
time.ticks_cpu = time_ticks_cpu
yield
del time.ticks_cpu
class FakeDB: class FakeDB:
def __init__(self, contents): def __init__(self, contents):
self.contents = contents self.contents = contents
@@ -17,18 +29,24 @@ class FakeDB:
for key in sorted(self.contents): for key in sorted(self.contents):
if start_key is not None and start_key > key: if start_key is not None and start_key > key:
continue continue
if end_key is not None and end_key < key: if end_key is not None and end_key <= key:
break break
yield self.contents[key] yield self.contents[key]
res.append(self.contents[key]) res.append(self.contents[key])
def keys(self, start_key=None, end_key=None, flags=None): def keys(self, start_key=None, end_key=None, flags=None):
keys = []
if flags is not None and flags & btree.DESC != 0:
start_key, end_key = end_key, start_key
for key in sorted(self.contents): for key in sorted(self.contents):
if start_key is not None and start_key > key: if start_key is not None and start_key > key:
continue continue
if end_key is not None and end_key < key: if end_key is not None and end_key <= key:
break break
yield key keys.append(key)
if flags is not None and flags & btree.DESC != 0:
keys.reverse()
return iter(keys)
def get(self, key, default=None): def get(self, key, default=None):
return self.contents.get(key, default) return self.contents.get(key, default)
@@ -42,11 +60,14 @@ class FakeDB:
def __delitem__(self, key): def __delitem__(self, key):
del self.contents[key] del self.contents[key]
def __contains__(self, key):
return key in self.contents
def test_playlist_load(): def test_playlist_load():
contents = {b'foo/part': b'no', contents = {b'foo/part': b'no',
b'foo/playlist/0': b'track1', b'foo/playlist/00000': b'track1',
b'foo/playlist/1': b'track2', b'foo/playlist/00001': b'track2',
b'foo/playlisttt': b'no' b'foo/playlisttt': b'no'
} }
uut = BTreeDB(FakeDB(contents)) uut = BTreeDB(FakeDB(contents))
@@ -57,8 +78,8 @@ def test_playlist_load():
def test_playlist_nextpath(): def test_playlist_nextpath():
contents = FakeDB({b'foo/part': b'no', contents = FakeDB({b'foo/part': b'no',
b'foo/playlist/0': b'track1', b'foo/playlist/00000': b'track1',
b'foo/playlist/1': b'track2', b'foo/playlist/00001': b'track2',
b'foo/playlisttt': b'no' b'foo/playlisttt': b'no'
}) })
uut = BTreeDB(contents) uut = BTreeDB(contents)
@@ -68,8 +89,8 @@ def test_playlist_nextpath():
def test_playlist_nextpath_last(): def test_playlist_nextpath_last():
contents = FakeDB({b'foo/playlist/0': b'track1', contents = FakeDB({b'foo/playlist/00000': b'track1',
b'foo/playlist/1': b'track2', b'foo/playlist/00001': b'track2',
b'foo/playlistpos': b'1' b'foo/playlistpos': b'1'
}) })
uut = BTreeDB(contents) uut = BTreeDB(contents)
@@ -79,8 +100,8 @@ def test_playlist_nextpath_last():
def test_playlist_create(): def test_playlist_create():
contents = FakeDB({b'foo/playlist/0': b'track1', contents = FakeDB({b'foo/playlist/00000': b'track1',
b'foo/playlist/1': b'track2', b'foo/playlist/00001': b'track2',
b'foo/playlistpos': b'1' b'foo/playlistpos': b'1'
}) })
newplaylist = [b'never gonna give you up.mp3', b'durch den monsun.mp3'] newplaylist = [b'never gonna give you up.mp3', b'durch den monsun.mp3']
@@ -92,35 +113,85 @@ def test_playlist_create():
def test_playlist_load_notexist(): def test_playlist_load_notexist():
contents = FakeDB({b'foo/playlist/0': b'track1', contents = FakeDB({b'foo/playlist/00000': b'track1',
b'foo/playlist/1': b'track2', b'foo/playlist/00001': b'track2',
b'foo/playlistpos': b'1' b'foo/playlistpos': b'1'
}) })
uut = BTreeDB(contents) uut = BTreeDB(contents)
assert uut.getPlaylistForTag(b'notfound') is None assert uut.getPlaylistForTag(b'notfound') is None
def test_playlist_remains_lexicographically_ordered_by_key(): def test_playlist_starts_at_beginning_in_persist_no_mode():
contents = FakeDB({b'foo/playlist/3': b'track3', contents = FakeDB({b'foo/playlist/00000': b'track1',
b'foo/playlist/2': b'track2', b'foo/playlist/00001': b'track2',
b'foo/playlist/1': b'track1', b'foo/playlistpersist': b'no',
b'foo/playlistpos': b'1'
}) })
uut = BTreeDB(contents) uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo') pl = uut.getPlaylistForTag(b'foo')
assert pl.getCurrentPath() == b'track1' assert pl.getCurrentPath() == b'track1'
assert pl.getNextPath() == b'track2' assert pl.getNextPath() == b'track2'
assert pl.getNextPath() == b'track3' del pl
pl = uut.getPlaylistForTag(b'foo')
assert pl.getCurrentPath() == b'track1'
def test_playlist_remains_lexicographically_ordered_with_non_numeric_keys(): @pytest.mark.parametrize("mode", [b'no', b'track'])
contents = FakeDB({b'foo/playlist/k': b'trackk', def test_playlist_ignores_offset_in_other_modes(mode):
b'foo/playlist/l': b'trackl', contents = FakeDB({b'foo/playlist/00000': b'track1',
b'foo/playlist/i': b'tracki', b'foo/playlist/00001': b'track2',
b'foo/playlistpos': b'k' b'foo/playlistpersist': mode,
}) })
uut = BTreeDB(contents) uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo') pl = uut.getPlaylistForTag(b'foo')
assert pl.getCurrentPath() == b'trackk' pl.setPlaybackOffset(42)
assert pl.getNextPath() == b'trackl' del pl
assert pl.getNextPath() is None pl = uut.getPlaylistForTag(b'foo')
assert pl.getPlaybackOffset() == 0
def test_playlist_stores_offset_in_offset_mode():
contents = FakeDB({b'foo/playlist/00000': b'track1',
b'foo/playlist/00001': b'track2',
b'foo/playlistpersist': b'offset',
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
pl.setPlaybackOffset(42)
del pl
pl = uut.getPlaylistForTag(b'foo')
assert pl.getPlaybackOffset() == 42
def test_playlist_resets_offset_on_next_track():
contents = FakeDB({b'foo/playlist/00000': b'track1',
b'foo/playlist/00001': b'track2',
b'foo/playlistpersist': b'offset',
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
pl.setPlaybackOffset(42)
assert pl.getNextPath() == b'track2'
del pl
pl = uut.getPlaylistForTag(b'foo')
assert pl.getCurrentPath() == b'track2'
assert pl.getPlaybackOffset() == 0
def test_playlist_shuffle():
contents_dict = {b'foo/playlistpersist': b'track',
b'foo/playlistshuffle': b'yes',
}
for i in range(256):
contents_dict['foo/playlist/{:05}'.format(i).encode()] = 'track{}'.format(i).encode()
contents = FakeDB(contents_dict)
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
shuffled = False
last_idx = int(pl.getCurrentPath().removeprefix(b'track'))
while (t := pl.getNextPath()) is not None:
idx = int(t.removeprefix(b'track'))
if idx != last_idx + 1:
shuffled = True
break
# A false negative ratr of 1 in 256! should be good enough for this test
assert shuffled