wip: New architecture

Change PlayerApp to new architecture
 - depedencies injected via named tuple
 - some initial type checking
 - move on button press logic to PlayerApp

TODO: Adapt MP3 player
This commit is contained in:
2025-04-29 22:05:58 +02:00
parent b477aba94c
commit 903840f982
6 changed files with 166 additions and 118 deletions

View File

@@ -1,76 +1,22 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import asyncio
import heapq
from collections import namedtuple
import os
import time
class TimerManager:
def __init__(self, timer_debug=False):
self.timers = []
self.timer_debug = timer_debug
self.task = asyncio.create_task(self._timer_worker())
self.worker_event = asyncio.Event()
def schedule(self, when, what):
cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None
heapq.heappush(self.timers, (when, what))
if cur_nearest is None or cur_nearest > self.timers[0][0]:
# New timer is closer than previous closest timer
if self.timer_debug:
print(f'cur_nearest: {cur_nearest}, new next: {self.timers[0][0]}')
print("schedule: wake")
self.worker_event.set()
def cancel(self, what):
try:
(when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers)))))
except StopIteration:
return False
del self.timers[i]
heapq.heapify(self.timers)
if i == 0:
# Cancel timer was closest timer
if self.timer_debug:
print("cancel: wake")
self.worker_event.set()
return True
async def _timer_worker(self):
while True:
if len(self.timers) == 0:
# Nothing to do
await self.worker_event.wait()
if self.timer_debug:
print("_timer_worker: event 0")
self.worker_event.clear()
continue
cur_nearest = self.timers[0][0]
wait_time = cur_nearest - time.ticks_ms()
if wait_time > 0:
if self.timer_debug:
print(f"_timer_worker: next is {self.timers[0]}, sleep {wait_time} ms")
try:
await asyncio.wait_for_ms(self.worker_event.wait(), wait_time)
if self.timer_debug:
print("_timer_worker: event 1")
# got woken up due to event
self.worker_event.clear()
continue
except asyncio.TimeoutError:
pass
_, callback = heapq.heappop(self.timers)
callback()
Dependencies = namedtuple('PlayerAppDependencies', ('mp3player', 'timermanager', 'nfcreader', 'buttons'))
class TagPlaybackManager:
def __init__(self, timer_manager, player):
class PlayerApp:
def __init__(self, deps: Dependencies):
self.current_tag = None
self.current_tag_time = time.ticks_ms()
self.timer_manager = timer_manager
self.player = player
self.timer_manager = deps.timermanager(self)
self.player = deps.mp3player(self)
self.nfc = deps.nfcreader(self)
self.buttons = deps.buttons(self) if deps.buttons is not None else None
def onTagChange(self, new_tag):
if new_tag is not None:
@@ -100,3 +46,11 @@ class TagPlaybackManager:
print('Tag gone, stopping playback')
self.current_tag = None
self.player.stop()
def onButtonPressed(self, what):
if what == self.buttons.VOLUP:
self.player.set_volume(min(255, self.player.get_volume()+1))
elif what == self.buttons.VOLDOWN:
self.player.set_volume(max(0, self.player.get_volume()-1))
elif what == self.buttons.NEXT:
self.player.play_next()

View File

@@ -11,13 +11,14 @@ from machine import Pin
from math import pi, sin, pow
# Own modules
from app import TimerManager, TagPlaybackManager
import app
from audiocore import AudioContext
from mfrc522 import MFRC522
from mp3player import MP3Player
from nfc import Nfc
from rp2_neopixel import NeoPixel
from rp2_sd import SDCard
from utils import Buttons, TimerManager
micropython.alloc_emergency_exception_buf(100)
@@ -53,47 +54,6 @@ machine.mem32[0x4001c004 + 8*4] = 0x67
machine.mem32[0x40030000 + 0x00] = 0x10
class Buttons:
def __init__(self, player, pin_volup=17, pin_voldown=19, pin_next=18):
self._VOLUP = micropython.const(1)
self._VOLDOWN = micropython.const(2)
self._NEXT = micropython.const(3)
self.player = player
self.buttons = {machine.Pin(pin_volup, machine.Pin.IN, machine.Pin.PULL_UP): self._VOLUP,
machine.Pin(pin_voldown, machine.Pin.IN, machine.Pin.PULL_UP): self._VOLDOWN,
machine.Pin(pin_next, machine.Pin.IN, machine.Pin.PULL_UP): self._NEXT}
self.int_flag = asyncio.ThreadSafeFlag()
self.pressed = []
self.last = {}
for button in self.buttons.keys():
button.irq(handler=self._interrupt, trigger=machine.Pin.IRQ_FALLING | machine.Pin.IRQ_RISING)
def _interrupt(self, button):
keycode = self.buttons[button]
last = self.last.get(keycode, 0)
now = time.ticks_ms()
self.last[keycode] = now
if now - last < 10:
# debounce, discard
return
if button.value() == 0:
# print(f'B{keycode} {now}')
self.pressed.append(keycode)
self.int_flag.set()
async def task(self):
while True:
await self.int_flag.wait()
while len(self.pressed) > 0:
what = self.pressed.pop()
if what == self._VOLUP:
self.player.set_volume(min(255, self.player.get_volume()+1))
elif what == self._VOLDOWN:
self.player.set_volume(max(0, self.player.get_volume()-1))
elif what == self._NEXT:
self.player.play_next()
class SDContext:
def __init__(self, mosi, miso, sck, ss, baudrate):
self.mosi = mosi
@@ -132,20 +92,21 @@ def run():
player.set_volume(32)
asyncio.create_task(player.task())
buttons = Buttons(player)
asyncio.create_task(buttons.task())
# Setup app
timer_manager = TimerManager(True)
playback_manager = TagPlaybackManager(timer_manager, player)
# Setup NFC
reader = MFRC522(spi_id=1, sck=10, miso=12, mosi=11, cs=13, rst=9, tocard_retries=20)
nfc = Nfc(reader, playback_manager.onTagChange)
# Setup app
deps = app.Dependencies(mp3player=lambda _: player,
timermanager=lambda _: timer_manager,
nfcreader=lambda the_app: Nfc(reader, the_app),
buttons=lambda the_app: Buttons(the_app))
the_app = app.PlayerApp(deps)
# Start
asyncio.create_task(aiorepl.task({'player': player, 'timer_manager': timer_manager,
'playback_manager': playback_manager, 'nfc': nfc}))
'app': the_app}))
asyncio.get_event_loop().run_forever()

View File

@@ -8,6 +8,15 @@ import asyncio
import time
from mfrc522 import MFRC522
try:
from typing import TYPE_CHECKING # type: ignore
except ImportError:
TYPE_CHECKING = False
if TYPE_CHECKING:
import typing
class TagCallback(typing.Protocol):
def onTagChange(self, uid: list[int]) -> None: ...
class Nfc:
@@ -29,11 +38,11 @@ class Nfc:
asyncio.run(main())
'''
def __init__(self, reader: MFRC522, onTagChange=None):
def __init__(self, reader: MFRC522, cb: TagCallback | None = None):
self.reader = reader
self.last_uid = None
self.last_uid_timestamp = None
self.onTagChange = onTagChange
self.last_uid: list[int] | None = None
self.last_uid_timestamp: int | None = None
self.cb = cb
self.task = asyncio.create_task(self._reader_poll_task())
@staticmethod
@@ -43,7 +52,7 @@ class Nfc:
'''
return '0x' + ''.join(f'{i:02x}' for i in uid)
def _read_tag_sn(self) -> list[int]:
def _read_tag_sn(self) -> list[int] | None:
(stat, _) = self.reader.request(self.reader.REQIDL)
if stat == self.reader.OK:
(stat, uid) = self.reader.SelectTagSN()
@@ -64,8 +73,8 @@ class Nfc:
if uid is not None:
self.last_uid = uid
self.last_uid_timestamp = time.ticks_us()
if self.onTagChange is not None and last_callback_uid != uid:
self.onTagChange(uid)
if self.cb is not None and last_callback_uid != uid:
self.cb.onTagChange(uid)
last_callback_uid = uid
await asyncio.sleep_ms(poll_interval_ms)

View File

@@ -0,0 +1,7 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
from utils.buttons import Buttons
from utils.timer import TimerManager
__all__ = ["Buttons", "TimerManager"]

View File

@@ -0,0 +1,53 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import asyncio
import machine
import micropython
import time
try:
from typing import TYPE_CHECKING # type: ignore
except ImportError:
TYPE_CHECKING = False
if TYPE_CHECKING:
import typing
class ButtonCallback(typing.Protocol):
def onButtonPressed(self, what: int) -> None: ...
class Buttons:
def __init__(self, cb: ButtonCallback, pin_volup=17, pin_voldown=19, pin_next=18):
self.VOLUP = micropython.const(1)
self.VOLDOWN = micropython.const(2)
self.NEXT = micropython.const(3)
self.cb = cb
self.buttons = {machine.Pin(pin_volup, machine.Pin.IN, machine.Pin.PULL_UP): self.VOLUP,
machine.Pin(pin_voldown, machine.Pin.IN, machine.Pin.PULL_UP): self.VOLDOWN,
machine.Pin(pin_next, machine.Pin.IN, machine.Pin.PULL_UP): self.NEXT}
self.int_flag = asyncio.ThreadSafeFlag()
self.pressed: list[int] = []
self.last: dict[int, int] = {}
for button in self.buttons.keys():
button.irq(handler=self._interrupt, trigger=machine.Pin.IRQ_FALLING | machine.Pin.IRQ_RISING)
asyncio.create_task(self.task())
def _interrupt(self, button):
keycode = self.buttons[button]
last = self.last.get(keycode, 0)
now = time.ticks_ms()
self.last[keycode] = now
if now - last < 10:
# debounce, discard
return
if button.value() == 0:
# print(f'B{keycode} {now}')
self.pressed.append(keycode)
self.int_flag.set()
async def task(self):
while True:
await self.int_flag.wait()
while len(self.pressed) > 0:
what = self.pressed.pop()
self.cb.onButtonPressed(what)

View File

@@ -0,0 +1,64 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import asyncio
import heapq
import time
class TimerManager:
def __init__(self, timer_debug=False):
self.timers = []
self.timer_debug = timer_debug
self.task = asyncio.create_task(self._timer_worker())
self.worker_event = asyncio.Event()
def schedule(self, when, what):
cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None
heapq.heappush(self.timers, (when, what))
if cur_nearest is None or cur_nearest > self.timers[0][0]:
# New timer is closer than previous closest timer
if self.timer_debug:
print(f'cur_nearest: {cur_nearest}, new next: {self.timers[0][0]}')
print("schedule: wake")
self.worker_event.set()
def cancel(self, what):
try:
(when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers)))))
except StopIteration:
return False
del self.timers[i]
heapq.heapify(self.timers)
if i == 0:
# Cancel timer was closest timer
if self.timer_debug:
print("cancel: wake")
self.worker_event.set()
return True
async def _timer_worker(self):
while True:
if len(self.timers) == 0:
# Nothing to do
await self.worker_event.wait()
if self.timer_debug:
print("_timer_worker: event 0")
self.worker_event.clear()
continue
cur_nearest = self.timers[0][0]
wait_time = cur_nearest - time.ticks_ms()
if wait_time > 0:
if self.timer_debug:
print(f"_timer_worker: next is {self.timers[0]}, sleep {wait_time} ms")
try:
await asyncio.wait_for_ms(self.worker_event.wait(), wait_time)
if self.timer_debug:
print("_timer_worker: event 1")
# got woken up due to event
self.worker_event.clear()
continue
except asyncio.TimeoutError:
pass
_, callback = heapq.heappop(self.timers)
callback()