Merge pull request '27-configuration-storage' (#53) from 27-configuration-storage into main
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 4m42s
Check code formatting / Check-C-Format (push) Successful in 8s
Check code formatting / Check-Python-Flake8 (push) Successful in 10s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 6s
Run unit tests on host / Run-Unit-Tests (push) Successful in 9s
Run pytests / Check-Pytest (push) Successful in 12s

Reviewed-on: #53
Reviewed-by: Stefan Kratochwil <kratochwil-la@gmx.de>
This commit was merged in pull request #53.
This commit is contained in:
2025-12-03 19:20:26 +00:00
11 changed files with 236 additions and 43 deletions

View File

@@ -6,7 +6,8 @@ import time
from utils import TimerManager from utils import TimerManager
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb', 'hwconfig', 'leds')) Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb', 'hwconfig', 'leds',
'config'))
# Should be ~ 6dB steps # Should be ~ 6dB steps
VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251] VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
@@ -14,11 +15,12 @@ VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
class PlayerApp: class PlayerApp:
class TagStateMachine: class TagStateMachine:
def __init__(self, parent, timer_manager): def __init__(self, parent, timer_manager, timeout=5000):
self.parent = parent self.parent = parent
self.timer_manager = timer_manager self.timer_manager = timer_manager
self.current_tag = None self.current_tag = None
self.current_tag_time = time.ticks_ms() self.current_tag_time = time.ticks_ms()
self.timeout = timeout
def onTagChange(self, new_tag): def onTagChange(self, new_tag):
if new_tag is not None: if new_tag is not None:
@@ -31,7 +33,7 @@ class PlayerApp:
self.current_tag = new_tag self.current_tag = new_tag
self.parent.onNewTag(new_tag) self.parent.onNewTag(new_tag)
else: else:
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay) self.timer_manager.schedule(time.ticks_ms() + self.timeout, self.onTagRemoveDelay)
def onTagRemoveDelay(self): def onTagRemoveDelay(self):
if self.current_tag is not None: if self.current_tag is not None:
@@ -40,7 +42,10 @@ class PlayerApp:
def __init__(self, deps: Dependencies): def __init__(self, deps: Dependencies):
self.timer_manager = TimerManager() self.timer_manager = TimerManager()
self.tag_state_machine = self.TagStateMachine(self, self.timer_manager) self.config = deps.config(self)
self.tag_timeout_ms = self.config.get_tag_timeout() * 1000
self.idle_timeout_ms = self.config.get_idle_timeout() * 1000
self.tag_state_machine = self.TagStateMachine(self, self.timer_manager, self.tag_timeout_ms)
self.player = deps.mp3player(self) self.player = deps.mp3player(self)
self.nfc = deps.nfcreader(self.tag_state_machine) self.nfc = deps.nfcreader(self.tag_state_machine)
self.playlist_db = deps.playlistdb(self) self.playlist_db = deps.playlistdb(self)
@@ -52,6 +57,7 @@ class PlayerApp:
self.buttons = deps.buttons(self) if deps.buttons is not None else None self.buttons = deps.buttons(self) if deps.buttons is not None else None
self.mp3file = None self.mp3file = None
self.volume_pos = 3 self.volume_pos = 3
self.paused = False
self.player.set_volume(VOLUME_CURVE[self.volume_pos]) self.player.set_volume(VOLUME_CURVE[self.volume_pos])
self._onIdle() self._onIdle()
@@ -91,6 +97,10 @@ class PlayerApp:
self.player.set_volume(VOLUME_CURVE[self.volume_pos]) self.player.set_volume(VOLUME_CURVE[self.volume_pos])
elif what == self.buttons.NEXT: elif what == self.buttons.NEXT:
self._play_next() self._play_next()
elif what == self.buttons.PREV:
self._play_prev()
elif what == self.buttons.PLAY_PAUSE:
self._pause_toggle()
def onPlaybackDone(self): def onPlaybackDone(self):
assert self.mp3file is not None assert self.mp3file is not None
@@ -103,7 +113,7 @@ class PlayerApp:
self.hwconfig.power_off() self.hwconfig.power_off()
else: else:
# Check again in a minute # Check again in a minute
self.timer_manager.schedule(time.ticks_ms() + 60*1000, self.onIdleTimeout) self.timer_manager.schedule(time.ticks_ms() + self.idle_timeout_ms, self.onIdleTimeout)
def _set_playlist(self, tag: bytes): def _set_playlist(self, tag: bytes):
if self.playlist is not None: if self.playlist is not None:
@@ -131,6 +141,15 @@ class PlayerApp:
self.playlist = None self.playlist = None
self.playing_tag = None self.playing_tag = None
def _play_prev(self):
if self.playlist is None:
return
filename = self.playlist.getPrevPath()
self._play(filename)
if filename is None:
self.playlist = None
self.playing_tag = None
def _play(self, filename: bytes | None, offset=0): def _play(self, filename: bytes | None, offset=0):
if self.mp3file is not None: if self.mp3file is not None:
self.player.stop() self.player.stop()
@@ -141,10 +160,21 @@ class PlayerApp:
print(f'Playing {filename!r}') print(f'Playing {filename!r}')
self.mp3file = open(filename, 'rb') self.mp3file = open(filename, 'rb')
self.player.play(self.mp3file, offset) self.player.play(self.mp3file, offset)
self.paused = False
self._onActive() self._onActive()
def _pause_toggle(self):
if self.playlist is None:
return
if self.paused:
self._play(self.playlist.getCurrentPath(), self.pause_offset)
else:
self.pause_offset = self.player.stop()
self.paused = True
self._onIdle()
def _onIdle(self): def _onIdle(self):
self.timer_manager.schedule(time.ticks_ms() + 60*1000, self.onIdleTimeout) self.timer_manager.schedule(time.ticks_ms() + self.idle_timeout_ms, self.onIdleTimeout)
self.leds.set_state(self.leds.IDLE) self.leds.set_state(self.leds.IDLE)
def _onActive(self): def _onActive(self):

View File

@@ -28,13 +28,14 @@ RC522_SS = Pin.board.GP13
# WS2812 # WS2812
LED_DIN = Pin.board.GP16 LED_DIN = Pin.board.GP16
LED_COUNT = 1
# Buttons # Buttons
BUTTON_VOLUP = Pin.board.GP17 BUTTONS = [Pin.board.GP17,
BUTTON_VOLDOWN = Pin.board.GP19 Pin.board.GP18,
BUTTON_NEXT = Pin.board.GP18 Pin.board.GP19,
BUTTON_POWER = Pin.board.GP21 Pin.board.GP20,
Pin.board.GP21,
]
# Power # Power
POWER_EN = Pin.board.GP22 POWER_EN = Pin.board.GP22

View File

@@ -27,13 +27,12 @@ RC522_SS = Pin.board.GP13
# WS2812 # WS2812
LED_DIN = Pin.board.GP16 LED_DIN = Pin.board.GP16
LED_COUNT = 1
# Buttons # Buttons
BUTTON_VOLUP = Pin.board.GP17 BUTTONS = [Pin.board.GP17,
BUTTON_VOLDOWN = Pin.board.GP19 Pin.board.GP18,
BUTTON_NEXT = Pin.board.GP18 Pin.board.GP19,
BUTTON_POWER = None ]
# Power # Power
POWER_EN = None POWER_EN = None

View File

@@ -18,7 +18,7 @@ from mfrc522 import MFRC522
from mp3player import MP3Player from mp3player import MP3Player
from nfc import Nfc from nfc import Nfc
from rp2_neopixel import NeoPixel from rp2_neopixel import NeoPixel
from utils import BTreeFileManager, Buttons, SDContext, TimerManager, LedManager from utils import BTreeFileManager, Buttons, SDContext, TimerManager, LedManager, Configuration
from webserver import start_webserver from webserver import start_webserver
try: try:
@@ -55,15 +55,17 @@ def setup_wifi():
DB_PATH = '/sd/tonberry.db' DB_PATH = '/sd/tonberry.db'
config = Configuration()
def run(): def run():
asyncio.new_event_loop() asyncio.new_event_loop()
# Setup LEDs # Setup LEDs
np = NeoPixel(hwconfig.LED_DIN, hwconfig.LED_COUNT, sm=1) np = NeoPixel(hwconfig.LED_DIN, config.get_led_count(), sm=1)
# Wifi with default config # Wifi with default config
setup_wifi() setup_wifi()
start_webserver() start_webserver(config)
# Setup MP3 player # Setup MP3 player
with SDContext(mosi=hwconfig.SD_DI, miso=hwconfig.SD_DO, sck=hwconfig.SD_SCK, ss=hwconfig.SD_CS, with SDContext(mosi=hwconfig.SD_DI, miso=hwconfig.SD_DO, sck=hwconfig.SD_SCK, ss=hwconfig.SD_CS,
@@ -87,12 +89,11 @@ def run():
# Setup app # Setup app
deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app), deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app),
nfcreader=lambda the_app: Nfc(reader, the_app), nfcreader=lambda the_app: Nfc(reader, the_app),
buttons=lambda the_app: Buttons(the_app, pin_volup=hwconfig.BUTTON_VOLUP, buttons=lambda the_app: Buttons(the_app, config, hwconfig),
pin_voldown=hwconfig.BUTTON_VOLDOWN,
pin_next=hwconfig.BUTTON_NEXT),
playlistdb=lambda _: playlistdb, playlistdb=lambda _: playlistdb,
hwconfig=lambda _: hwconfig, hwconfig=lambda _: hwconfig,
leds=lambda _: LedManager(np)) leds=lambda _: LedManager(np),
config=lambda _: config)
the_app = app.PlayerApp(deps) the_app = app.PlayerApp(deps)
# Start # Start
@@ -121,5 +122,5 @@ def builddb():
if __name__ == '__main__': if __name__ == '__main__':
time.sleep(1) time.sleep(1)
if machine.Pin(hwconfig.BUTTON_VOLUP, machine.Pin.IN, machine.Pin.PULL_UP).value() != 0: if machine.Pin(hwconfig.BUTTONS[0], machine.Pin.IN, machine.Pin.PULL_UP).value() != 0:
run() run()

View File

@@ -2,6 +2,7 @@
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org> # Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
from utils.buttons import Buttons from utils.buttons import Buttons
from utils.config import Configuration
from utils.leds import LedManager from utils.leds import LedManager
from utils.mbrpartition import MBRPartition from utils.mbrpartition import MBRPartition
from utils.pinindex import get_pin_index from utils.pinindex import get_pin_index
@@ -9,5 +10,5 @@ from utils.playlistdb import BTreeDB, BTreeFileManager
from utils.sdcontext import SDContext from utils.sdcontext import SDContext
from utils.timer import TimerManager from utils.timer import TimerManager
__all__ = ["BTreeDB", "BTreeFileManager", "Buttons", "get_pin_index", "LedManager", "MBRPartition", "SDContext", __all__ = ["BTreeDB", "BTreeFileManager", "Buttons", "Configuration", "get_pin_index", "LedManager", "MBRPartition",
"TimerManager"] "SDContext", "TimerManager"]

View File

@@ -17,14 +17,27 @@ if TYPE_CHECKING:
class Buttons: class Buttons:
def __init__(self, cb: "ButtonCallback", pin_volup=17, pin_voldown=19, pin_next=18): VOLUP = micropython.const(1)
self.VOLUP = micropython.const(1) VOLDOWN = micropython.const(2)
self.VOLDOWN = micropython.const(2) NEXT = micropython.const(3)
self.NEXT = micropython.const(3) PREV = micropython.const(4)
PLAY_PAUSE = micropython.const(5)
KEYMAP = {VOLUP: 'VOLUP',
VOLDOWN: 'VOLDOWN',
NEXT: 'NEXT',
PREV: 'PREV',
PLAY_PAUSE: 'PLAY_PAUSE'}
def __init__(self, cb: "ButtonCallback", config, hwconfig):
self.button_map = config.get_button_map()
self.hw_buttons = hwconfig.BUTTONS
self.cb = cb self.cb = cb
self.buttons = {machine.Pin(pin_volup, machine.Pin.IN, machine.Pin.PULL_UP): self.VOLUP, self.buttons = dict()
machine.Pin(pin_voldown, machine.Pin.IN, machine.Pin.PULL_UP): self.VOLDOWN, for key_id, key_name in self.KEYMAP.items():
machine.Pin(pin_next, machine.Pin.IN, machine.Pin.PULL_UP): self.NEXT} pin = self._get_pin(key_name)
if pin is None:
continue
self.buttons[pin] = key_id
self.int_flag = asyncio.ThreadSafeFlag() self.int_flag = asyncio.ThreadSafeFlag()
self.pressed: list[int] = [] self.pressed: list[int] = []
self.last: dict[int, int] = {} self.last: dict[int, int] = {}
@@ -32,6 +45,17 @@ class Buttons:
button.irq(handler=self._interrupt, trigger=machine.Pin.IRQ_FALLING | machine.Pin.IRQ_RISING) button.irq(handler=self._interrupt, trigger=machine.Pin.IRQ_FALLING | machine.Pin.IRQ_RISING)
asyncio.create_task(self.task()) asyncio.create_task(self.task())
def _get_pin(self, key):
key_id = self.button_map.get(key, None)
if key_id is None:
return None
if key_id < 0 or key_id >= len(self.hw_buttons):
return None
pin = self.hw_buttons[key_id]
if pin is not None:
pin.init(machine.Pin.IN, machine.Pin.PULL_UP)
return pin
def _interrupt(self, button): def _interrupt(self, button):
keycode = self.buttons[button] keycode = self.buttons[button]
last = self.last.get(keycode, 0) last = self.last.get(keycode, 0)

View File

@@ -0,0 +1,91 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
from errno import ENOENT
import json
import os
try:
from typing import TYPE_CHECKING, Mapping, Any
except ImportError:
TYPE_CHECKING = False
class Configuration:
DEFAULT_CONFIG = {
'LED_COUNT': 1,
'IDLE_TIMEOUT_SECS': 60,
'TAG_TIMEOUT_SECS': 5,
'BUTTON_MAP': {
'PLAY_PAUSE': 4,
'VOLUP': 0,
'VOLDOWN': 2,
'PREV': None,
'NEXT': 1,
}
}
def __init__(self, config_path='/config.json'):
self.config_path = config_path
try:
with open(self.config_path, 'r') as conf_file:
self.config = json.load(conf_file)
except OSError as ex:
if ex.errno == ENOENT:
self.config = Configuration.DEFAULT_CONFIG
self._save()
else:
raise
except ValueError as ex:
print(f"Warning: Could not load configuration {self.config_path}:\n{ex}")
self._move_config_to_backup()
self.config = Configuration.DEFAULT_CONFIG
def _move_config_to_backup(self):
# Remove old backup
try:
os.remove(self.config_path + '.bup')
os.rename(self.config_path, self.config_path + '.bup')
except OSError as ex:
if ex.errno != ENOENT:
raise
os.sync()
def _save(self):
with open(self.config_path + '.new', 'w') as conf_file:
json.dump(self.config, conf_file)
self._move_config_to_backup()
os.rename(self.config_path + '.new', self.config_path)
os.sync()
def _get(self, key):
return self.config.get(key, self.DEFAULT_CONFIG[key])
def get_led_count(self) -> int:
return self._get('LED_COUNT')
def get_idle_timeout(self) -> int:
return self._get('IDLE_TIMEOUT_SECS')
def get_tag_timeout(self) -> int:
return self._get('TAG_TIMEOUT_SECS')
def get_button_map(self) -> Mapping[str, int | None]:
return self._get('BUTTON_MAP')
# For the web API
def get_config(self) -> Mapping[str, Any]:
return self.config
def _validate(self, default, config, path=''):
for k in config.keys():
if k not in default:
raise ValueError(f'Invalid config key {path}/{k}')
if isinstance(default[k], dict):
if not isinstance(config[k], dict):
raise ValueError(f'Invalid config: Value of {path}/{k} must be mapping')
self._validate(default[k], config[k], f'{path}/{k}')
def set_config(self, config):
self._validate(self.DEFAULT_CONFIG, config)
self.config = config
self._save()

View File

@@ -107,6 +107,17 @@ class BTreeDB(IPlaylistDB):
self.setPlaybackOffset(0) self.setPlaybackOffset(0)
return self.getCurrentPath() return self.getCurrentPath()
def getPrevPath(self):
"""
Select prev track and return path.
"""
if self.pos > 0:
self.pos -= 1
if self.persist != BTreeDB.PERSIST_NO:
self.parent._setPlaylistPos(self.tag, self.pos)
self.setPlaybackOffset(0)
return self.getCurrentPath()
def setPlaybackOffset(self, offset): def setPlaybackOffset(self, offset):
""" """
Store the current position in the track for PERSIST_OFFSET mode Store the current position in the track for PERSIST_OFFSET mode

View File

@@ -22,6 +22,7 @@ class TimerManager(object):
def schedule(self, when, what): def schedule(self, when, what):
cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None
self._remove_timer(what) # Ensure timer is not already scheduled
heapq.heappush(self.timers, (when, what)) heapq.heappush(self.timers, (when, what))
if cur_nearest is None or cur_nearest > self.timers[0][0]: if cur_nearest is None or cur_nearest > self.timers[0][0]:
# New timer is closer than previous closest timer # New timer is closer than previous closest timer
@@ -31,18 +32,22 @@ class TimerManager(object):
self.worker_event.set() self.worker_event.set()
def cancel(self, what): def cancel(self, what):
remove_idx = self._remove_timer(what)
if remove_idx == 0:
# Cancel timer was closest timer
if self.timer_debug:
print("cancel: wake")
self.worker_event.set()
return True
def _remove_timer(self, what):
try: try:
(when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers))))) (when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers)))))
except StopIteration: except StopIteration:
return False return False
del self.timers[i] del self.timers[i]
heapq.heapify(self.timers) heapq.heapify(self.timers)
if i == 0: return i
# Cancel timer was closest timer
if self.timer_debug:
print("cancel: wake")
self.worker_event.set()
return True
async def _timer_worker(self): async def _timer_worker(self):
while True: while True:

View File

@@ -9,11 +9,13 @@ from microdot import Microdot
webapp = Microdot() webapp = Microdot()
server = None server = None
config = None
def start_webserver(): def start_webserver(config_):
global server global server, config
server = asyncio.create_task(webapp.start_server(port=80)) server = asyncio.create_task(webapp.start_server(port=80))
config = config_
@webapp.route('/') @webapp.route('/')
@@ -39,3 +41,17 @@ async def filesystem_post(request):
async def playlist_post(request): async def playlist_post(request):
print(request) print(request)
return {'success': False} return {'success': False}
@webapp.route('/api/v1/config', methods=['GET'])
async def config_get(request):
return config.get_config()
@webapp.route('/api/v1/config', methods=['PUT'])
async def config_put(request):
try:
config.set_config(request.json)
except ValueError as ex:
return str(ex), 400
return '', 204

View File

@@ -124,6 +124,19 @@ class FakeLeds:
self.state = state self.state = state
class FakeConfig:
def __init__(self): pass
def get_led_count(self):
return 1
def get_idle_timeout(self):
return 60
def get_tag_timeout(self):
return 5
def fake_open(filename, mode): def fake_open(filename, mode):
return FakeFile(filename, mode) return FakeFile(filename, mode)
@@ -136,13 +149,14 @@ def faketimermanager(monkeypatch):
def _makedeps(mp3player=FakeMp3Player, nfcreader=FakeNfcReader, buttons=FakeButtons, def _makedeps(mp3player=FakeMp3Player, nfcreader=FakeNfcReader, buttons=FakeButtons,
playlistdb=FakePlaylistDb, hwconfig=FakeHwconfig, leds=FakeLeds): playlistdb=FakePlaylistDb, hwconfig=FakeHwconfig, leds=FakeLeds, config=FakeConfig):
return app.Dependencies(mp3player=lambda _: mp3player() if callable(mp3player) else mp3player, return app.Dependencies(mp3player=lambda _: mp3player() if callable(mp3player) else mp3player,
nfcreader=lambda x: nfcreader(x) if callable(nfcreader) else nfcreader, nfcreader=lambda x: nfcreader(x) if callable(nfcreader) else nfcreader,
buttons=lambda _: buttons() if callable(buttons) else buttons, buttons=lambda _: buttons() if callable(buttons) else buttons,
playlistdb=lambda _: playlistdb() if callable(playlistdb) else playlistdb, playlistdb=lambda _: playlistdb() if callable(playlistdb) else playlistdb,
hwconfig=lambda _: hwconfig() if callable(hwconfig) else hwconfig, hwconfig=lambda _: hwconfig() if callable(hwconfig) else hwconfig,
leds=lambda _: leds() if callable(leds) else leds) leds=lambda _: leds() if callable(leds) else leds,
config=lambda _: config() if callable(config) else config)
def test_construct_app(micropythonify, faketimermanager): def test_construct_app(micropythonify, faketimermanager):