fix: Ensure each timer can only be scheduled once
All checks were successful
Build RPi Pico firmware image / Build-Firmware (push) Successful in 4m58s
Check code formatting / Check-C-Format (push) Successful in 8s
Check code formatting / Check-Python-Flake8 (push) Successful in 11s
Check code formatting / Check-Bash-Shellcheck (push) Successful in 6s
Run unit tests on host / Run-Unit-Tests (push) Successful in 10s
Run pytests / Check-Pytest (push) Successful in 13s

If a timer is scheduled with the timer manager while it is already
scheduled, the new exipry time should override the prev. expiry time
instead of adding a second instance of the same timer function.

Signed-off-by: Matthias Blankertz <matthias@blankertz.org>
This commit is contained in:
2025-11-30 13:39:36 +01:00
parent fa0e23ee87
commit 2e1bc7782b

View File

@@ -22,6 +22,7 @@ class TimerManager(object):
def schedule(self, when, what): def schedule(self, when, what):
cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None cur_nearest = self.timers[0][0] if len(self.timers) > 0 else None
self._remove_timer(what) # Ensure timer is not already scheduled
heapq.heappush(self.timers, (when, what)) heapq.heappush(self.timers, (when, what))
if cur_nearest is None or cur_nearest > self.timers[0][0]: if cur_nearest is None or cur_nearest > self.timers[0][0]:
# New timer is closer than previous closest timer # New timer is closer than previous closest timer
@@ -31,18 +32,22 @@ class TimerManager(object):
self.worker_event.set() self.worker_event.set()
def cancel(self, what): def cancel(self, what):
remove_idx = self._remove_timer(what)
if remove_idx == 0:
# Cancel timer was closest timer
if self.timer_debug:
print("cancel: wake")
self.worker_event.set()
return True
def _remove_timer(self, what):
try: try:
(when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers))))) (when, _), i = next(filter(lambda item: item[0][1] == what, zip(self.timers, range(len(self.timers)))))
except StopIteration: except StopIteration:
return False return False
del self.timers[i] del self.timers[i]
heapq.heapify(self.timers) heapq.heapify(self.timers)
if i == 0: return i
# Cancel timer was closest timer
if self.timer_debug:
print("cancel: wake")
self.worker_event.set()
return True
async def _timer_worker(self): async def _timer_worker(self):
while True: while True: