Merge pull request 'nfc-mp3-demo' (#19) from nfc-mp3-demo into main
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m3s
Check code formatting / Check-C-Format (push) Successful in 6s
Check code formatting / Check-Python-Flake8 (push) Successful in 9s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 4s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 3m3s
Check code formatting / Check-C-Format (push) Successful in 6s
Check code formatting / Check-Python-Flake8 (push) Successful in 9s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 4s
Run unit tests on host / Run-Unit-Tests (push) Successful in 8s
Reviewed-on: #19 Reviewed-by: Stefan Kratochwil <kratochwil-la@gmx.de>
This commit was merged in pull request #19.
This commit is contained in:
@@ -4,5 +4,5 @@ FROM gitea/runner-images:ubuntu-22.04
|
||||
# Install gcc-arm-none-eabi
|
||||
RUN apt update && \
|
||||
DEBIAN_FRONTEND=noninteractive \
|
||||
apt install -y --no-install-recommends cmake gcc-arm-none-eabi libnewlib-arm-none-eabi libstdc++-arm-none-eabi-newlib \
|
||||
apt install -y --no-install-recommends cmake gcc-arm-none-eabi libnewlib-arm-none-eabi libstdc++-arm-none-eabi-newlib cpio \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
@@ -25,7 +25,10 @@ if ! command -v $PICOTOOL >/dev/null 2>&1; then
|
||||
fi
|
||||
fi
|
||||
BUILDDIR=lib/micropython/ports/rp2/build-TONBERRY_RPI_PICO_W/
|
||||
tools/mklittlefs/mklittlefs -p 256 -s 868352 -c src/ $BUILDDIR/filesystem.bin
|
||||
FS_STAGE_DIR=$(mktemp -d)
|
||||
trap 'rm -rf $FS_STAGE_DIR' EXIT
|
||||
find src/ -iname '*.py' | cpio -pdm "$FS_STAGE_DIR"
|
||||
tools/mklittlefs/mklittlefs -p 256 -s 868352 -c "$FS_STAGE_DIR"/src $BUILDDIR/filesystem.bin
|
||||
truncate -s 2M $BUILDDIR/firmware-filesystem.bin
|
||||
dd if=$BUILDDIR/firmware.bin of=$BUILDDIR/firmware-filesystem.bin bs=1k
|
||||
dd if=$BUILDDIR/filesystem.bin of=$BUILDDIR/firmware-filesystem.bin bs=1k seek=1200
|
||||
|
||||
@@ -5,9 +5,11 @@ from asyncio import ThreadSafeFlag
|
||||
class Audiocore:
|
||||
def __init__(self, pin, sideset):
|
||||
self.notify = ThreadSafeFlag()
|
||||
self._audiocore = _audiocore.Audiocore(pin, sideset, self._interrupt)
|
||||
self.pin = pin
|
||||
self.sideset = sideset
|
||||
self._audiocore = _audiocore.Audiocore(self.pin, self.sideset, self._interrupt)
|
||||
|
||||
def __del__(self):
|
||||
def deinit(self):
|
||||
self._audiocore.deinit()
|
||||
|
||||
def _interrupt(self, _):
|
||||
@@ -35,3 +37,16 @@ class Audiocore:
|
||||
if pos >= len(buffer):
|
||||
return (pos, buf_space, underruns)
|
||||
await self.notify.wait()
|
||||
|
||||
|
||||
class AudioContext:
|
||||
def __init__(self, pin, sideset):
|
||||
self.pin = pin
|
||||
self.sideset = sideset
|
||||
|
||||
def __enter__(self):
|
||||
self._audiocore = Audiocore(self.pin, self.sideset)
|
||||
return self._audiocore
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self._audiocore.deinit()
|
||||
|
||||
@@ -96,6 +96,7 @@ static MP_DEFINE_CONST_FUN_OBJ_3(sdcard_ioctl_obj, sdcard_ioctl);
|
||||
|
||||
static const mp_rom_map_elem_t sdcard_locals_dict_table[] = {
|
||||
{MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&sdcard_deinit_obj)},
|
||||
{MP_ROM_QSTR(MP_QSTR_deinit), MP_ROM_PTR(&sdcard_deinit_obj)},
|
||||
{MP_ROM_QSTR(MP_QSTR_ioctl), MP_ROM_PTR(&sdcard_ioctl_obj)},
|
||||
{MP_ROM_QSTR(MP_QSTR_readblocks), MP_ROM_PTR(&sdcard_readblocks_obj)},
|
||||
};
|
||||
|
||||
93
software/src/app.py
Normal file
93
software/src/app.py
Normal file
@@ -0,0 +1,93 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from collections import namedtuple
|
||||
import os
|
||||
import time
|
||||
from utils import TimerManager
|
||||
|
||||
|
||||
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons'))
|
||||
|
||||
# Should be ~ 6dB steps
|
||||
VOLUME_CURVE = [1, 2, 4, 8, 16, 32, 63, 126, 251]
|
||||
|
||||
|
||||
class PlayerApp:
|
||||
def __init__(self, deps: Dependencies):
|
||||
self.current_tag = None
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
self.timer_manager = TimerManager()
|
||||
self.player = deps.mp3player(self)
|
||||
self.nfc = deps.nfcreader(self)
|
||||
self.buttons = deps.buttons(self) if deps.buttons is not None else None
|
||||
self.mp3file = None
|
||||
self.volume_pos = 3
|
||||
self.player.set_volume(VOLUME_CURVE[self.volume_pos])
|
||||
|
||||
def __del__(self):
|
||||
if self.mp3file is not None:
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
|
||||
def onTagChange(self, new_tag):
|
||||
if new_tag is not None:
|
||||
self.timer_manager.cancel(self.onTagRemoveDelay)
|
||||
if new_tag == self.current_tag:
|
||||
return
|
||||
# Change playlist on new tag
|
||||
if new_tag is not None:
|
||||
self.current_tag_time = time.ticks_ms()
|
||||
self.current_tag = new_tag
|
||||
uid_str = ''.join('{:02x}'.format(x) for x in new_tag)
|
||||
try:
|
||||
testfiles = [f'/sd/{uid_str}/'.encode() + name for name in os.listdir(f'/sd/{uid_str}'.encode())
|
||||
if name.endswith(b'mp3')]
|
||||
except OSError as ex:
|
||||
print(f'Could not get playlist for tag {uid_str}: {ex}')
|
||||
self.current_tag = None
|
||||
self.player.stop()
|
||||
return
|
||||
testfiles.sort()
|
||||
self._set_playlist(testfiles)
|
||||
else:
|
||||
self.timer_manager.schedule(time.ticks_ms() + 5000, self.onTagRemoveDelay)
|
||||
|
||||
def onTagRemoveDelay(self):
|
||||
if self.current_tag is not None:
|
||||
print('Tag gone, stopping playback')
|
||||
self.current_tag = None
|
||||
self.player.stop()
|
||||
|
||||
def onButtonPressed(self, what):
|
||||
if what == self.buttons.VOLUP:
|
||||
self.volume_pos = min(self.volume_pos + 1, len(VOLUME_CURVE) - 1)
|
||||
self.player.set_volume(VOLUME_CURVE[self.volume_pos])
|
||||
elif what == self.buttons.VOLDOWN:
|
||||
self.volume_pos = max(self.volume_pos - 1, 0)
|
||||
self.player.set_volume(VOLUME_CURVE[self.volume_pos])
|
||||
elif what == self.buttons.NEXT:
|
||||
self._play_next()
|
||||
|
||||
def onPlaybackDone(self):
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self._play_next()
|
||||
|
||||
def _set_playlist(self, files: list[bytes]):
|
||||
self.playlist_pos = 0
|
||||
self.playlist = files
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
|
||||
def _play_next(self):
|
||||
if self.playlist_pos + 1 < len(self.playlist):
|
||||
self.playlist_pos += 1
|
||||
self._play(self.playlist[self.playlist_pos])
|
||||
|
||||
def _play(self, filename: bytes):
|
||||
if self.mp3file is not None:
|
||||
self.player.stop()
|
||||
self.mp3file.close()
|
||||
self.mp3file = None
|
||||
self.mp3file = open(filename, 'rb')
|
||||
self.player.play(self.mp3file)
|
||||
108
software/src/main.py
Normal file
108
software/src/main.py
Normal file
@@ -0,0 +1,108 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2024-2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import aiorepl
|
||||
import asyncio
|
||||
import machine
|
||||
import micropython
|
||||
import os
|
||||
import time
|
||||
from machine import Pin
|
||||
from math import pi, sin, pow
|
||||
|
||||
# Own modules
|
||||
import app
|
||||
from audiocore import AudioContext
|
||||
from mfrc522 import MFRC522
|
||||
from mp3player import MP3Player
|
||||
from nfc import Nfc
|
||||
from rp2_neopixel import NeoPixel
|
||||
from rp2_sd import SDCard
|
||||
from utils import Buttons, TimerManager
|
||||
|
||||
micropython.alloc_emergency_exception_buf(100)
|
||||
|
||||
|
||||
async def rainbow(np, period=10):
|
||||
def gamma(value, X=2.2):
|
||||
return min(max(int(brightness * pow(value / 255.0, X) * 255.0 + 0.5), 0), 255)
|
||||
|
||||
brightness = 0.5
|
||||
count = 0.0
|
||||
leds = len(np)
|
||||
while True:
|
||||
for i in range(leds):
|
||||
ofs = (count + i) % leds
|
||||
np[i] = (gamma((sin(ofs / leds * 2 * pi) + 1) * 127),
|
||||
gamma((sin(ofs / leds * 2 * pi + 2/3*pi) + 1) * 127),
|
||||
gamma((sin(ofs / leds * 2 * pi + 4/3*pi) + 1) * 127))
|
||||
count += 0.2
|
||||
before = time.ticks_ms()
|
||||
await np.async_write()
|
||||
now = time.ticks_ms()
|
||||
if before + 20 > now:
|
||||
await asyncio.sleep_ms(20 - (now - before))
|
||||
|
||||
|
||||
# Machine setup
|
||||
|
||||
# Set 8 mA drive strength and fast slew rate
|
||||
machine.mem32[0x4001c004 + 6*4] = 0x67
|
||||
machine.mem32[0x4001c004 + 7*4] = 0x67
|
||||
machine.mem32[0x4001c004 + 8*4] = 0x67
|
||||
# high prio for proc 1
|
||||
machine.mem32[0x40030000 + 0x00] = 0x10
|
||||
|
||||
|
||||
class SDContext:
|
||||
def __init__(self, mosi, miso, sck, ss, baudrate):
|
||||
self.mosi = mosi
|
||||
self.miso = miso
|
||||
self.sck = sck
|
||||
self.ss = ss
|
||||
self.baudrate = baudrate
|
||||
|
||||
def __enter__(self):
|
||||
self.sdcard = SDCard(self.mosi, self.miso, self.sck, self.ss, self.baudrate)
|
||||
try:
|
||||
os.mount(self.sdcard, '/sd')
|
||||
except Exception:
|
||||
self.sdcard.deinit()
|
||||
raise
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
try:
|
||||
os.umount('/sd')
|
||||
finally:
|
||||
self.sdcard.deinit()
|
||||
|
||||
|
||||
def run():
|
||||
asyncio.new_event_loop()
|
||||
# Setup LEDs
|
||||
pin = Pin.board.GP16
|
||||
np = NeoPixel(pin, 10, sm=1)
|
||||
asyncio.create_task(rainbow(np))
|
||||
|
||||
# Setup MP3 player
|
||||
with SDContext(mosi=Pin(3), miso=Pin(4), sck=Pin(2), ss=Pin(5), baudrate=15000000), \
|
||||
AudioContext(Pin(8), Pin(6)) as audioctx:
|
||||
|
||||
# Setup NFC
|
||||
reader = MFRC522(spi_id=1, sck=10, miso=12, mosi=11, cs=13, rst=9, tocard_retries=20)
|
||||
|
||||
# Setup app
|
||||
deps = app.Dependencies(mp3player=lambda the_app: MP3Player(audioctx, the_app),
|
||||
nfcreader=lambda the_app: Nfc(reader, the_app),
|
||||
buttons=lambda the_app: Buttons(the_app))
|
||||
the_app = app.PlayerApp(deps)
|
||||
|
||||
# Start
|
||||
asyncio.create_task(aiorepl.task({'timer_manager': TimerManager(),
|
||||
'app': the_app}))
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
||||
@@ -3,118 +3,71 @@
|
||||
|
||||
import asyncio
|
||||
from array import array
|
||||
from utils import TimerManager
|
||||
try:
|
||||
from typing import TYPE_CHECKING # type: ignore
|
||||
except ImportError:
|
||||
TYPE_CHECKING = False
|
||||
if TYPE_CHECKING:
|
||||
import typing
|
||||
|
||||
class PlayerCallback(typing.Protocol):
|
||||
def onPlaybackDone(self) -> None: ...
|
||||
|
||||
|
||||
class MP3Player:
|
||||
def __init__(self, audiocore):
|
||||
def __init__(self, audiocore, cb: PlayerCallback):
|
||||
self.audiocore = audiocore
|
||||
self.commands = []
|
||||
self.command_event = asyncio.Event()
|
||||
self.playlist = []
|
||||
self.mp3task = None
|
||||
self.volume = 128
|
||||
self.cb = cb
|
||||
|
||||
def set_playlist(self, mp3files):
|
||||
def play(self, stream):
|
||||
"""
|
||||
Set a new playlist and start playing from the first entry.
|
||||
For convenience a single file name can also be passed.
|
||||
Play from byte stream.
|
||||
"""
|
||||
if type(mp3files) is bytes:
|
||||
self.playlist = [mp3files]
|
||||
else:
|
||||
self.playlist = mp3files
|
||||
self._send_command('newplaylist')
|
||||
|
||||
def play_next(self):
|
||||
"""
|
||||
Skip to the next track in the playlist. Reaching the end of the playlist stops playback.
|
||||
"""
|
||||
self._send_command('next')
|
||||
|
||||
def play_prev(self):
|
||||
"""
|
||||
Skip to the previous track in the playlist.
|
||||
"""
|
||||
self._send_command('prev')
|
||||
if self.mp3task is not None:
|
||||
self.mp3task.cancel()
|
||||
self.mp3task = None
|
||||
self.mp3task = asyncio.create_task(self._play_task(stream))
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop playback, remembering the current position in the playlist (but not inside a track).
|
||||
Stop playback
|
||||
"""
|
||||
self._send_command('stop')
|
||||
|
||||
def play(self):
|
||||
"""
|
||||
Start playback.
|
||||
"""
|
||||
self._send_command('play')
|
||||
if self.mp3task is not None:
|
||||
self.mp3task.cancel()
|
||||
self.mp3task = None
|
||||
|
||||
def set_volume(self, volume: int):
|
||||
"""
|
||||
Set volume (0..255).
|
||||
"""
|
||||
self.volume = volume
|
||||
self.audiocore.set_volume(volume)
|
||||
|
||||
def _send_command(self, command: str):
|
||||
self.commands.append(command)
|
||||
self.command_event.set()
|
||||
def get_volume(self) -> int:
|
||||
return self.volume
|
||||
|
||||
async def _play_task(self, mp3path):
|
||||
async def _play_task(self, stream):
|
||||
known_underruns = 0
|
||||
send_done = False
|
||||
data = array('b', range(512))
|
||||
try:
|
||||
print(b'Playing ' + mp3path)
|
||||
with open(mp3path, 'rb') as mp3file:
|
||||
while True:
|
||||
bytes_read = mp3file.readinto(data)
|
||||
if bytes_read == 0:
|
||||
# End of file
|
||||
break
|
||||
_, _, underruns = await self.audiocore.async_put(data[:bytes_read])
|
||||
if underruns > known_underruns:
|
||||
print(f"{underruns:x}")
|
||||
known_underruns = underruns
|
||||
# Intentionally do not use _send_command, we don't want to set command_event yet
|
||||
self.commands.append('done')
|
||||
while True:
|
||||
bytes_read = stream.readinto(data)
|
||||
if bytes_read == 0:
|
||||
# End of file
|
||||
break
|
||||
_, _, underruns = await self.audiocore.async_put(data[:bytes_read])
|
||||
if underruns > known_underruns:
|
||||
print(f"{underruns:x}")
|
||||
known_underruns = underruns
|
||||
# Call onPlaybackDone after flush
|
||||
send_done = True
|
||||
finally:
|
||||
self.audiocore.flush()
|
||||
self.command_event.set()
|
||||
|
||||
def _play(self, mp3path):
|
||||
if self.mp3task is not None:
|
||||
self.mp3task.cancel()
|
||||
self.mp3task = None
|
||||
if mp3path is not None:
|
||||
self.mp3task = asyncio.create_task(self._play_task(mp3path))
|
||||
|
||||
async def task(self):
|
||||
playlist_pos = 0
|
||||
while True:
|
||||
await self.command_event.wait()
|
||||
self.command_event.clear()
|
||||
change_play = False
|
||||
while len(self.commands) > 0:
|
||||
command = self.commands.pop()
|
||||
if command == 'next' or command == 'done':
|
||||
if playlist_pos + 1 < len(self.playlist):
|
||||
playlist_pos += 1
|
||||
change_play = True
|
||||
else:
|
||||
# reaching the end of the playlist stops playback
|
||||
self._play(None)
|
||||
elif command == 'prev':
|
||||
if playlist_pos > 0:
|
||||
playlist_pos -= 1
|
||||
change_play = True
|
||||
elif command == 'stop':
|
||||
self._play(None)
|
||||
elif command == 'play':
|
||||
if self.mp3task is None:
|
||||
change_play = True
|
||||
elif command == 'newplaylist':
|
||||
if len(self.playlist) > 0:
|
||||
playlist_pos = 0
|
||||
change_play = True
|
||||
else:
|
||||
self._play(None)
|
||||
if change_play:
|
||||
self._play(self.playlist[playlist_pos])
|
||||
if send_done:
|
||||
# Only call onPlaybackDone if exit due to end of stream
|
||||
# Use timer with time 0 to call callback "immediately" but from a different task
|
||||
TimerManager().schedule(0, self.cb.onPlaybackDone)
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
'''
|
||||
SPDX-License-Identifier: MIT
|
||||
Copyright (c) 2025 Stefan Kratochwil (Kratochwil-LA@gmx.de)
|
||||
Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
'''
|
||||
from nfc.nfc import Nfc
|
||||
|
||||
__all__ = ['Nfc']
|
||||
__all__ = (Nfc)
|
||||
|
||||
@@ -1,12 +1,22 @@
|
||||
'''
|
||||
SPDX-License-Identifier: MIT
|
||||
Copyright (c) 2025 Stefan Kratochwil (Kratochwil-LA@gmx.de)
|
||||
Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
'''
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from mfrc522 import MFRC522
|
||||
try:
|
||||
from typing import TYPE_CHECKING # type: ignore
|
||||
except ImportError:
|
||||
TYPE_CHECKING = False
|
||||
if TYPE_CHECKING:
|
||||
import typing
|
||||
|
||||
class TagCallback(typing.Protocol):
|
||||
def onTagChange(self, uid: list[int]) -> None: ...
|
||||
|
||||
|
||||
class Nfc:
|
||||
@@ -28,10 +38,11 @@ class Nfc:
|
||||
|
||||
asyncio.run(main())
|
||||
'''
|
||||
def __init__(self, reader: MFRC522):
|
||||
def __init__(self, reader: MFRC522, cb: TagCallback | None = None):
|
||||
self.reader = reader
|
||||
self.last_uid = None
|
||||
self.last_uid_timestamp = None
|
||||
self.last_uid: list[int] | None = None
|
||||
self.last_uid_timestamp: int | None = None
|
||||
self.cb = cb
|
||||
self.task = asyncio.create_task(self._reader_poll_task())
|
||||
|
||||
@staticmethod
|
||||
@@ -41,20 +52,30 @@ class Nfc:
|
||||
'''
|
||||
return '0x' + ''.join(f'{i:02x}' for i in uid)
|
||||
|
||||
def _read_tag_sn(self) -> list[int] | None:
|
||||
(stat, _) = self.reader.request(self.reader.REQIDL)
|
||||
if stat == self.reader.OK:
|
||||
(stat, uid) = self.reader.SelectTagSN()
|
||||
if stat == self.reader.OK:
|
||||
return uid
|
||||
return None
|
||||
|
||||
async def _reader_poll_task(self, poll_interval_ms: int = 50):
|
||||
'''
|
||||
Periodically polls the nfc reader. Stores tag uid and timestamp if a new tag was found.
|
||||
'''
|
||||
last_callback_uid = None
|
||||
while True:
|
||||
self.reader.init()
|
||||
|
||||
# For now we omit the tag type
|
||||
(stat, _) = self.reader.request(self.reader.REQIDL)
|
||||
if stat == self.reader.OK:
|
||||
(stat, uid) = self.reader.SelectTagSN()
|
||||
if stat == self.reader.OK:
|
||||
self.last_uid = uid
|
||||
self.last_uid_timestamp = time.ticks_us()
|
||||
uid = self._read_tag_sn()
|
||||
if uid is not None:
|
||||
self.last_uid = uid
|
||||
self.last_uid_timestamp = time.ticks_us()
|
||||
if self.cb is not None and last_callback_uid != uid:
|
||||
self.cb.onTagChange(uid)
|
||||
last_callback_uid = uid
|
||||
|
||||
await asyncio.sleep_ms(poll_interval_ms)
|
||||
|
||||
|
||||
7
software/src/utils/__init__.py
Normal file
7
software/src/utils/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
from utils.buttons import Buttons
|
||||
from utils.timer import TimerManager
|
||||
|
||||
__all__ = ["Buttons", "TimerManager"]
|
||||
53
software/src/utils/buttons.py
Normal file
53
software/src/utils/buttons.py
Normal file
@@ -0,0 +1,53 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import asyncio
|
||||
import machine
|
||||
import micropython
|
||||
import time
|
||||
try:
|
||||
from typing import TYPE_CHECKING # type: ignore
|
||||
except ImportError:
|
||||
TYPE_CHECKING = False
|
||||
if TYPE_CHECKING:
|
||||
import typing
|
||||
|
||||
class ButtonCallback(typing.Protocol):
|
||||
def onButtonPressed(self, what: int) -> None: ...
|
||||
|
||||
|
||||
class Buttons:
|
||||
def __init__(self, cb: ButtonCallback, pin_volup=17, pin_voldown=19, pin_next=18):
|
||||
self.VOLUP = micropython.const(1)
|
||||
self.VOLDOWN = micropython.const(2)
|
||||
self.NEXT = micropython.const(3)
|
||||
self.cb = cb
|
||||
self.buttons = {machine.Pin(pin_volup, machine.Pin.IN, machine.Pin.PULL_UP): self.VOLUP,
|
||||
machine.Pin(pin_voldown, machine.Pin.IN, machine.Pin.PULL_UP): self.VOLDOWN,
|
||||
machine.Pin(pin_next, machine.Pin.IN, machine.Pin.PULL_UP): self.NEXT}
|
||||
self.int_flag = asyncio.ThreadSafeFlag()
|
||||
self.pressed: list[int] = []
|
||||
self.last: dict[int, int] = {}
|
||||
for button in self.buttons.keys():
|
||||
button.irq(handler=self._interrupt, trigger=machine.Pin.IRQ_FALLING | machine.Pin.IRQ_RISING)
|
||||
asyncio.create_task(self.task())
|
||||
|
||||
def _interrupt(self, button):
|
||||
keycode = self.buttons[button]
|
||||
last = self.last.get(keycode, 0)
|
||||
now = time.ticks_ms()
|
||||
self.last[keycode] = now
|
||||
if now - last < 10:
|
||||
# debounce, discard
|
||||
return
|
||||
if button.value() == 0:
|
||||
# print(f'B{keycode} {now}')
|
||||
self.pressed.append(keycode)
|
||||
self.int_flag.set()
|
||||
|
||||
async def task(self):
|
||||
while True:
|
||||
await self.int_flag.wait()
|
||||
while len(self.pressed) > 0:
|
||||
what = self.pressed.pop()
|
||||
self.cb.onButtonPressed(what)
|
||||
71
software/src/utils/timer.py
Normal file
71
software/src/utils/timer.py
Normal file
@@ -0,0 +1,71 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2025 Matthias Blankertz <matthias@blankertz.org>
|
||||
|
||||
import asyncio
|
||||
import heapq
|
||||
import time
|
||||
|
||||
TIMER_DEBUG = True
|
||||
|
||||
|
||||
class TimerManager(object):
|
||||
_instance = None
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(TimerManager, cls).__new__(cls)
|
||||
cls._instance.timers = []
|
||||
cls._instance.timer_debug = TIMER_DEBUG
|
||||
cls._instance.task = asyncio.create_task(cls._instance._timer_worker())
|
||||
cls._instance.worker_event = asyncio.Event()
|
||||
return cls._instance
|
||||
|
||||
def schedule(self, when, what):
|
||||
cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None
|
||||
heapq.heappush(self.timers, (when, what))
|
||||
if cur_nearest is None or cur_nearest > self.timers[0][0]:
|
||||
# New timer is closer than previous closest timer
|
||||
if self.timer_debug:
|
||||
print(f'cur_nearest: {cur_nearest}, new next: {self.timers[0][0]}')
|
||||
print("schedule: wake")
|
||||
self.worker_event.set()
|
||||
|
||||
def cancel(self, what):
|
||||
try:
|
||||
(when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers)))))
|
||||
except StopIteration:
|
||||
return False
|
||||
del self.timers[i]
|
||||
heapq.heapify(self.timers)
|
||||
if i == 0:
|
||||
# Cancel timer was closest timer
|
||||
if self.timer_debug:
|
||||
print("cancel: wake")
|
||||
self.worker_event.set()
|
||||
return True
|
||||
|
||||
async def _timer_worker(self):
|
||||
while True:
|
||||
if len(self.timers) == 0:
|
||||
# Nothing to do
|
||||
await self.worker_event.wait()
|
||||
if self.timer_debug:
|
||||
print("_timer_worker: event 0")
|
||||
self.worker_event.clear()
|
||||
continue
|
||||
cur_nearest = self.timers[0][0]
|
||||
wait_time = cur_nearest - time.ticks_ms()
|
||||
if wait_time > 0:
|
||||
if self.timer_debug:
|
||||
print(f"_timer_worker: next is {self.timers[0]}, sleep {wait_time} ms")
|
||||
try:
|
||||
await asyncio.wait_for_ms(self.worker_event.wait(), wait_time)
|
||||
if self.timer_debug:
|
||||
print("_timer_worker: event 1")
|
||||
# got woken up due to event
|
||||
self.worker_event.clear()
|
||||
continue
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
_, callback = heapq.heappop(self.timers)
|
||||
callback()
|
||||
Reference in New Issue
Block a user