playlistdb: Implement shuffle
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m22s
Check code formatting / Check-C-Format (push) Successful in 8s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 11s
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m22s
Check code formatting / Check-C-Format (push) Successful in 8s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 11s
Initial implementation of shuffle with a naive algorithm. Signed-off-by: Matthias Blankertz <matthias@blankertz.org>
This commit is contained in:
@@ -2,6 +2,8 @@
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import btree
|
||||
import random
|
||||
import time
|
||||
try:
|
||||
import typing
|
||||
from typing import TYPE_CHECKING, Iterable # type: ignore
|
||||
@@ -38,6 +40,39 @@ class BTreeDB(IPlaylistDB):
|
||||
self.persist = persist
|
||||
self.shuffle = shuffle
|
||||
self.length = self.parent._getPlaylistLength(self.tag)
|
||||
self._shuffle()
|
||||
|
||||
def _getPlaylistPos(self):
|
||||
"""
|
||||
Gets the position to pass to parent._getPlaylistEntry etc.
|
||||
"""
|
||||
if self.shuffle == BTreeDB.SHUFFLE_YES:
|
||||
return self.shuffle_order[self.pos]
|
||||
else:
|
||||
return self.pos
|
||||
|
||||
def _shuffle(self, reshuffle=False):
|
||||
if self.shuffle == BTreeDB.SHUFFLE_NO:
|
||||
return
|
||||
|
||||
self.shuffle_seed = None
|
||||
# Try to get seed from DB if persisted
|
||||
if self.persist != BTreeDB.PERSIST_NO and not reshuffle:
|
||||
self.shuffle_seed = self.parent._getPlaylistShuffleSeed(self.tag)
|
||||
if self.shuffle_seed is None:
|
||||
# Either not persisted or could not read from db
|
||||
self.shuffle_seed = time.ticks_cpu()
|
||||
if self.persist != BTreeDB.PERSIST_NO:
|
||||
self.parent._setPlaylistShuffleSeed(self.tag, self.shuffle_seed)
|
||||
# TODO: Find an algorithm for shuffling that does not use O(n) memory for playlist of length n
|
||||
random.seed(self.shuffle_seed)
|
||||
entries = list(range(0, self.length))
|
||||
# We don't have random.shuffle in micropython, so emulate it with random.choice
|
||||
self.shuffle_order = []
|
||||
while len(entries) > 0:
|
||||
chosen = random.choice(entries)
|
||||
self.shuffle_order.append(chosen)
|
||||
entries.remove(chosen)
|
||||
|
||||
def getPaths(self):
|
||||
"""
|
||||
@@ -49,7 +84,7 @@ class BTreeDB(IPlaylistDB):
|
||||
"""
|
||||
Get path of file that should be played.
|
||||
"""
|
||||
return self.parent._getPlaylistEntry(self.tag, self.pos)
|
||||
return self.parent._getPlaylistEntry(self.tag, self._getPlaylistPos())
|
||||
|
||||
def getNextPath(self):
|
||||
"""
|
||||
@@ -60,6 +95,7 @@ class BTreeDB(IPlaylistDB):
|
||||
if self.persist != BTreeDB.PERSIST_NO:
|
||||
self.parent._setPlaylistPos(self.tag, self.pos)
|
||||
self.setPlaybackOffset(0)
|
||||
self._shuffle(True)
|
||||
return None
|
||||
|
||||
self.pos += 1
|
||||
@@ -100,6 +136,10 @@ class BTreeDB(IPlaylistDB):
|
||||
def _keyPlaylistShuffle(tag):
|
||||
return b''.join([tag, b'/playlistshuffle'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistShuffleSeed(tag):
|
||||
return b''.join([tag, b'/playlistshuffleseed'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistPersist(tag):
|
||||
return b''.join([tag, b'/playlistpersist'])
|
||||
@@ -145,6 +185,17 @@ class BTreeDB(IPlaylistDB):
|
||||
def _getPlaylistPosOffset(self, tag: bytes) -> int:
|
||||
return int(self.db.get(self._keyPlaylistPosOffset(tag), b'0'))
|
||||
|
||||
def _getPlaylistShuffleSeed(self, tag: bytes) -> int | None:
|
||||
try:
|
||||
return int(self.db[self._keyPlaylistShuffleSeed(tag)])
|
||||
except (ValueError, KeyError):
|
||||
return None
|
||||
|
||||
def _setPlaylistShuffleSeed(self, tag, seed: int, flush=True):
|
||||
self.db[self._keyPlaylistShuffleSeed(tag)] = str(seed).encode()
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _getPlaylistLength(self, tag: bytes) -> int:
|
||||
start, end = self._keyPlaylistStartEnd(tag)
|
||||
for k in self.db.keys(end, start, btree.DESC):
|
||||
@@ -178,7 +229,8 @@ class BTreeDB(IPlaylistDB):
|
||||
except KeyError:
|
||||
pass
|
||||
for k in (self._keyPlaylistPos(tag), self._keyPlaylistPosOffset(tag),
|
||||
self._keyPlaylistPersist(tag), self._keyPlaylistShuffle(tag)):
|
||||
self._keyPlaylistPersist(tag), self._keyPlaylistShuffle(tag),
|
||||
self._keyPlaylistShuffleSeed(tag)):
|
||||
try:
|
||||
del self.db[k]
|
||||
except KeyError:
|
||||
@@ -271,7 +323,7 @@ class BTreeDB(IPlaylistDB):
|
||||
val = self.db[k]
|
||||
if val not in (b'no', b'yes'):
|
||||
fail(f'Bad playlistshuffle value for {last_tag}: {val!r}')
|
||||
if dump and val == 'yes':
|
||||
if dump and val == b'yes':
|
||||
print('\tShuffle')
|
||||
elif fields[1] == b'playlistpersist':
|
||||
val = self.db[k]
|
||||
@@ -280,8 +332,11 @@ class BTreeDB(IPlaylistDB):
|
||||
elif dump:
|
||||
print(f'\tPersist: {val.decode()}')
|
||||
elif fields[1] == b'playlistshuffleseed':
|
||||
# Format TBD
|
||||
pass
|
||||
val = self.db[k]
|
||||
try:
|
||||
_ = int(val)
|
||||
except ValueError:
|
||||
fail(f' Bad playlistshuffleseed value for {last_tag}: {val!r}')
|
||||
elif fields[1] == b'playlistposoffset':
|
||||
val = self.db[k]
|
||||
try:
|
||||
|
||||
@@ -3,9 +3,19 @@
|
||||
|
||||
import btree
|
||||
import pytest
|
||||
import time
|
||||
from utils import BTreeDB
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def micropythonify():
|
||||
def time_ticks_cpu():
|
||||
return time.time_ns()
|
||||
time.ticks_cpu = time_ticks_cpu
|
||||
yield
|
||||
del time.ticks_cpu
|
||||
|
||||
|
||||
class FakeDB:
|
||||
def __init__(self, contents):
|
||||
self.contents = contents
|
||||
@@ -165,3 +175,23 @@ def test_playlist_resets_offset_on_next_track():
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getCurrentPath() == b'track2'
|
||||
assert pl.getPlaybackOffset() == 0
|
||||
|
||||
|
||||
def test_playlist_shuffle():
|
||||
contents_dict = {b'foo/playlistpersist': b'track',
|
||||
b'foo/playlistshuffle': b'yes',
|
||||
}
|
||||
for i in range(256):
|
||||
contents_dict['foo/playlist/{:05}'.format(i).encode()] = 'track{}'.format(i).encode()
|
||||
contents = FakeDB(contents_dict)
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
shuffled = False
|
||||
last_idx = int(pl.getCurrentPath().removeprefix(b'track'))
|
||||
while (t := pl.getNextPath()) is not None:
|
||||
idx = int(t.removeprefix(b'track'))
|
||||
if idx != last_idx + 1:
|
||||
shuffled = True
|
||||
break
|
||||
# A false negative ratr of 1 in 256! should be good enough for this test
|
||||
assert shuffled
|
||||
|
||||
Reference in New Issue
Block a user