playlistdb: Allow up to 100k tracks; Add validate method; docstrings

- Increase the formatting of playlist entries to allow up to 100000
  tracks. Also enforce that playlist entries are indexed by integers
  using the validate method.

- Add a validate method to validate the data stored in the
  btreedb. Optionally dump the contents to stdout. For testing, add a
  validate+dump by default when opening the db. This can be removed once
  the playlistdb is validated.
This commit is contained in:
2025-08-28 12:14:07 +02:00
parent 69e119a8a0
commit 16d5180d34
3 changed files with 71 additions and 6 deletions

View File

@@ -86,6 +86,10 @@ def run():
def builddb():
"""
For testing, build a playlist db based on the previous tag directory format.
Can be removed once uploading files / playlist via the web api is possible.
"""
import os
os.unlink('/sd/tonberry.db')

View File

@@ -65,7 +65,7 @@ class BTreeDB(IPlaylistDB):
@staticmethod
def _keyPlaylistEntry(tag, pos):
return b''.join([tag, b'/playlist/', "{:03}".format(pos).encode()])
return b''.join([tag, b'/playlist/', '{:05}'.format(pos).encode()])
@staticmethod
def _keyPlaylistStart(tag):
@@ -151,6 +151,64 @@ class BTreeDB(IPlaylistDB):
self._savePlaylist(tag, entries)
return self.getPlaylistForTag(tag)
def validate(self, dump=False):
"""
Validate the structure of the playlist database.
"""
result = True
last_tag = None
last_pos = None
index_width = None
for k in self.db.keys():
fields = k.split(b'/')
if len(fields) <= 1:
print(f'Malformed key {k!r}')
result = False
if last_tag != fields[0]:
last_tag = fields[0]
last_pos = None
if dump:
print(f'Tag {fields[0]}')
if fields[1] == b'playlist':
if len(fields) != 3:
print(f'Malformed playlist entry: {k!r}')
result = False
continue
try:
idx = int(fields[2])
except ValueError:
print(f'Malformed playlist entry: {k!r}')
result = False
continue
if index_width is not None and len(fields[2]) != index_width:
print(f'Inconsistent index width for {last_tag} at {idx}')
result = False
if (last_pos is not None and last_pos + 1 != idx) or \
(last_pos is None and idx != 0):
print(f'Bad playlist entry sequence for {last_tag} at {idx}')
result = False
last_pos = idx
index_width = len(fields[2])
if dump:
print(f'\tTrack {idx}: {self.db[k]!r}')
elif fields[1] == b'playlistpos':
val = self.db[k]
try:
idx = int(val)
except ValueError:
print(f'Malformed playlist position: {val!r}')
result = False
continue
if 0 > idx or idx > last_pos:
print(f'Playlist position out of range for {last_tag}: {idx}')
result = False
if dump:
print(f'\tPosition {idx}')
else:
print(f'Unknown key {k!r}')
result = False
return result
class BTreeFileManager:
"""
@@ -166,7 +224,9 @@ class BTreeFileManager:
self.db_file = open(self.db_path, 'w+b')
try:
self.db = btree.open(self.db_file, pagesize=512, cachesize=1024)
return BTreeDB(self.db, lambda: self.db_file.flush())
btdb = BTreeDB(self.db, lambda: self.db_file.flush())
btdb.validate(True) # while testing, validate and dump DB on startup
return btdb
except Exception:
self.db_file.close()
raise

View File

@@ -12,19 +12,19 @@ class FakeDB:
def flush(self):
self.saved_contents = dict(self.contents)
def values(self, start_key, end_key=None, flags=None):
def values(self, start_key=None, end_key=None, flags=None):
res = []
for key in sorted(self.contents):
if start_key > key:
if start_key is not None and start_key > key:
continue
if end_key is not None and end_key < key:
break
yield self.contents[key]
res.append(self.contents[key])
def keys(self, start_key, end_key=None, flags=None):
def keys(self, start_key=None, end_key=None, flags=None):
for key in sorted(self.contents):
if start_key > key:
if start_key is not None and start_key > key:
continue
if end_key is not None and end_key < key:
break
@@ -88,6 +88,7 @@ def test_playlist_create():
new_pl = uut.createPlaylistForTag(b'foo', newplaylist)
assert list(new_pl.getPaths()) == newplaylist
assert new_pl.getCurrentPath() == newplaylist[0]
assert uut.validate(True)
def test_playlist_load_notexist():