Turn TimerManager into a Singleton

This commit is contained in:
2025-05-20 20:18:25 +02:00
parent 69b6f6e860
commit 7712c25627
3 changed files with 17 additions and 12 deletions

View File

@@ -4,16 +4,17 @@
from collections import namedtuple
import os
import time
from utils import TimerManager
Dependencies = namedtuple('PlayerAppDependencies', ('mp3player', 'timermanager', 'nfcreader', 'buttons'))
Dependencies = namedtuple('Dependencies', ('mp3player', 'nfcreader', 'buttons'))
class PlayerApp:
def __init__(self, deps: Dependencies):
self.current_tag = None
self.current_tag_time = time.ticks_ms()
self.timer_manager = deps.timermanager(self)
self.timer_manager = TimerManager()
self.player = deps.mp3player(self)
self.nfc = deps.nfcreader(self)
self.buttons = deps.buttons(self) if deps.buttons is not None else None

View File

@@ -92,20 +92,17 @@ def run():
player.set_volume(32)
asyncio.create_task(player.task())
timer_manager = TimerManager(True)
# Setup NFC
reader = MFRC522(spi_id=1, sck=10, miso=12, mosi=11, cs=13, rst=9, tocard_retries=20)
# Setup app
deps = app.Dependencies(mp3player=lambda _: player,
timermanager=lambda _: timer_manager,
nfcreader=lambda the_app: Nfc(reader, the_app),
buttons=lambda the_app: Buttons(the_app))
the_app = app.PlayerApp(deps)
# Start
asyncio.create_task(aiorepl.task({'player': player, 'timer_manager': timer_manager,
asyncio.create_task(aiorepl.task({'timer_manager': TimerManager(),
'app': the_app}))
asyncio.get_event_loop().run_forever()

View File

@@ -5,13 +5,20 @@ import asyncio
import heapq
import time
TIMER_DEBUG = True
class TimerManager:
def __init__(self, timer_debug=False):
self.timers = []
self.timer_debug = timer_debug
self.task = asyncio.create_task(self._timer_worker())
self.worker_event = asyncio.Event()
class TimerManager(object):
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(TimerManager, cls).__new__(cls)
cls._instance.timers = []
cls._instance.timer_debug = TIMER_DEBUG
cls._instance.task = asyncio.create_task(cls._instance._timer_worker())
cls._instance.worker_event = asyncio.Event()
return cls._instance
def schedule(self, when, what):
cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None