Add basic application for playback based on NFC tags

Copy and clean up test.py to main.py. Add app module (currently on
app.py, maybe make it a directory later) for application classes.

Implement app.TimerManager to allow scheduling of delayed events in the
micropython async framework.

Implement app.TagPlaybackManager which handles playing back MP3 files
based on the NFC tag reader. Currently, simply plays all MP3 files in a
folder, for which the folder name matches the tag id, in order. Resume,
random and other features not yet supported.
This commit is contained in:
2025-03-22 14:36:32 +01:00
parent fb496b6991
commit d02776eea8
2 changed files with 185 additions and 0 deletions

102
software/src/app.py Normal file
View File

@@ -0,0 +1,102 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
import asyncio
import heapq
import os
import time
class TimerManager:
def __init__(self, timer_debug=False):
self.timers = []
self.timer_debug = timer_debug
self.task = asyncio.create_task(self._timer_worker())
self.worker_event = asyncio.Event()
def schedule(self, when, what):
cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None
heapq.heappush(self.timers, (when, what))
if cur_nearest is None or cur_nearest > self.timers[0][0]:
# New timer is closer than previous closest timer
if self.timer_debug:
print(f'cur_nearest: {cur_nearest}, new next: {self.timers[0][0]}')
print("schedule: wake")
self.worker_event.set()
def cancel(self, what):
try:
(when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers)))))
except StopIteration:
return False
del self.timers[i]
heapq.heapify(self.timers)
if i == 0:
# Cancel timer was closest timer
if self.timer_debug:
print("cancel: wake")
self.worker_event.set()
return True
async def _timer_worker(self):
while True:
if len(self.timers) == 0:
# Nothing to do
await self.worker_event.wait()
if self.timer_debug:
print("_timer_worker: event 0")
self.worker_event.clear()
continue
cur_nearest = self.timers[0][0]
wait_time = cur_nearest - time.ticks_ms()
if wait_time > 0:
if self.timer_debug:
print(f"_timer_worker: next is {self.timers[0]}, sleep {wait_time} ms")
try:
await asyncio.wait_for_ms(self.worker_event.wait(), wait_time)
if self.timer_debug:
print("_timer_worker: event 1")
# got woken up due to event
self.worker_event.clear()
continue
except asyncio.TimeoutError:
pass
_, callback = heapq.heappop(self.timers)
callback()
class TagPlaybackManager:
def __init__(self, timer_manager, player):
self.current_tag = None
self.current_tag_time = time.ticks_ms()
self.timer_manager = timer_manager
self.player = player
def onTagChange(self, new_tag):
if new_tag is not None:
self.timer_manager.cancel(self.onTagRemoveDelay)
if new_tag == self.current_tag:
return
# Change playlist on new tag
if new_tag is not None:
self.current_tag_time = time.ticks_ms()
self.current_tag = new_tag
uid_str = ''.join('{:02x}'.format(x) for x in new_tag)
try:
testfiles = [f'/sd/{uid_str}/'.encode() + name for name in os.listdir(f'/sd/{uid_str}'.encode())
if name.endswith(b'mp3')]
except OSError as ex:
print(f'Could not get playlist for tag {uid_str}: {ex}')
self.current_tag = None
self.player.stop()
return
testfiles.sort()
self.player.set_playlist(testfiles)
else:
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
def onTagRemoveDelay(self):
if self.current_tag is not None:
print('Tag gone, stopping playback')
self.current_tag = None
self.player.stop()

83
software/src/main.py Normal file
View File

@@ -0,0 +1,83 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2024-2025 Matthias Blankertz <matthias@blankertz.org>
import aiorepl
import asyncio
import machine
import micropython
import os
import time
from machine import Pin
from math import pi, sin, pow
# Own modules
from app import TimerManager, TagPlaybackManager
from audiocore import Audiocore
from mp3player import MP3Player
from nfc import Nfc
from rp2_neopixel import NeoPixel
from rp2_sd import SDCard
micropython.alloc_emergency_exception_buf(100)
async def rainbow(np, period=10):
def gamma(value, X=2.2):
return min(max(int(brightness * pow(value / 255.0, X) * 255.0 + 0.5), 0), 255)
brightness = 0.5
count = 0.0
leds = len(np)
while True:
for i in range(leds):
ofs = (count + i) % leds
np[i] = (gamma((sin(ofs / leds * 2 * pi) + 1) * 127),
gamma((sin(ofs / leds * 2 * pi + 2/3*pi) + 1) * 127),
gamma((sin(ofs / leds * 2 * pi + 4/3*pi) + 1) * 127))
count += 0.2
before = time.ticks_ms()
await np.async_write()
now = time.ticks_ms()
if before + 20 > now:
await asyncio.sleep_ms(20 - (now - before))
# Machine setup
# Set 8 mA drive strength and fast slew rate
machine.mem32[0x4001c004 + 6*4] = 0x67
machine.mem32[0x4001c004 + 7*4] = 0x67
machine.mem32[0x4001c004 + 8*4] = 0x67
# high prio for proc 1
machine.mem32[0x40030000 + 0x00] = 0x10
# Setup SD card
try:
sd = SDCard(mosi=Pin(3), miso=Pin(4), sck=Pin(2), ss=Pin(5), baudrate=15000000)
os.mount(sd, '/sd')
except OSError as ex:
print(f'Failed to setup SD card: {ex}')
# Setup LEDs
pin = Pin.board.GP16
np = NeoPixel(pin, 10, sm=1)
asyncio.create_task(rainbow(np))
# Setup MP3 player
audioctx = Audiocore(Pin(8), Pin(6))
player = MP3Player(audioctx)
player.set_volume(128)
asyncio.create_task(player.task())
# Setup app
timer_manager = TimerManager(True)
playback_manager = TagPlaybackManager(timer_manager, player)
# Setup NFC
nfc = Nfc(playback_manager.onTagChange)
# Start
asyncio.create_task(aiorepl.task({'player': player, 'timer_manager': timer_manager,
'playback_manager': playback_manager}))
asyncio.get_event_loop().run_forever()