playlistdb: Add shuffle and persist settings

Add documentation of playlist database schema in DEVELOP.md.

Add settings for persist and shuffle to BTreeDB. Implement the different
persist modes. Shuffle will be implemented in a followup commit.

Signed-off-by: Matthias Blankertz <matthias@blankertz.org>
This commit is contained in:
2025-09-07 00:14:00 +02:00
parent 4512b91763
commit 5013e2359d
3 changed files with 200 additions and 26 deletions

View File

@@ -24,11 +24,19 @@ else:
class BTreeDB(IPlaylistDB):
SHUFFLE_NO = b'no'
SHUFFLE_YES = b'yes'
PERSIST_NO = b'no'
PERSIST_TRACK = b'track'
PERSIST_OFFSET = b'offset'
class Playlist(IPlaylist):
def __init__(self, parent, tag, pos):
def __init__(self, parent, tag, pos, persist, shuffle):
self.parent = parent
self.tag = tag
self.pos = pos
self.persist = persist
self.shuffle = shuffle
def getPaths(self):
"""
@@ -52,9 +60,27 @@ class BTreeDB(IPlaylistDB):
self.pos = self.parent._getFirstTrack(self.tag)
return None
finally:
self.parent._setPlaylistPos(self.tag, self.pos)
if self.persist != BTreeDB.PERSIST_NO:
self.parent._setPlaylistPos(self.tag, self.pos)
self.setPlaybackOffset(0)
return self.getCurrentPath()
def setPlaybackOffset(self, offset):
"""
Store the current position in the track for PERSIST_OFFSET mode
"""
if self.persist != BTreeDB.PERSIST_OFFSET:
return
self.parent._setPlaylistPosOffset(self.tag, offset)
def getPlaybackOffset(self):
"""
Get the current position in the track for PERSIST_OFFSET mode
"""
if self.persist != BTreeDB.PERSIST_OFFSET:
return 0
return self.parent._getPlaylistPosOffset(self.tag)
def __init__(self, db: btree.BTree, flush_func: typing.Callable | None = None):
self.db = db
self.flush_func = flush_func
@@ -63,6 +89,18 @@ class BTreeDB(IPlaylistDB):
def _keyPlaylistPos(tag):
return b''.join([tag, b'/playlistpos'])
@staticmethod
def _keyPlaylistPosOffset(tag):
return b''.join([tag, b'/playlistposoffset'])
@staticmethod
def _keyPlaylistShuffle(tag):
return b''.join([tag, b'/playlistshuffle'])
@staticmethod
def _keyPlaylistPersist(tag):
return b''.join([tag, b'/playlistpersist'])
@staticmethod
def _keyPlaylistEntry(tag, pos):
return b''.join([tag, b'/playlist/', '{:05}'.format(pos).encode()])
@@ -97,10 +135,20 @@ class BTreeDB(IPlaylistDB):
if flush:
self._flush()
def _savePlaylist(self, tag, entries, flush=True):
def _setPlaylistPosOffset(self, tag, offset, flush=True):
self.db[self._keyPlaylistPosOffset(tag)] = str(offset).encode()
if flush:
self._flush()
def _getPlaylistPosOffset(self, tag):
return int(self.db.get(self._keyPlaylistPosOffset(tag), b'0'))
def _savePlaylist(self, tag, entries, persist, shuffle, flush=True):
self._deletePlaylist(tag, False)
for idx, entry in enumerate(entries):
self.db[self._keyPlaylistEntry(tag, idx)] = entry
self.db[self._keyPlaylistPersist(tag)] = persist
self.db[self._keyPlaylistShuffle(tag)] = shuffle
if flush:
self._flush()
@@ -133,7 +181,11 @@ class BTreeDB(IPlaylistDB):
Lookup the playlist for 'tag' and return the Playlist object. Return None if no playlist exists for the given
tag.
"""
pos = self.db.get(self._keyPlaylistPos(tag))
persist = self.db.get(self._keyPlaylistPersist(tag), self.PERSIST_TRACK)
if persist != self.PERSIST_NO:
pos = self.db.get(self._keyPlaylistPos(tag))
else:
pos = None
if pos is None:
try:
pos = self._getFirstTrack(tag)
@@ -142,14 +194,18 @@ class BTreeDB(IPlaylistDB):
return None
else:
pos = self._keyPlaylistStart(tag) + pos
return self.Playlist(self, tag, pos)
shuffle = self.db.get(self._keyPlaylistShuffle(tag), self.SHUFFLE_NO)
return self.Playlist(self, tag, pos, persist, shuffle)
def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes]):
def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes], persist=PERSIST_TRACK,
shuffle=SHUFFLE_NO):
"""
Create and save a playlist for 'tag' and return the Playlist object. If a playlist already existed for 'tag' it
is overwritten.
"""
self._savePlaylist(tag, entries)
assert persist in (self.PERSIST_NO, self.PERSIST_TRACK, self.PERSIST_OFFSET)
assert shuffle in (self.SHUFFLE_NO, self.SHUFFLE_YES)
self._savePlaylist(tag, entries, persist, shuffle)
return self.getPlaylistForTag(tag)
def validate(self, dump=False):
@@ -157,14 +213,19 @@ class BTreeDB(IPlaylistDB):
Validate the structure of the playlist database.
"""
result = True
def fail(msg):
nonlocal result
print(msg)
result = False
last_tag = None
last_pos = None
index_width = None
for k in self.db.keys():
fields = k.split(b'/')
if len(fields) <= 1:
print(f'Malformed key {k!r}')
result = False
fail(f'Malformed key {k!r}')
if last_tag != fields[0]:
last_tag = fields[0]
last_pos = None
@@ -172,22 +233,18 @@ class BTreeDB(IPlaylistDB):
print(f'Tag {fields[0]}')
if fields[1] == b'playlist':
if len(fields) != 3:
print(f'Malformed playlist entry: {k!r}')
result = False
fail(f'Malformed playlist entry: {k!r}')
continue
try:
idx = int(fields[2])
except ValueError:
print(f'Malformed playlist entry: {k!r}')
result = False
fail(f'Malformed playlist entry: {k!r}')
continue
if index_width is not None and len(fields[2]) != index_width:
print(f'Inconsistent index width for {last_tag} at {idx}')
result = False
fail(f'Inconsistent index width for {last_tag} at {idx}')
if (last_pos is not None and last_pos + 1 != idx) or \
(last_pos is None and idx != 0):
print(f'Bad playlist entry sequence for {last_tag} at {idx}')
result = False
fail(f'Bad playlist entry sequence for {last_tag} at {idx}')
last_pos = idx
index_width = len(fields[2])
if dump:
@@ -197,17 +254,32 @@ class BTreeDB(IPlaylistDB):
try:
idx = int(val)
except ValueError:
print(f'Malformed playlist position: {val!r}')
result = False
fail(f'Malformed playlist position: {val!r}')
continue
if 0 > idx or idx > last_pos:
print(f'Playlist position out of range for {last_tag}: {idx}')
result = False
if dump:
fail(f'Playlist position out of range for {last_tag}: {idx}')
elif dump:
print(f'\tPosition {idx}')
elif fields[1] == b'playlistshuffle':
val = self.db[k]
if val not in (b'no', b'yes'):
fail(f'Bad playlistshuffle value for {last_tag}: {val!r}')
if dump and val == 'yes':
print('\tShuffle')
elif fields[1] == b'playlistpersist':
val = self.db[k]
if val not in (b'no', b'track', b'offset'):
fail(f'Bad playlistpersist value for {last_tag}: {val!r}')
elif dump:
print(f'\tPersist: {val.decode()}')
elif fields[1] == b'playlistshuffleseed':
# Format TBD
pass
elif fields[1] == b'playlistposoffset':
# Format TBD
pass
else:
print(f'Unknown key {k!r}')
result = False
fail(f'Unknown key {k!r}')
return result

View File

@@ -1,6 +1,7 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import pytest
from utils import BTreeDB
@@ -17,7 +18,7 @@ class FakeDB:
for key in sorted(self.contents):
if start_key is not None and start_key > key:
continue
if end_key is not None and end_key < key:
if end_key is not None and end_key <= key:
break
yield self.contents[key]
res.append(self.contents[key])
@@ -26,7 +27,7 @@ class FakeDB:
for key in sorted(self.contents):
if start_key is not None and start_key > key:
continue
if end_key is not None and end_key < key:
if end_key is not None and end_key <= key:
break
yield key
@@ -124,3 +125,59 @@ def test_playlist_remains_lexicographically_ordered_with_non_numeric_keys():
assert pl.getCurrentPath() == b'trackk'
assert pl.getNextPath() == b'trackl'
assert pl.getNextPath() is None
def test_playlist_starts_at_beginning_in_persist_no_mode():
contents = FakeDB({b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlistpersist': b'no',
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
assert pl.getCurrentPath() == b'track1'
assert pl.getNextPath() == b'track2'
del pl
pl = uut.getPlaylistForTag(b'foo')
assert pl.getCurrentPath() == b'track1'
@pytest.mark.parametrize("mode", [b'no', b'track'])
def test_playlist_ignores_offset_in_other_modes(mode):
contents = FakeDB({b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlistpersist': mode,
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
pl.setPlaybackOffset(42)
del pl
pl = uut.getPlaylistForTag(b'foo')
assert pl.getPlaybackOffset() == 0
def test_playlist_stores_offset_in_offset_mode():
contents = FakeDB({b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlistpersist': b'offset',
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
pl.setPlaybackOffset(42)
del pl
pl = uut.getPlaylistForTag(b'foo')
assert pl.getPlaybackOffset() == 42
def test_playlist_resets_offset_on_next_track():
contents = FakeDB({b'foo/playlist/0': b'track1',
b'foo/playlist/1': b'track2',
b'foo/playlistpersist': b'offset',
})
uut = BTreeDB(contents)
pl = uut.getPlaylistForTag(b'foo')
pl.setPlaybackOffset(42)
assert pl.getNextPath() == b'track2'
del pl
pl = uut.getPlaylistForTag(b'foo')
assert pl.getCurrentPath() == b'track2'
assert pl.getPlaybackOffset() == 0