Add playlist database
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m22s
Check code formatting / Check-C-Format (push) Successful in 7s
Check code formatting / Check-Python-Flake8 (push) Successful in 9s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 10s
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m22s
Check code formatting / Check-C-Format (push) Successful in 7s
Check code formatting / Check-Python-Flake8 (push) Successful in 9s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 5s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Run pytests / Check-Pytest (push) Successful in 10s
Add playlist database based on the micropython 'btree' module. Supported features are: * Create playlist * Load playlist * Store position in playlist Different playlist modes will be added in a followup for #24. Implements #23. The playlist data is stored in the btree database in a hierarchical schema. The hierarchy levels are separated by the '/' character. Currently, the schema is as follows: The top level for a playlist is the 'tag' id encoded as a hexadecimal string. Beneath this, the 'playlist' key contains the elements in the playlist. The exact keys used for the playlist entries are not specified, they are enumerated in native sort order to build the playlist. When writing a playlist using the playlistdb module, the keys are 000, 001, etc. by default. The 'playlistpos' key is also located under the 'tag' key and stores the key of the current playlist entry. For example, a playlist with two entries 'a.mp3' and 'b.mp3' for a tag with the id '00aa11bb22' would be stored in the following key/value pairs in the btree db: - 00aa11bb22/playlist/000: a.mp3 - 00aa11bb22/playlist/001: b.mp3 - 00aa11bb22/playlistpos: 000 Signed-off-by: Matthias Blankertz <matthias@blankertz.org>
This commit is contained in:
@@ -2,12 +2,11 @@
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from collections import namedtuple
|
||||
import os
|
||||
import time
|
||||
from utils import TimerManager
|
||||
|
||||
|
||||
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons'))
|
||||
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb'))
|
||||
|
||||
# Should be ~ 6dB steps
|
||||
VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
|
||||
@@ -20,6 +19,7 @@ class PlayerApp:
|
||||
self.timer_manager = TimerManager()
|
||||
self.player = deps.mp3player(self)
|
||||
self.nfc = deps.nfcreader(self)
|
||||
self.playlist_db = deps.playlistdb(self)
|
||||
self.buttons = deps.buttons(self) if deps.buttons is not None else None
|
||||
self.mp3file = None
|
||||
self.volume_pos = 3
|
||||
@@ -39,17 +39,8 @@ class PlayerApp:
|
||||
if new_tag is not None:
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
self.current_tag = new_tag
|
||||
uid_str = ''.join('{:02x}'.format(x) for x in new_tag)
|
||||
try:
|
||||
testfiles = [f'/sd/{uid_str}/'.encode() + name for name in os.listdir(f'/sd/{uid_str}'.encode())
|
||||
if name.endswith(b'mp3')]
|
||||
except OSError as ex:
|
||||
print(f'Could not get playlist for tag {uid_str}: {ex}')
|
||||
self.current_tag = None
|
||||
self.player.stop()
|
||||
return
|
||||
testfiles.sort()
|
||||
self._set_playlist(testfiles)
|
||||
uid_str = b''.join('{:02x}'.format(x).encode() for x in new_tag)
|
||||
self._set_playlist(uid_str)
|
||||
else:
|
||||
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
|
||||
|
||||
@@ -60,6 +51,7 @@ class PlayerApp:
|
||||
self.player.stop()
|
||||
|
||||
def onButtonPressed(self, what):
|
||||
assert self.buttons is not None
|
||||
if what == self.buttons.VOLUP:
|
||||
self.volume_pos = min(self.volume_pos + 1, len(VOLUME_CURVE) - 1)
|
||||
self.player.set_volume(VOLUME_CURVE[self.volume_pos])
|
||||
@@ -70,24 +62,29 @@ class PlayerApp:
|
||||
self._play_next()
|
||||
|
||||
def onPlaybackDone(self):
|
||||
assert self.mp3file is not None
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self._play_next()
|
||||
|
||||
def _set_playlist(self, files: list[bytes]):
|
||||
self.playlist_pos = 0
|
||||
self.playlist = files
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
def _set_playlist(self, tag: bytes):
|
||||
self.playlist = self.playlist_db.getPlaylistForTag(tag)
|
||||
self._play(self.playlist.getCurrentPath())
|
||||
|
||||
def _play_next(self):
|
||||
if self.playlist_pos + 1 < len(self.playlist):
|
||||
self.playlist_pos += 1
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
if self.playlist is None:
|
||||
return
|
||||
filename = self.playlist.getNextPath()
|
||||
self._play(filename)
|
||||
if filename is None:
|
||||
self.playlist = None
|
||||
|
||||
def _play(self, filename: bytes):
|
||||
def _play(self, filename: bytes | None):
|
||||
if self.mp3file is not None:
|
||||
self.player.stop()
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self.mp3file = open(filename, 'rb')
|
||||
self.player.play(self.mp3file)
|
||||
if filename is not None:
|
||||
print(f'Playing {filename!r}')
|
||||
self.mp3file = open(filename, 'rb')
|
||||
self.player.play(self.mp3file)
|
||||
|
||||
@@ -16,7 +16,7 @@ from mfrc522 import MFRC522
|
||||
from mp3player import MP3Player
|
||||
from nfc import Nfc
|
||||
from rp2_neopixel import NeoPixel
|
||||
from utils import Buttons, SDContext, TimerManager
|
||||
from utils import BTreeFileManager, Buttons, SDContext, TimerManager
|
||||
|
||||
micropython.alloc_emergency_exception_buf(100)
|
||||
|
||||
@@ -61,6 +61,7 @@ def run():
|
||||
|
||||
# Setup MP3 player
|
||||
with SDContext(mosi=Pin(3), miso=Pin(4), sck=Pin(2), ss=Pin(5), baudrate=15000000), \
|
||||
BTreeFileManager('/sd/tonberry.db') as playlistdb, \
|
||||
AudioContext(Pin(8), Pin(6)) as audioctx:
|
||||
|
||||
# Setup NFC
|
||||
@@ -69,7 +70,8 @@ def run():
|
||||
# Setup app
|
||||
deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app),
|
||||
nfcreader=lambda the_app: Nfc(reader, the_app),
|
||||
buttons=lambda the_app: Buttons(the_app))
|
||||
buttons=lambda the_app: Buttons(the_app),
|
||||
playlistdb=lambda _: playlistdb)
|
||||
the_app = app.PlayerApp(deps)
|
||||
|
||||
# Start
|
||||
@@ -78,6 +80,19 @@ def run():
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
|
||||
def builddb():
|
||||
import os
|
||||
|
||||
os.unlink('/sd/tonberry.db')
|
||||
with BTreeFileManager('/sd/tonberry.db') as db:
|
||||
for name, type_, _, _ in os.ilistdir(b'/sd'):
|
||||
if type_ != 0x4000:
|
||||
continue
|
||||
fl = [b'/sd/' + name + b'/' + x for x in os.listdir(b'/sd/' + name) if x.endswith(b'.mp3')]
|
||||
db.createPlaylistForTag(name, fl)
|
||||
os.sync()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if machine.Pin(17, machine.Pin.IN, machine.Pin.PULL_UP).value() != 0:
|
||||
time.sleep(5)
|
||||
|
||||
@@ -3,7 +3,8 @@
|
||||
|
||||
from utils.buttons import Buttons
|
||||
from utils.mbrpartition import MBRPartition
|
||||
from utils.playlistdb import BTreeDB, BTreeFileManager
|
||||
from utils.sdcontext import SDContext
|
||||
from utils.timer import TimerManager
|
||||
|
||||
__all__ = ["Buttons", "MBRPartition", "SDContext", "TimerManager"]
|
||||
__all__ = ["BTreeDB", "BTreeFileManager", "Buttons", "MBRPartition", "SDContext", "TimerManager"]
|
||||
|
||||
174
software/src/utils/playlistdb.py
Normal file
174
software/src/utils/playlistdb.py
Normal file
@@ -0,0 +1,174 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import btree
|
||||
try:
|
||||
import typing
|
||||
from typing import TYPE_CHECKING, Iterable # type: ignore
|
||||
except ImportError:
|
||||
TYPE_CHECKING = False
|
||||
if TYPE_CHECKING:
|
||||
class IPlaylist(typing.Protocol):
|
||||
def getPaths(self) -> Iterable[bytes]: ...
|
||||
def getCurrentPath(self) -> bytes: ...
|
||||
def getNextPath(self) -> bytes | None: ...
|
||||
|
||||
class IPlaylistDB(typing.Protocol):
|
||||
def getPlaylistForTag(self, tag: bytes) -> IPlaylist: ...
|
||||
else:
|
||||
class IPlaylistDB(object):
|
||||
...
|
||||
|
||||
class IPlaylist(object):
|
||||
...
|
||||
|
||||
|
||||
class BTreeDB(IPlaylistDB):
|
||||
class Playlist(IPlaylist):
|
||||
def __init__(self, parent, tag, pos):
|
||||
self.parent = parent
|
||||
self.tag = tag
|
||||
self.pos = pos
|
||||
|
||||
def getPaths(self):
|
||||
"""
|
||||
Get entire playlist in storage order
|
||||
"""
|
||||
return self.parent._getPlaylistValueIterator(self.tag)
|
||||
|
||||
def getCurrentPath(self):
|
||||
"""
|
||||
Get path of file that should be played.
|
||||
"""
|
||||
return self.parent._getPlaylistEntry(self.tag, self.pos)
|
||||
|
||||
def getNextPath(self):
|
||||
"""
|
||||
Select next track and return path.
|
||||
"""
|
||||
try:
|
||||
self.pos = self.parent._getNextTrack(self.tag, self.pos)
|
||||
except StopIteration:
|
||||
self.pos = self.parent._getFirstTrack(self.tag)
|
||||
return None
|
||||
finally:
|
||||
self.parent._setPlaylistPos(self.tag, self.pos)
|
||||
return self.getCurrentPath()
|
||||
|
||||
def __init__(self, db: btree.BTree, flush_func: typing.Callable | None = None):
|
||||
self.db = db
|
||||
self.flush_func = flush_func
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistPos(tag):
|
||||
return b''.join([tag, b'/playlistpos'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistEntry(tag, pos):
|
||||
return b''.join([tag, b'/playlist/', "{:03}".format(pos).encode()])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistStart(tag):
|
||||
return b''.join([tag, b'/playlist/'])
|
||||
|
||||
@staticmethod
|
||||
def _keyPlaylistStartEnd(tag):
|
||||
return (b''.join([tag, b'/playlist/']),
|
||||
b''.join([tag, b'/playlist0']))
|
||||
|
||||
def _flush(self):
|
||||
"""
|
||||
Flush the database and call the flush_func if it was provided.
|
||||
"""
|
||||
self.db.flush()
|
||||
if self.flush_func is not None:
|
||||
self.flush_func()
|
||||
|
||||
def _getPlaylistValueIterator(self, tag):
|
||||
start, end = self._keyPlaylistStartEnd(tag)
|
||||
return self.db.values(start, end)
|
||||
|
||||
def _getPlaylistEntry(self, _, pos):
|
||||
return self.db[pos]
|
||||
|
||||
def _setPlaylistPos(self, tag, pos, flush=True):
|
||||
self.db[self._keyPlaylistPos(tag)] = pos.removeprefix(self._keyPlaylistStart(tag))
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _savePlaylist(self, tag, entries, flush=True):
|
||||
self._deletePlaylist(tag, False)
|
||||
for idx, entry in enumerate(entries):
|
||||
self.db[self._keyPlaylistEntry(tag, idx)] = entry
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _deletePlaylist(self, tag, flush=True):
|
||||
start_key, end_key = self._keyPlaylistStartEnd(tag)
|
||||
for k in self.db.keys(start_key, end_key):
|
||||
try:
|
||||
del self.db[k]
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
del self.db[self._keyPlaylistPos(tag)]
|
||||
except KeyError:
|
||||
pass
|
||||
if flush:
|
||||
self._flush()
|
||||
|
||||
def _getFirstTrack(self, tag: bytes):
|
||||
start_key, end_key = self._keyPlaylistStartEnd(tag)
|
||||
return next(self.db.keys(start_key, end_key))
|
||||
|
||||
def _getNextTrack(self, tag, pos):
|
||||
_, end_key = self._keyPlaylistStartEnd(tag)
|
||||
return next(self.db.keys(pos, end_key))
|
||||
|
||||
def getPlaylistForTag(self, tag: bytes):
|
||||
"""
|
||||
Lookup the playlist for 'tag' and return the Playlist object. Return None if no playlist exists for the given
|
||||
tag.
|
||||
"""
|
||||
pos = self.db.get(self._keyPlaylistPos(tag))
|
||||
if pos is None:
|
||||
try:
|
||||
pos = self._getFirstTrack(tag)
|
||||
except StopIteration:
|
||||
# playist does not exist
|
||||
return None
|
||||
else:
|
||||
pos = self._keyPlaylistStart(tag) + pos
|
||||
return self.Playlist(self, tag, pos)
|
||||
|
||||
def createPlaylistForTag(self, tag: bytes, entries: typing.Iterable[bytes]):
|
||||
"""
|
||||
Create and save a playlist for 'tag' and return the Playlist object. If a playlist already existed for 'tag' it
|
||||
is overwritten.
|
||||
"""
|
||||
self._savePlaylist(tag, entries)
|
||||
return self.getPlaylistForTag(tag)
|
||||
|
||||
|
||||
class BTreeFileManager:
|
||||
"""
|
||||
Context manager for a BTreeDB playlist db backed by a file in the filesystem.
|
||||
"""
|
||||
def __init__(self, db_path: str | bytes):
|
||||
self.db_path = db_path
|
||||
|
||||
def __enter__(self):
|
||||
try:
|
||||
self.db_file = open(self.db_path, 'r+b')
|
||||
except OSError:
|
||||
self.db_file = open(self.db_path, 'w+b')
|
||||
try:
|
||||
self.db = btree.open(self.db_file, pagesize=512, cachesize=1024)
|
||||
return BTreeDB(self.db, lambda: self.db_file.flush())
|
||||
except Exception:
|
||||
self.db_file.close()
|
||||
raise
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.db.close()
|
||||
self.db_file.close()
|
||||
21
software/tests/mocks/btree.py
Normal file
21
software/tests/mocks/btree.py
Normal file
@@ -0,0 +1,21 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
class BTree:
|
||||
def close(self): ...
|
||||
|
||||
def values(self, start_key: str | bytes, end_key: str | bytes | None = None, flags=None) -> Iterable[str | bytes]:
|
||||
pass
|
||||
|
||||
def __setitem__(self, key: str | bytes, val: str | bytes): ...
|
||||
|
||||
def flush(self): ...
|
||||
|
||||
def get(self, key: str | bytes, default: str | bytes | None = None) -> str | bytes: ...
|
||||
|
||||
|
||||
def open(dbfile) -> BTree:
|
||||
pass
|
||||
139
software/tests/test_playerapp.py
Normal file
139
software/tests/test_playerapp.py
Normal file
@@ -0,0 +1,139 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import app
|
||||
import builtins
|
||||
import pytest # type: ignore
|
||||
import time
|
||||
import utils
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def micropythonify():
|
||||
def time_ticks_ms():
|
||||
return time.time_ns() // 1000000
|
||||
time.ticks_ms = time_ticks_ms
|
||||
yield
|
||||
del time.ticks_ms
|
||||
|
||||
|
||||
class FakeFile:
|
||||
def __init__(self, filename, mode):
|
||||
self.filename = filename
|
||||
self.mode = mode
|
||||
self.closed = False
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
|
||||
class FakeMp3Player:
|
||||
def __init__(self):
|
||||
self.volume: int | None = None
|
||||
self.track: FakeFile | None = None
|
||||
|
||||
def set_volume(self, vol: int):
|
||||
self.volume = vol
|
||||
|
||||
def play(self, track: FakeFile):
|
||||
self.track = track
|
||||
|
||||
|
||||
class FakeTimerManager:
|
||||
def __init__(self): pass
|
||||
def cancel(self, timer): pass
|
||||
|
||||
|
||||
class FakeNfcReader:
|
||||
def __init__(self): pass
|
||||
|
||||
|
||||
class FakeButtons:
|
||||
def __init__(self): pass
|
||||
|
||||
|
||||
class FakePlaylistDb:
|
||||
class FakePlaylist:
|
||||
def __init__(self, parent, pos=0):
|
||||
self.parent = parent
|
||||
self.pos = 0
|
||||
|
||||
def getCurrentPath(self):
|
||||
return self.parent.tracklist[self.pos]
|
||||
|
||||
def getNextPath(self):
|
||||
self.pos += 1
|
||||
if self.pos >= len(self.parent.tracklist):
|
||||
return None
|
||||
return self.parent.tracklist[self.pos]
|
||||
|
||||
def __init__(self, tracklist=[b'test/path.mp3']):
|
||||
self.tracklist = tracklist
|
||||
|
||||
def getPlaylistForTag(self, tag: bytes):
|
||||
return self.FakePlaylist(self)
|
||||
|
||||
|
||||
def fake_open(filename, mode):
|
||||
return FakeFile(filename, mode)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def faketimermanager(monkeypatch):
|
||||
monkeypatch.setattr(utils.timer.TimerManager, '_instance', FakeTimerManager())
|
||||
|
||||
|
||||
def test_construct_app(micropythonify, faketimermanager):
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: FakePlaylistDb())
|
||||
_ = app.PlayerApp(deps)
|
||||
assert fake_mp3.volume is not None
|
||||
|
||||
|
||||
def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch):
|
||||
fake_db = FakePlaylistDb()
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
dut = app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
dut.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'test/path.mp3'
|
||||
assert "r" in fake_mp3.track.mode
|
||||
assert "b" in fake_mp3.track.mode
|
||||
|
||||
|
||||
def test_playlist_seq(micropythonify, faketimermanager, monkeypatch):
|
||||
fake_db = FakePlaylistDb([b'track1.mp3', b'track2.mp3', b'track3.mp3'])
|
||||
fake_mp3 = FakeMp3Player()
|
||||
deps = app.Dependencies(mp3player=lambda _: fake_mp3,
|
||||
nfcreader=lambda _: FakeNfcReader(),
|
||||
buttons=lambda _: FakeButtons(),
|
||||
playlistdb=lambda _: fake_db)
|
||||
dut = app.PlayerApp(deps)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(builtins, 'open', fake_open)
|
||||
dut.onTagChange([23, 42, 1, 2, 3])
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track1.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track2.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is not None
|
||||
assert fake_mp3.track.filename == b'track3.mp3'
|
||||
|
||||
fake_mp3.track = None
|
||||
dut.onPlaybackDone()
|
||||
assert fake_mp3.track is None
|
||||
99
software/tests/utils_test/test_btreedb.py
Normal file
99
software/tests/utils_test/test_btreedb.py
Normal file
@@ -0,0 +1,99 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from utils import BTreeDB
|
||||
|
||||
|
||||
class FakeDB:
|
||||
def __init__(self, contents):
|
||||
self.contents = contents
|
||||
self.saved_contents = dict(contents)
|
||||
|
||||
def flush(self):
|
||||
self.saved_contents = dict(self.contents)
|
||||
|
||||
def values(self, start_key, end_key=None, flags=None):
|
||||
res = []
|
||||
for key in sorted(self.contents):
|
||||
if start_key >= key:
|
||||
continue
|
||||
if end_key is not None and end_key < key:
|
||||
break
|
||||
yield self.contents[key]
|
||||
res.append(self.contents[key])
|
||||
|
||||
def keys(self, start_key, end_key=None, flags=None):
|
||||
for key in sorted(self.contents):
|
||||
if start_key >= key:
|
||||
continue
|
||||
if end_key is not None and end_key < key:
|
||||
break
|
||||
yield key
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self.contents.get(key, default)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.contents[key]
|
||||
|
||||
def __setitem__(self, key, val):
|
||||
self.contents[key] = val
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.contents[key]
|
||||
|
||||
|
||||
def test_playlist_load():
|
||||
contents = {b'foo/part': b'no',
|
||||
b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlisttt': b'no'
|
||||
}
|
||||
uut = BTreeDB(FakeDB(contents))
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert list(pl.getPaths()) == [b'track1', b'track2']
|
||||
assert pl.getCurrentPath() == b'track1'
|
||||
|
||||
|
||||
def test_playlist_nextpath():
|
||||
contents = FakeDB({b'foo/part': b'no',
|
||||
b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlisttt': b'no'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getNextPath() == b'track2'
|
||||
assert contents.saved_contents[b'foo/playlistpos'] == b'1'
|
||||
|
||||
|
||||
def test_playlist_nextpath_last():
|
||||
contents = FakeDB({b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
pl = uut.getPlaylistForTag(b'foo')
|
||||
assert pl.getNextPath() is None
|
||||
assert contents.saved_contents[b'foo/playlistpos'] == b'0'
|
||||
|
||||
|
||||
def test_playlist_create():
|
||||
contents = FakeDB({b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
newplaylist = [b'never gonna give you up.mp3', b'durch den monsun.mp3']
|
||||
uut = BTreeDB(contents)
|
||||
new_pl = uut.createPlaylistForTag(b'foo', newplaylist)
|
||||
assert list(new_pl.getPaths()) == newplaylist
|
||||
assert new_pl.getCurrentPath() == newplaylist[0]
|
||||
|
||||
|
||||
def test_playlist_load_notexist():
|
||||
contents = FakeDB({b'foo/playlist/0': b'track1',
|
||||
b'foo/playlist/1': b'track2',
|
||||
b'foo/playlistpos': b'1'
|
||||
})
|
||||
uut = BTreeDB(contents)
|
||||
assert uut.getPlaylistForTag(b'notfound') is None
|
||||
Reference in New Issue
Block a user