diff --git a/software/src/app.py b/software/src/app.py index 95007b7..01905cc 100644 --- a/software/src/app.py +++ b/software/src/app.py @@ -6,7 +6,7 @@ import time from utils import TimerManager -Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb')) +Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons', 'playlistdb', 'leds')) # Should be ~ 6dB steps VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251] @@ -44,6 +44,7 @@ class PlayerApp: self.player = deps.mp3player(self) self.nfc = deps.nfcreader(self.tag_state_machine) self.playlist_db = deps.playlistdb(self) + self.leds = deps.leds(self) self.tag_mode = self.playlist_db.getSetting('tagmode') self.playing_tag = None self.playlist = None @@ -51,6 +52,7 @@ class PlayerApp: self.mp3file = None self.volume_pos = 3 self.player.set_volume(VOLUME_CURVE[self.volume_pos]) + self._onIdle() def __del__(self): if self.mp3file is not None: @@ -96,7 +98,10 @@ class PlayerApp: self._play_next() def _set_playlist(self, tag: bytes): - self._unset_playlist() + if self.playlist is not None: + pos = self.player.stop() + if pos is not None: + self.playlist.setPlaybackOffset(pos) self.playlist = self.playlist_db.getPlaylistForTag(tag) self._play(self.playlist.getCurrentPath() if self.playlist is not None else None, self.playlist.getPlaybackOffset() if self.playlist is not None else 0) @@ -104,6 +109,7 @@ class PlayerApp: def _unset_playlist(self): if self.playlist is not None: pos = self.player.stop() + self._onIdle() if pos is not None: self.playlist.setPlaybackOffset(pos) self.playlist = None @@ -122,7 +128,15 @@ class PlayerApp: self.player.stop() self.mp3file.close() self.mp3file = None + self._onIdle() if filename is not None: print(f'Playing {filename!r}') self.mp3file = open(filename, 'rb') self.player.play(self.mp3file, offset) + self._onActive() + + def _onIdle(self): + self.leds.set_state(self.leds.IDLE) + + def _onActive(self): + self.leds.set_state(self.leds.PLAYING) diff --git a/software/src/main.py b/software/src/main.py index 3abfb06..27658c0 100644 --- a/software/src/main.py +++ b/software/src/main.py @@ -9,7 +9,6 @@ import micropython import network import os import time -from math import pi, sin, pow # Own modules import app @@ -18,7 +17,7 @@ from mfrc522 import MFRC522 from mp3player import MP3Player from nfc import Nfc from rp2_neopixel import NeoPixel -from utils import BTreeFileManager, Buttons, SDContext, TimerManager +from utils import BTreeFileManager, Buttons, SDContext, TimerManager, LedManager try: import hwconfig @@ -31,28 +30,6 @@ micropython.alloc_emergency_exception_buf(100) # Machine setup hwconfig.board_init() - -async def rainbow(np, period=10): - def gamma(value, X=2.2): - return min(max(int(brightness * pow(value / 255.0, X) * 255.0 + 0.5), 0), 255) - - brightness = 0.05 - count = 0.0 - leds = len(np) - while True: - for i in range(leds): - ofs = (count + i) % leds - np[i] = (gamma((sin(ofs / leds * 2 * pi) + 1) * 127), - gamma((sin(ofs / leds * 2 * pi + 2/3*pi) + 1) * 127), - gamma((sin(ofs / leds * 2 * pi + 4/3*pi) + 1) * 127)) - count += 0.02 * leds - before = time.ticks_ms() - await np.async_write() - now = time.ticks_ms() - if before + 20 > now: - await asyncio.sleep_ms(20 - (now - before)) - - # high prio for proc 1 machine.mem32[0x40030000 + 0x00] = 0x10 @@ -71,7 +48,6 @@ def run(): asyncio.new_event_loop() # Setup LEDs np = NeoPixel(hwconfig.LED_DIN, hwconfig.LED_COUNT, sm=1) - asyncio.create_task(rainbow(np)) # Wifi with default config setup_wifi() @@ -101,7 +77,8 @@ def run(): buttons=lambda the_app: Buttons(the_app, pin_volup=hwconfig.BUTTON_VOLUP, pin_voldown=hwconfig.BUTTON_VOLDOWN, pin_next=hwconfig.BUTTON_NEXT), - playlistdb=lambda _: playlistdb) + playlistdb=lambda _: playlistdb, + leds=lambda _: LedManager(np)) the_app = app.PlayerApp(deps) # Start diff --git a/software/src/utils/__init__.py b/software/src/utils/__init__.py index aa24852..abf0eae 100644 --- a/software/src/utils/__init__.py +++ b/software/src/utils/__init__.py @@ -2,10 +2,12 @@ # Copyright (c) 2025 Matthias Blankertz from utils.buttons import Buttons +from utils.leds import LedManager from utils.mbrpartition import MBRPartition from utils.pinindex import get_pin_index from utils.playlistdb import BTreeDB, BTreeFileManager from utils.sdcontext import SDContext from utils.timer import TimerManager -__all__ = ["BTreeDB", "BTreeFileManager", "Buttons", "get_pin_index", "MBRPartition", "SDContext", "TimerManager"] +__all__ = ["BTreeDB", "BTreeFileManager", "Buttons", "get_pin_index", "LedManager", "MBRPartition", "SDContext", + "TimerManager"] diff --git a/software/src/utils/leds.py b/software/src/utils/leds.py new file mode 100644 index 0000000..3598c6d --- /dev/null +++ b/software/src/utils/leds.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2025 Matthias Blankertz + +import asyncio +from math import sin, pi +from micropython import const +import time + + +class LedManager: + IDLE = const(0) + PLAYING = const(1) + + def __init__(self, np): + self.led_state = LedManager.IDLE + self.np = np + self.brightness = 0.1 + self.leds = len(self.np) + asyncio.create_task(self.run()) + + def set_state(self, state): + assert state in [LedManager.IDLE, LedManager.PLAYING] + self.led_state = state + + def _gamma(self, value, X=2.2): + result = min(max(int(self.brightness * pow(value / 255.0, X) * 255.0 + 0.5), 0), 255) + if value > 0: + result = max(1, result) + return result + + def _rainbow(self, time): + for i in range(self.leds): + ofs = (time * self.leds + i) % self.leds + self.np[i] = (self._gamma((sin(ofs / self.leds * 2 * pi) + 1) * 127), + self._gamma((sin(ofs / self.leds * 2 * pi + 2/3*pi) + 1) * 127), + self._gamma((sin(ofs / self.leds * 2 * pi + 4/3*pi) + 1) * 127)) + + def _pulse(self, time, color, speed): + scaled_sin = max(1, abs(sin(time / speed * 2 * pi)) * 255) + val = (self._gamma(color[0]*scaled_sin), + self._gamma(color[1]*scaled_sin), + self._gamma(color[2]*scaled_sin)) + for i in range(self.leds): + self.np[i] = val + + async def run(self): + time_ = 0.0 + while True: + if self.led_state == LedManager.IDLE: + self._pulse(time_, (0, 1, 0), 3) + elif self.led_state == LedManager.PLAYING: + self._rainbow(time_) + time_ += 0.02 + before = time.ticks_ms() + await self.np.async_write() + now = time.ticks_ms() + if before + 20 > now: + await asyncio.sleep_ms(20 - (now - before)) diff --git a/software/tests/mocks/micropython.py b/software/tests/mocks/micropython.py index 7c14b67..cc7bdeb 100644 --- a/software/tests/mocks/micropython.py +++ b/software/tests/mocks/micropython.py @@ -1,2 +1,5 @@ # SPDX-License-Identifier: MIT # Copyright (c) 2025 Matthias Blankertz + +def const(x): + return x diff --git a/software/tests/test_playerapp.py b/software/tests/test_playerapp.py index b0a07f7..efa2dfb 100644 --- a/software/tests/test_playerapp.py +++ b/software/tests/test_playerapp.py @@ -43,11 +43,21 @@ class FakeMp3Player: class FakeTimerManager: - def __init__(self): pass - def cancel(self, timer): pass + def __init__(self): + self.queued = [] + + def cancel(self, timer): + self.queued = [(elem[0], elem[1], True) if elem[1] == timer else elem for elem in self.queued] def schedule(self, when, what): - what() + self.queued.append((when, what, False)) + + def testing_run_queued(self): + queued = self.queued + self.queued = [] + for when, what, canceled in queued: + if not canceled: + what() class FakeNfcReader: @@ -91,32 +101,49 @@ class FakePlaylistDb: return None +class FakeLeds: + IDLE = 0 + PLAYING = 1 + + def __init__(self): + self.state = None + + def set_state(self, state): + self.state = state + + def fake_open(filename, mode): return FakeFile(filename, mode) @pytest.fixture def faketimermanager(monkeypatch): - monkeypatch.setattr(utils.timer.TimerManager, '_instance', FakeTimerManager()) + fake_timer_manager = FakeTimerManager() + monkeypatch.setattr(utils.timer.TimerManager, '_instance', fake_timer_manager) + yield fake_timer_manager + + +def _makedeps(mp3player=FakeMp3Player, nfcreader=FakeNfcReader, buttons=FakeButtons, + playlistdb=FakePlaylistDb, leds=FakeLeds): + return app.Dependencies(mp3player=lambda _: mp3player() if callable(mp3player) else mp3player, + nfcreader=lambda x: nfcreader(x) if callable(nfcreader) else nfcreader, + buttons=lambda _: buttons() if callable(buttons) else buttons, + playlistdb=lambda _: playlistdb() if callable(playlistdb) else playlistdb, + leds=lambda _: leds() if callable(leds) else leds) def test_construct_app(micropythonify, faketimermanager): fake_mp3 = FakeMp3Player() - deps = app.Dependencies(mp3player=lambda _: fake_mp3, - nfcreader=lambda _: FakeNfcReader(), - buttons=lambda _: FakeButtons(), - playlistdb=lambda _: FakePlaylistDb()) - _ = app.PlayerApp(deps) + deps = _makedeps(mp3player=fake_mp3) + dut = app.PlayerApp(deps) + fake_mp3 = dut.player assert fake_mp3.volume is not None def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch): fake_db = FakePlaylistDb() fake_mp3 = FakeMp3Player() - deps = app.Dependencies(mp3player=lambda _: fake_mp3, - nfcreader=lambda x: FakeNfcReader(x), - buttons=lambda _: FakeButtons(), - playlistdb=lambda _: fake_db) + deps = _makedeps(mp3player=fake_mp3, playlistdb=fake_db) app.PlayerApp(deps) with monkeypatch.context() as m: m.setattr(builtins, 'open', fake_open) @@ -130,10 +157,7 @@ def test_load_playlist_on_tag(micropythonify, faketimermanager, monkeypatch): def test_playlist_seq(micropythonify, faketimermanager, monkeypatch): fake_db = FakePlaylistDb([b'track1.mp3', b'track2.mp3', b'track3.mp3']) fake_mp3 = FakeMp3Player() - deps = app.Dependencies(mp3player=lambda _: fake_mp3, - nfcreader=lambda x: FakeNfcReader(x), - buttons=lambda _: FakeButtons(), - playlistdb=lambda _: fake_db) + deps = _makedeps(mp3player=fake_mp3, playlistdb=fake_db) dut = app.PlayerApp(deps) with monkeypatch.context() as m: m.setattr(builtins, 'open', fake_open) @@ -166,10 +190,7 @@ def test_playlist_unknown_tag(micropythonify, faketimermanager, monkeypatch): fake_db = FakeNoPlaylistDb() fake_mp3 = FakeMp3Player() - deps = app.Dependencies(mp3player=lambda _: fake_mp3, - nfcreader=lambda x: FakeNfcReader(x), - buttons=lambda _: FakeButtons(), - playlistdb=lambda _: fake_db) + deps = _makedeps(mp3player=fake_mp3, playlistdb=fake_db) app.PlayerApp(deps) with monkeypatch.context() as m: m.setattr(builtins, 'open', fake_open) @@ -189,10 +210,7 @@ def test_tagmode_startstop(micropythonify, faketimermanager, monkeypatch): fake_db = MyFakePlaylistDb() fake_mp3 = FakeMp3Player() - deps = app.Dependencies(mp3player=lambda _: fake_mp3, - nfcreader=lambda x: FakeNfcReader(x), - buttons=lambda _: FakeButtons(), - playlistdb=lambda _: fake_db) + deps = _makedeps(mp3player=fake_mp3, playlistdb=fake_db) app.PlayerApp(deps) with monkeypatch.context() as m: m.setattr(builtins, 'open', fake_open) @@ -202,6 +220,7 @@ def test_tagmode_startstop(micropythonify, faketimermanager, monkeypatch): assert fake_mp3.track.filename == b'test/path.mp3' # Removing tag should not stop playback FakeNfcReader.tag_callback.onTagChange(None) + faketimermanager.testing_run_queued() assert fake_mp3.track is not None assert fake_mp3.track.filename == b'test/path.mp3' # Presenting tag should stop playback @@ -209,6 +228,7 @@ def test_tagmode_startstop(micropythonify, faketimermanager, monkeypatch): assert fake_mp3.track is None # Nothing should change here FakeNfcReader.tag_callback.onTagChange(None) + faketimermanager.testing_run_queued() assert fake_mp3.track is None # Presenting tag again should start playback again FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3]) @@ -228,10 +248,7 @@ def test_tagmode_remains(micropythonify, faketimermanager, monkeypatch): fake_db = MyFakePlaylistDb() fake_mp3 = FakeMp3Player() - deps = app.Dependencies(mp3player=lambda _: fake_mp3, - nfcreader=lambda x: FakeNfcReader(x), - buttons=lambda _: FakeButtons(), - playlistdb=lambda _: fake_db) + deps = _makedeps(mp3player=fake_mp3, playlistdb=fake_db) app.PlayerApp(deps) with monkeypatch.context() as m: m.setattr(builtins, 'open', fake_open) @@ -241,8 +258,23 @@ def test_tagmode_remains(micropythonify, faketimermanager, monkeypatch): assert fake_mp3.track.filename == b'test/path.mp3' # Remove tag to stop playback FakeNfcReader.tag_callback.onTagChange(None) + faketimermanager.testing_run_queued() assert fake_mp3.track is None # Presenting tag again should start playback again FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3]) assert fake_mp3.track is not None assert fake_mp3.track.filename == b'test/path.mp3' + + +def test_led_state(micropythonify, faketimermanager, monkeypatch): + fake_leds = FakeLeds() + deps = _makedeps(leds=fake_leds) + app.PlayerApp(deps) + assert fake_leds.state == FakeLeds.IDLE + with monkeypatch.context() as m: + m.setattr(builtins, 'open', fake_open) + FakeNfcReader.tag_callback.onTagChange([23, 42, 1, 2, 3]) + assert fake_leds.state == FakeLeds.PLAYING + FakeNfcReader.tag_callback.onTagChange(None) + faketimermanager.testing_run_queued() + assert fake_leds.state == FakeLeds.IDLE