diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2e39bf2..5bfdc33 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -42,6 +42,15 @@ jobs: - run: python -m pip install --upgrade pip wheel - run: pip install tox tox-gh-actions - run: tox -eupy + tests-circuitpython: + name: tests-circuitpython + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + - run: python -m pip install --upgrade pip wheel + - run: pip install tox tox-gh-actions + - run: tox -ecpy coverage: name: coverage runs-on: ubuntu-latest diff --git a/bin/circuitpython b/bin/circuitpython new file mode 100755 index 0000000..aec239c Binary files /dev/null and b/bin/circuitpython differ diff --git a/libs/circuitpython/adafruit_ticks.py b/libs/circuitpython/adafruit_ticks.py new file mode 100644 index 0000000..5585d2b --- /dev/null +++ b/libs/circuitpython/adafruit_ticks.py @@ -0,0 +1,139 @@ +# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries +# SPDX-FileCopyrightText: Copyright (c) 2021 Jeff Epler for Adafruit Industries +# +# SPDX-License-Identifier: MIT +""" +`adafruit_ticks` +================================================================================ + +Work with intervals and deadlines in milliseconds + + +* Author(s): Jeff Epler + +Implementation Notes +-------------------- + +**Software and Dependencies:** + +* Adafruit CircuitPython firmware for the supported boards: + https://github.com/adafruit/circuitpython/releases + +""" + +# imports +from micropython import const + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ticks.git" + +_TICKS_PERIOD = const(1 << 29) +_TICKS_MAX = const(_TICKS_PERIOD - 1) +_TICKS_HALFPERIOD = const(_TICKS_PERIOD // 2) + +# Get the correct implementation of ticks_ms. There are three possibilities: +# +# - supervisor.ticks_ms is present. This will be the case starting in CP7.0 +# +# - time.ticks_ms is present. This is the case for MicroPython & for the "unix +# port" of CircuitPython, used for some automated testing. +# +# - time.monotonic_ns is present, and works. This is the case on most +# Express boards in CP6.x, and most host computer versions of Python. +# +# - Otherwise, time.monotonic is assumed to be present. This is the case +# on most non-express boards in CP6.x, and some old host computer versions +# of Python. +# +# Note that on microcontrollers, this time source becomes increasingly +# inaccurate when the board has not been reset in a long time, losing the +# ability to measure 1ms intervals after about 1 hour, and losing the +# ability to meausre 128ms intervals after 6 days. The only solution is to +# either upgrade to a version with supervisor.ticks_ms, or to switch to a +# board with time.monotonic_ns. + +try: + from supervisor import ticks_ms # pylint: disable=unused-import +except (ImportError, NameError): + import time + + if _ticks_ms := getattr(time, "ticks_ms", None): + + def ticks_ms() -> int: + """Return the time in milliseconds since an unspecified moment, + wrapping after 2**29ms. + + The wrap value was chosen so that it is always possible to add or + subtract two `ticks_ms` values without overflow on a board without + long ints (or without allocating any long integer objects, on + boards with long ints). + + This ticks value comes from a low-accuracy clock internal to the + microcontroller, just like `time.monotonic`. Due to its low + accuracy and the fact that it "wraps around" every few days, it is + intended for working with short term events like advancing an LED + animation, not for long term events like counting down the time + until a holiday.""" + return _ticks_ms() & _TICKS_MAX # pylint: disable=not-callable + + else: + try: + from time import monotonic_ns as _monotonic_ns + + _monotonic_ns() # Check that monotonic_ns is usable + + def ticks_ms() -> int: + """Return the time in milliseconds since an unspecified moment, + wrapping after 2**29ms. + + The wrap value was chosen so that it is always possible to add or + subtract two `ticks_ms` values without overflow on a board without + long ints (or without allocating any long integer objects, on + boards with long ints). + + This ticks value comes from a low-accuracy clock internal to the + microcontroller, just like `time.monotonic`. Due to its low + accuracy and the fact that it "wraps around" every few days, it is + intended for working with short term events like advancing an LED + animation, not for long term events like counting down the time + until a holiday.""" + return (_monotonic_ns() // 1_000_000) & _TICKS_MAX + + except (ImportError, NameError, NotImplementedError): + from time import monotonic as _monotonic + + def ticks_ms() -> int: + """Return the time in milliseconds since an unspecified moment, + wrapping after 2**29ms. + + The wrap value was chosen so that it is always possible to add or + subtract two `ticks_ms` values without overflow on a board without + long ints (or without allocating any long integer objects, on + boards with long ints). + + This ticks value comes from a low-accuracy clock internal to the + microcontroller, just like `time.monotonic`. Due to its low + accuracy and the fact that it "wraps around" every few days, it is + intended for working with short term events like advancing an LED + animation, not for long term events like counting down the time + until a holiday.""" + return int(_monotonic() * 1000) & _TICKS_MAX + + +def ticks_add(ticks: int, delta: int) -> int: + "Add a delta to a base number of ticks, performing wraparound at 2**29ms." + return (ticks + delta) % _TICKS_PERIOD + + +def ticks_diff(ticks1: int, ticks2: int) -> int: + """Compute the signed difference between two ticks values, + assuming that they are within 2**28 ticks""" + diff = (ticks1 - ticks2) & _TICKS_MAX + diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD + return diff + + +def ticks_less(ticks1: int, ticks2: int) -> bool: + """Return true if ticks1 is before ticks2 and false otherwise, + assuming that they are within 2**28 ticks""" + return ticks_diff(ticks1, ticks2) < 0 diff --git a/libs/circuitpython/asyncio/__init__.py b/libs/circuitpython/asyncio/__init__.py new file mode 100644 index 0000000..ce8837d --- /dev/null +++ b/libs/circuitpython/asyncio/__init__.py @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: 2019 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019 Damien P. George +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off + +from .core import * + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/Adafruit/Adafruit_CircuitPython_asyncio.git" + +_attrs = { + "wait_for": "funcs", + "wait_for_ms": "funcs", + "gather": "funcs", + "Event": "event", + "ThreadSafeFlag": "event", + "Lock": "lock", + "open_connection": "stream", + "start_server": "stream", + "StreamReader": "stream", + "StreamWriter": "stream", +} + +# Lazy loader, effectively does: +# global attr +# from .mod import attr +def __getattr__(attr): + mod = _attrs.get(attr, None) + if mod is None: + raise AttributeError(attr) + value = getattr(__import__(mod, None, None, True, 1), attr) + globals()[attr] = value + return value diff --git a/libs/circuitpython/asyncio/core.py b/libs/circuitpython/asyncio/core.py new file mode 100644 index 0000000..26a97ba --- /dev/null +++ b/libs/circuitpython/asyncio/core.py @@ -0,0 +1,430 @@ +# SPDX-FileCopyrightText: 2019 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019 Damien P. George +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off +""" +Core +==== +""" + +from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add +import sys, select + +try: + from traceback import print_exception +except: + from .traceback import print_exception + +# Import TaskQueue and Task, preferring built-in C code over Python code +try: + from _asyncio import TaskQueue, Task +except ImportError: + from .task import TaskQueue, Task + +################################################################################ +# Exceptions + + +# Depending on the release of CircuitPython these errors may or may not +# exist in the C implementation of `_asyncio`. However, when they +# do exist, they must be preferred over the Python code. +try: + from _asyncio import CancelledError, InvalidStateError +except (ImportError, AttributeError): + class CancelledError(BaseException): + """Injected into a task when calling `Task.cancel()`""" + pass + + + class InvalidStateError(Exception): + """Can be raised in situations like setting a result value for a task object that already has a result value set.""" + pass + + +class TimeoutError(Exception): + """Raised when waiting for a task longer than the specified timeout.""" + + pass + + +# Used when calling Loop.call_exception_handler +_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None} + + +################################################################################ +# Sleep functions + +# "Yield" once, then raise StopIteration +class SingletonGenerator: + def __init__(self): + self.state = None + self.exc = StopIteration() + + def __iter__(self): + return self + + def __await__(self): + return self + + def __next__(self): + if self.state is not None: + _task_queue.push_sorted(cur_task, self.state) + self.state = None + return None + else: + self.exc.__traceback__ = None + raise self.exc + + +# Pause task execution for the given time (integer in milliseconds, uPy extension) +# Use a SingletonGenerator to do it without allocating on the heap +def sleep_ms(t, sgen=SingletonGenerator()): + """Sleep for *t* milliseconds. + + This is a coroutine, and a MicroPython extension. + """ + + assert sgen.state is None, "Check for a missing `await` in your code" + sgen.state = ticks_add(ticks(), max(0, t)) + return sgen + + +# Pause task execution for the given time (in seconds) +def sleep(t): + """Sleep for *t* seconds + + This is a coroutine. + """ + + return sleep_ms(int(t * 1000)) + + +################################################################################ +# "Never schedule" object" +# Don't re-schedule the object that awaits _never(). +# For internal use only. Some constructs, like `await event.wait()`, +# work by NOT re-scheduling the task which calls wait(), but by +# having some other task schedule it later. +class _NeverSingletonGenerator: + def __init__(self): + self.state = None + self.exc = StopIteration() + + def __iter__(self): + return self + + def __await__(self): + return self + + def __next__(self): + if self.state is not None: + self.state = None + return None + else: + self.exc.__traceback__ = None + raise self.exc + +def _never(sgen=_NeverSingletonGenerator()): + # assert sgen.state is None, "Check for a missing `await` in your code" + sgen.state = False + return sgen + + +################################################################################ +# Queue and poller for stream IO + + +class IOQueue: + def __init__(self): + self.poller = select.poll() + self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream] + + def _enqueue(self, s, idx): + if id(s) not in self.map: + entry = [None, None, s] + entry[idx] = cur_task + self.map[id(s)] = entry + self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) + else: + sm = self.map[id(s)] + assert sm[idx] is None + assert sm[1 - idx] is not None + sm[idx] = cur_task + self.poller.modify(s, select.POLLIN | select.POLLOUT) + # Link task to this IOQueue so it can be removed if needed + cur_task.data = self + + def _dequeue(self, s): + del self.map[id(s)] + self.poller.unregister(s) + + async def queue_read(self, s): + self._enqueue(s, 0) + await _never() + + async def queue_write(self, s): + self._enqueue(s, 1) + await _never() + + def remove(self, task): + while True: + del_s = None + for k in self.map: # Iterate without allocating on the heap + q0, q1, s = self.map[k] + if q0 is task or q1 is task: + del_s = s + break + if del_s is not None: + self._dequeue(s) + else: + break + + def wait_io_event(self, dt): + for s, ev in self.poller.ipoll(dt): + sm = self.map[id(s)] + # print('poll', s, sm, ev) + if ev & ~select.POLLOUT and sm[0] is not None: + # POLLIN or error + _task_queue.push_head(sm[0]) + sm[0] = None + if ev & ~select.POLLIN and sm[1] is not None: + # POLLOUT or error + _task_queue.push_head(sm[1]) + sm[1] = None + if sm[0] is None and sm[1] is None: + self._dequeue(s) + elif sm[0] is None: + self.poller.modify(s, select.POLLOUT) + else: + self.poller.modify(s, select.POLLIN) + + +################################################################################ +# Main run loop + +# Ensure the awaitable is a task +def _promote_to_task(aw): + return aw if isinstance(aw, Task) else create_task(aw) + + +# Create and schedule a new task from a coroutine +def create_task(coro): + """Create a new task from the given coroutine and schedule it to run. + + Returns the corresponding `Task` object. + """ + + if not hasattr(coro, "send"): + raise TypeError("coroutine expected") + t = Task(coro, globals()) + _task_queue.push_head(t) + return t + + +# Keep scheduling tasks until there are none left to schedule +def run_until_complete(main_task=None): + """Run the given *main_task* until it completes.""" + + global cur_task + excs_all = (CancelledError, Exception) # To prevent heap allocation in loop + excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop + while True: + # Wait until the head of _task_queue is ready to run + dt = 1 + while dt > 0: + dt = -1 + t = _task_queue.peek() + if t: + # A task waiting on _task_queue; "ph_key" is time to schedule task at + dt = max(0, ticks_diff(t.ph_key, ticks())) + elif not _io_queue.map: + # No tasks can be woken so finished running + return + # print('(poll {})'.format(dt), len(_io_queue.map)) + _io_queue.wait_io_event(dt) + + # Get next task to run and continue it + t = _task_queue.pop_head() + cur_task = t + try: + # Continue running the coroutine, it's responsible for rescheduling itself + exc = t.data + if not exc: + t.coro.send(None) + else: + # If the task is finished and on the run queue and gets here, then it + # had an exception and was not await'ed on. Throwing into it now will + # raise StopIteration and the code below will catch this and run the + # call_exception_handler function. + t.data = None + t.coro.throw(exc) + except excs_all as er: + # Check the task is not on any event queue + assert t.data is None + # This task is done, check if it's the main task and then loop should stop + if t is main_task: + if isinstance(er, StopIteration): + return er.value + raise er + if t.state: + # Task was running but is now finished. + waiting = False + if t.state is True: + # "None" indicates that the task is complete and not await'ed on (yet). + t.state = None + elif callable(t.state): + # The task has a callback registered to be called on completion. + t.state(t, er) + t.state = False + waiting = True + else: + # Schedule any other tasks waiting on the completion of this task. + while t.state.peek(): + _task_queue.push_head(t.state.pop_head()) + waiting = True + # "False" indicates that the task is complete and has been await'ed on. + t.state = False + if not waiting and not isinstance(er, excs_stop): + # An exception ended this detached task, so queue it for later + # execution to handle the uncaught exception if no other task retrieves + # the exception in the meantime (this is handled by Task.throw). + _task_queue.push_head(t) + # Save return value of coro to pass up to caller. + t.data = er + elif t.state is None: + # Task is already finished and nothing await'ed on the task, + # so call the exception handler. + _exc_context["exception"] = exc + _exc_context["future"] = t + Loop.call_exception_handler(_exc_context) + + +# Create a new task from a coroutine and run it until it finishes +def run(coro): + """Create a new task from the given coroutine and run it until it completes. + + Returns the value returned by *coro*. + """ + + return run_until_complete(create_task(coro)) + + +################################################################################ +# Event loop wrapper + + +async def _stopper(): + pass + + +_stop_task = None + + +class Loop: + """Class representing the event loop""" + + _exc_handler = None + + def create_task(coro): + """Create a task from the given *coro* and return the new `Task` object.""" + + return create_task(coro) + + def run_forever(): + """Run the event loop until `Loop.stop()` is called.""" + + global _stop_task + _stop_task = Task(_stopper(), globals()) + run_until_complete(_stop_task) + # TODO should keep running until .stop() is called, even if there're no tasks left + + def run_until_complete(aw): + """Run the given *awaitable* until it completes. If *awaitable* is not a task then + it will be promoted to one. + """ + + return run_until_complete(_promote_to_task(aw)) + + def stop(): + """Stop the event loop""" + + global _stop_task + if _stop_task is not None: + _task_queue.push_head(_stop_task) + # If stop() is called again, do nothing + _stop_task = None + + def close(): + """Close the event loop.""" + + pass + + def set_exception_handler(handler): + """Set the exception handler to call when a Task raises an exception that is not + caught. The *handler* should accept two arguments: ``(loop, context)`` + """ + + Loop._exc_handler = handler + + def get_exception_handler(): + """Get the current exception handler. Returns the handler, or ``None`` if no + custom handler is set. + """ + + return Loop._exc_handler + + def default_exception_handler(loop, context): + """The default exception handler that is called.""" + + exc = context["exception"] + print_exception(None, exc, exc.__traceback__) + + def call_exception_handler(context): + """Call the current exception handler. The argument *context* is passed through + and is a dictionary containing keys: + ``'message'``, ``'exception'``, ``'future'`` + """ + (Loop._exc_handler or Loop.default_exception_handler)(Loop, context) + + +# The runq_len and waitq_len arguments are for legacy uasyncio compatibility +def get_event_loop(runq_len=0, waitq_len=0): + """Return the event loop used to schedule and run tasks. See `Loop`.""" + + return Loop + + +def current_task(): + """Return the `Task` object associated with the currently running task.""" + + return cur_task + + +def new_event_loop(): + """Reset the event loop and return it. + + **NOTE**: Since MicroPython only has a single event loop, this function just resets + the loop's state, it does not create a new one + """ + + global _task_queue, _io_queue, _exc_context, cur_task + # TaskQueue of Task instances + _task_queue = TaskQueue() + # Task queue and poller for stream IO + _io_queue = IOQueue() + cur_task = None + _exc_context['exception'] = None + _exc_context['future'] = None + return Loop + + +# Initialise default event loop +new_event_loop() diff --git a/libs/circuitpython/asyncio/event.py b/libs/circuitpython/asyncio/event.py new file mode 100644 index 0000000..a402d26 --- /dev/null +++ b/libs/circuitpython/asyncio/event.py @@ -0,0 +1,92 @@ +# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off +""" +Events +====== +""" + +from . import core + +# Event class for primitive events that can be waited on, set, and cleared +class Event: + """Create a new event which can be used to synchronize tasks. Events + start in the cleared state. + """ + + def __init__(self): + self.state = False # False=unset; True=set + self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event + + def is_set(self): + """Returns ``True`` if the event is set, ``False`` otherwise.""" + + return self.state + + def set(self): + """Set the event. Any tasks waiting on the event will be scheduled to run. + """ + + # Event becomes set, schedule any tasks waiting on it + # Note: This must not be called from anything except the thread running + # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). + while self.waiting.peek(): + core._task_queue.push_head(self.waiting.pop_head()) + self.state = True + + def clear(self): + """Clear the event.""" + + self.state = False + + async def wait(self): + """Wait for the event to be set. If the event is already set then it returns + immediately. + + This is a coroutine. + """ + + if not self.state: + # Event not set, put the calling task on the event's waiting queue + self.waiting.push_head(core.cur_task) + # Set calling task's data to the event's queue so it can be removed if needed + core.cur_task.data = self.waiting + await core._never() + return True + + +# MicroPython-extension: This can be set from outside the asyncio event loop, +# such as other threads, IRQs or scheduler context. Implementation is a stream +# that asyncio will poll until a flag is set. +# Note: Unlike Event, this is self-clearing. +try: + import uio + + class ThreadSafeFlag(uio.IOBase): + def __init__(self): + self._flag = 0 + + def ioctl(self, req, flags): + if req == 3: # MP_STREAM_POLL + return self._flag * flags + return None + + def set(self): + self._flag = 1 + + async def wait(self): + if not self._flag: + yield core._io_queue.queue_read(self) + self._flag = 0 + +except ImportError: + pass diff --git a/libs/circuitpython/asyncio/funcs.py b/libs/circuitpython/asyncio/funcs.py new file mode 100644 index 0000000..b1bb24a --- /dev/null +++ b/libs/circuitpython/asyncio/funcs.py @@ -0,0 +1,165 @@ +# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2022 Damien P. George +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off +""" +Functions +========= +""" + + +from . import core + + +async def _run(waiter, aw): + try: + result = await aw + status = True + except BaseException as er: + result = None + status = er + if waiter.data is None: + # The waiter is still waiting, cancel it. + if waiter.cancel(): + # Waiter was cancelled by us, change its CancelledError to an instance of + # CancelledError that contains the status and result of waiting on aw. + # If the wait_for task subsequently gets cancelled externally then this + # instance will be reset to a CancelledError instance without arguments. + waiter.data = core.CancelledError(status, result) + +async def wait_for(aw, timeout, sleep=core.sleep): + """Wait for the *aw* awaitable to complete, but cancel if it takes longer + than *timeout* seconds. If *aw* is not a task then a task will be created + from it. + + If a timeout occurs, it cancels the task and raises ``asyncio.TimeoutError``: + this should be trapped by the caller. + + Returns the return value of *aw*. + + This is a coroutine. + """ + + aw = core._promote_to_task(aw) + if timeout is None: + return await aw + + # Run aw in a separate runner task that manages its exceptions. + runner_task = core.create_task(_run(core.cur_task, aw)) + + try: + # Wait for the timeout to elapse. + await sleep(timeout) + except core.CancelledError as er: + status = er.args[0] if er.args else None + if status is None: + # This wait_for was cancelled externally, so cancel aw and re-raise. + runner_task.cancel() + raise er + elif status is True: + # aw completed successfully and cancelled the sleep, so return aw's result. + return er.args[1] + else: + # aw raised an exception, propagate it out to the caller. + raise status + + # The sleep finished before aw, so cancel aw and raise TimeoutError. + runner_task.cancel() + await runner_task + raise core.TimeoutError + + +def wait_for_ms(aw, timeout): + """Similar to `wait_for` but *timeout* is an integer in milliseconds. + + This is a coroutine, and a MicroPython extension. + """ + + return wait_for(aw, timeout, core.sleep_ms) + + +class _Remove: + @staticmethod + def remove(t): + pass + + +async def gather(*aws, return_exceptions=False): + """Run all *aws* awaitables concurrently. Any *aws* that are not tasks + are promoted to tasks. + + Returns a list of return values of all *aws* + """ + if not aws: + return [] + + def done(t, er): + # Sub-task "t" has finished, with exception "er". + nonlocal state + if gather_task.data is not _Remove: + # The main gather task has already been scheduled, so do nothing. + # This happens if another sub-task already raised an exception and + # woke the main gather task (via this done function), or if the main + # gather task was cancelled externally. + return + elif not return_exceptions and not isinstance(er, StopIteration): + # A sub-task raised an exception, indicate that to the gather task. + state = er + else: + state -= 1 + if state: + # Still some sub-tasks running. + return + # Gather waiting is done, schedule the main gather task. + core._task_queue.push_head(gather_task) + + ts = [core._promote_to_task(aw) for aw in aws] + for i in range(len(ts)): + if ts[i].state is not True: + # Task is not running, gather not currently supported for this case. + raise RuntimeError("can't gather") + # Register the callback to call when the task is done. + ts[i].state = done + + # Set the state for execution of the gather. + gather_task = core.cur_task + state = len(ts) + cancel_all = False + + # Wait for the a sub-task to need attention. + gather_task.data = _Remove + try: + await core._never() + except core.CancelledError as er: + cancel_all = True + state = er + + # Clean up tasks. + for i in range(len(ts)): + if ts[i].state is done: + # Sub-task is still running, deregister the callback and cancel if needed. + ts[i].state = True + if cancel_all: + ts[i].cancel() + elif isinstance(ts[i].data, StopIteration): + # Sub-task ran to completion, get its return value. + ts[i] = ts[i].data.value + else: + # Sub-task had an exception with return_exceptions==True, so get its exception. + ts[i] = ts[i].data + + # Either this gather was cancelled, or one of the sub-tasks raised an exception with + # return_exceptions==False, so reraise the exception here. + if state is not 0: + raise state + + # Return the list of return values of each sub-task. + return ts diff --git a/libs/circuitpython/asyncio/lock.py b/libs/circuitpython/asyncio/lock.py new file mode 100644 index 0000000..71c972f --- /dev/null +++ b/libs/circuitpython/asyncio/lock.py @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off +""" +Locks +===== +""" + +from . import core + +# Lock class for primitive mutex capability +class Lock: + """Create a new lock which can be used to coordinate tasks. Locks start in + the unlocked state. + + In addition to the methods below, locks can be used in an ``async with`` + statement. + """ + + def __init__(self): + # The state can take the following values: + # - 0: unlocked + # - 1: locked + # - : unlocked but this task has been scheduled to acquire the lock next + self.state = 0 + # Queue of Tasks waiting to acquire this Lock + self.waiting = core.TaskQueue() + + def locked(self): + """Returns ``True`` if the lock is locked, otherwise ``False``.""" + + return self.state == 1 + + def release(self): + """Release the lock. If any tasks are waiting on the lock then the next + one in the queue is scheduled to run and the lock remains locked. Otherwise, + no tasks are waiting and the lock becomes unlocked. + """ + + if self.state != 1: + raise RuntimeError("Lock not acquired") + if self.waiting.peek(): + # Task(s) waiting on lock, schedule next Task + self.state = self.waiting.pop_head() + core._task_queue.push_head(self.state) + else: + # No Task waiting so unlock + self.state = 0 + + async def acquire(self): + """Wait for the lock to be in the unlocked state and then lock it in an + atomic way. Only one task can acquire the lock at any one time. + + This is a coroutine. + """ + + if self.state != 0: + # Lock unavailable, put the calling Task on the waiting queue + self.waiting.push_head(core.cur_task) + # Set calling task's data to the lock's queue so it can be removed if needed + core.cur_task.data = self.waiting + try: + await core._never() + except core.CancelledError as er: + if self.state == core.cur_task: + # Cancelled while pending on resume, schedule next waiting Task + self.state = 1 + self.release() + raise er + # Lock available, set it as locked + self.state = 1 + return True + + async def __aenter__(self): + return await self.acquire() + + async def __aexit__(self, exc_type, exc, tb): + return self.release() diff --git a/libs/circuitpython/asyncio/manifest.py b/libs/circuitpython/asyncio/manifest.py new file mode 100644 index 0000000..24082ff --- /dev/null +++ b/libs/circuitpython/asyncio/manifest.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: 2019 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off + +# This list of frozen files doesn't include task.py because that's provided by the C module. +freeze( + "..", + ( + "uasyncio/__init__.py", + "uasyncio/core.py", + "uasyncio/event.py", + "uasyncio/funcs.py", + "uasyncio/lock.py", + "uasyncio/stream.py", + ), + opt=3, +) diff --git a/libs/circuitpython/asyncio/stream.py b/libs/circuitpython/asyncio/stream.py new file mode 100644 index 0000000..44a8cc5 --- /dev/null +++ b/libs/circuitpython/asyncio/stream.py @@ -0,0 +1,263 @@ +# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off +""" +Streams +======= +""" + +from . import core + + +class Stream: + """This represents a TCP stream connection. To minimise code this class + implements both a reader and a writer, and both ``StreamReader`` and + ``StreamWriter`` alias to this class. + """ + + def __init__(self, s, e={}): + self.s = s + self.e = e + self.out_buf = b"" + + def get_extra_info(self, v): + """Get extra information about the stream, given by *v*. The valid + values for *v* are: ``peername``. + """ + + return self.e[v] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.close() + + def close(self): + pass + + async def wait_closed(self): + """Wait for the stream to close. + + This is a coroutine. + """ + + # TODO yield? + self.s.close() + + async def read(self, n): + """Read up to *n* bytes and return them. + + This is a coroutine. + """ + + await core._io_queue.queue_read(self.s) + return self.s.read(n) + + async def readinto(self, buf): + """Read up to n bytes into *buf* with n being equal to the length of *buf* + + Return the number of bytes read into *buf* + + This is a coroutine, and a MicroPython extension. + """ + + await core._io_queue.queue_read(self.s) + return self.s.readinto(buf) + + async def readexactly(self, n): + """Read exactly *n* bytes and return them as a bytes object. + + Raises an ``EOFError`` exception if the stream ends before reading + *n* bytes. + + This is a coroutine. + """ + + r = b"" + while n: + await core._io_queue.queue_read(self.s) + r2 = self.s.read(n) + if r2 is not None: + if not len(r2): + raise EOFError + r += r2 + n -= len(r2) + return r + + async def readline(self): + """Read a line and return it. + + This is a coroutine. + """ + + l = b"" + while True: + await core._io_queue.queue_read(self.s) + l2 = self.s.readline() # may do multiple reads but won't block + l += l2 + if not l2 or l[-1] == 10: # \n (check l in case l2 is str) + return l + + def write(self, buf): + """Accumulated *buf* to the output buffer. The data is only flushed when + `Stream.drain` is called. It is recommended to call `Stream.drain` + immediately after calling this function. + """ + if not self.out_buf: + # Try to write immediately to the underlying stream. + ret = self.s.write(buf) + if ret == len(buf): + return + if ret is not None: + buf = buf[ret:] + + self.out_buf += buf + + async def drain(self): + """Drain (write) all buffered output data out to the stream. + + This is a coroutine. + """ + + mv = memoryview(self.out_buf) + off = 0 + while off < len(mv): + await core._io_queue.queue_write(self.s) + ret = self.s.write(mv[off:]) + if ret is not None: + off += ret + self.out_buf = b"" + + +# Stream can be used for both reading and writing to save code size +StreamReader = Stream +StreamWriter = Stream + + +# Create a TCP stream connection to a remote host +async def open_connection(host, port): + """Open a TCP connection to the given *host* and *port*. The *host* address will + be resolved using `socket.getaddrinfo`, which is currently a blocking call. + + Returns a pair of streams: a reader and a writer stream. Will raise a socket-specific + ``OSError`` if the host could not be resolved or if the connection could not be made. + + This is a coroutine. + """ + + from uerrno import EINPROGRESS + import usocket as socket + + ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! + s = socket.socket(ai[0], ai[1], ai[2]) + s.setblocking(False) + ss = Stream(s) + try: + s.connect(ai[-1]) + except OSError as er: + if er.errno != EINPROGRESS: + raise er + await core._io_queue.queue_write(s) + return ss, ss + + +# Class representing a TCP stream server, can be closed and used in "async with" +class Server: + """This represents the server class returned from `start_server`. It can be used in + an ``async with`` statement to close the server upon exit. + """ + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + self.close() + await self.wait_closed() + + def close(self): + """Close the server.""" + + self.task.cancel() + + async def wait_closed(self): + """Wait for the server to close. + + This is a coroutine. + """ + + await self.task + + async def _serve(self, s, cb): + # Accept incoming connections + while True: + try: + await core._io_queue.queue_read(s) + except core.CancelledError: + # Shutdown server + s.close() + return + try: + s2, addr = s.accept() + except: + # Ignore a failed accept + continue + s2.setblocking(False) + s2s = Stream(s2, {"peername": addr}) + core.create_task(cb(s2s, s2s)) + + +# Helper function to start a TCP stream server, running as a new task +# TODO could use an accept-callback on socket read activity instead of creating a task +async def start_server(cb, host, port, backlog=5): + """Start a TCP server on the given *host* and *port*. The *cb* callback will be + called with incoming, accepted connections, and be passed 2 arguments: reader + writer streams for the connection. + + Returns a `Server` object. + + This is a coroutine. + """ + + import usocket as socket + + # Create and bind server socket. + host = socket.getaddrinfo(host, port)[0] # TODO this is blocking! + s = socket.socket() + s.setblocking(False) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(host[-1]) + s.listen(backlog) + + # Create and return server object and task. + srv = Server() + srv.task = core.create_task(srv._serve(s, cb)) + return srv + + +################################################################################ +# Legacy uasyncio compatibility + + +async def stream_awrite(self, buf, off=0, sz=-1): + if off != 0 or sz != -1: + buf = memoryview(buf) + if sz == -1: + sz = len(buf) + buf = buf[off : off + sz] + self.write(buf) + await self.drain() + + +Stream.aclose = Stream.wait_closed +Stream.awrite = stream_awrite +Stream.awritestr = stream_awrite # TODO explicitly convert to bytes? diff --git a/libs/circuitpython/asyncio/task.py b/libs/circuitpython/asyncio/task.py new file mode 100644 index 0000000..2e3a6db --- /dev/null +++ b/libs/circuitpython/asyncio/task.py @@ -0,0 +1,215 @@ +# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George +# +# This code comes from MicroPython, and has not been run through black or pylint there. +# Altering these files significantly would make merging difficult, so we will not use +# pylint or black. +# pylint: skip-file +# fmt: off +""" +Tasks +===== +""" + +# This file contains the core TaskQueue based on a pairing heap, and the core Task class. +# They can optionally be replaced by C implementations. + +from . import core + + +# pairing-heap meld of 2 heaps; O(1) +def ph_meld(h1, h2): + if h1 is None: + return h2 + if h2 is None: + return h1 + lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0 + if lt: + if h1.ph_child is None: + h1.ph_child = h2 + else: + h1.ph_child_last.ph_next = h2 + h1.ph_child_last = h2 + h2.ph_next = None + h2.ph_rightmost_parent = h1 + return h1 + else: + h1.ph_next = h2.ph_child + h2.ph_child = h1 + if h1.ph_next is None: + h2.ph_child_last = h1 + h1.ph_rightmost_parent = h2 + return h2 + + +# pairing-heap pairing operation; amortised O(log N) +def ph_pairing(child): + heap = None + while child is not None: + n1 = child + child = child.ph_next + n1.ph_next = None + if child is not None: + n2 = child + child = child.ph_next + n2.ph_next = None + n1 = ph_meld(n1, n2) + heap = ph_meld(heap, n1) + return heap + + +# pairing-heap delete of a node; stable, amortised O(log N) +def ph_delete(heap, node): + if node is heap: + child = heap.ph_child + node.ph_child = None + return ph_pairing(child) + # Find parent of node + parent = node + while parent.ph_next is not None: + parent = parent.ph_next + parent = parent.ph_rightmost_parent + # Replace node with pairing of its children + if node is parent.ph_child and node.ph_child is None: + parent.ph_child = node.ph_next + node.ph_next = None + return heap + elif node is parent.ph_child: + child = node.ph_child + next = node.ph_next + node.ph_child = None + node.ph_next = None + node = ph_pairing(child) + parent.ph_child = node + else: + n = parent.ph_child + while node is not n.ph_next: + n = n.ph_next + child = node.ph_child + next = node.ph_next + node.ph_child = None + node.ph_next = None + node = ph_pairing(child) + if node is None: + node = n + else: + n.ph_next = node + node.ph_next = next + if next is None: + node.ph_rightmost_parent = parent + parent.ph_child_last = node + return heap + + +# TaskQueue class based on the above pairing-heap functions. +class TaskQueue: + def __init__(self): + self.heap = None + + def peek(self): + return self.heap + + def push(self, v, key=None): + assert v.ph_child is None + assert v.ph_next is None + v.data = None + v.ph_key = key if key is not None else core.ticks() + self.heap = ph_meld(v, self.heap) + + def pop(self): + v = self.heap + assert v.ph_next is None + self.heap = ph_pairing(v.ph_child) + v.ph_child = None + return v + + def remove(self, v): + self.heap = ph_delete(self.heap, v) + + # Compatibility aliases, remove after they are no longer used + push_head = push + push_sorted = push + pop_head = pop + +# Task class representing a coroutine, can be waited on and cancelled. +class Task: + """This object wraps a coroutine into a running task. Tasks can be waited on + using ``await task``, which will wait for the task to complete and return the + return value of the task. + + Tasks should not be created directly, rather use ``create_task`` to create them. + """ + + def __init__(self, coro, globals=None): + self.coro = coro # Coroutine of this Task + self.data = None # General data for queue it is waiting on + self.state = True # None, False, True, a callable, or a TaskQueue instance + self.ph_key = 0 # Pairing heap + self.ph_child = None # Paring heap + self.ph_child_last = None # Paring heap + self.ph_next = None # Paring heap + self.ph_rightmost_parent = None # Paring heap + + def __iter__(self): + if not self.state: + # Task finished, signal that is has been await'ed on. + self.state = False + elif self.state is True: + # Allocated head of linked list of Tasks waiting on completion of this task. + self.state = TaskQueue() + elif type(self.state) is not TaskQueue: + # Task has state used for another purpose, so can't also wait on it. + raise RuntimeError("can't wait") + return self + + # CircuitPython needs __await()__. + __await__ = __iter__ + + def __next__(self): + if not self.state: + if self.data is None: + # Task finished but has already been sent to the loop's exception handler. + raise StopIteration + else: + # Task finished, raise return value to caller so it can continue. + raise self.data + else: + # Put calling task on waiting queue. + self.state.push(core.cur_task) + # Set calling task's data to this task that it waits on, to double-link it. + core.cur_task.data = self + + def done(self): + """Whether the task is complete.""" + + return not self.state + + def cancel(self): + """Cancel the task by injecting a ``CancelledError`` into it. The task + may or may not ignore this exception. + """ + + # Check if task is already finished. + if not self.state: + return False + # Can't cancel self (not supported yet). + if self is core.cur_task: + raise RuntimeError("can't cancel self") + # If Task waits on another task then forward the cancel to the one it's waiting on. + while isinstance(self.data, Task): + self = self.data + # Reschedule Task as a cancelled task. + if hasattr(self.data, "remove"): + # Not on the main running queue, remove the task from the queue it's on. + self.data.remove(self) + core._task_queue.push(self) + elif core.ticks_diff(self.ph_key, core.ticks()) > 0: + # On the main running queue but scheduled in the future, so bring it forward to now. + core._task_queue.remove(self) + core._task_queue.push(self) + self.data = core.CancelledError + return True diff --git a/libs/circuitpython/asyncio/traceback.py b/libs/circuitpython/asyncio/traceback.py new file mode 100644 index 0000000..eaf62ea --- /dev/null +++ b/libs/circuitpython/asyncio/traceback.py @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George +""" +Fallback traceback module if the system traceback is missing. +""" + +try: + from typing import List +except ImportError: + pass + +import sys + + +def _print_traceback(traceback, limit=None, file=sys.stderr) -> List[str]: + if limit is None: + if hasattr(sys, "tracebacklimit"): + limit = sys.tracebacklimit + + n = 0 + while traceback is not None: + frame = traceback.tb_frame + line_number = traceback.tb_lineno + frame_code = frame.f_code + filename = frame_code.co_filename + name = frame_code.co_name + print(' File "%s", line %d, in %s' % (filename, line_number, name), file=file) + traceback = traceback.tb_next + n = n + 1 + if limit is not None and n >= limit: + break + + +def print_exception(exception, value=None, traceback=None, limit=None, file=sys.stderr): + """ + Print exception information and stack trace to file. + """ + if traceback: + print("Traceback (most recent call last):", file=file) + _print_traceback(traceback, limit=limit, file=file) + + if isinstance(exception, BaseException): + exception_type = type(exception).__name__ + elif hasattr(exception, "__name__"): + exception_type = exception.__name__ + else: + exception_type = type(value).__name__ + + valuestr = str(value) + if value is None or not valuestr: + print(exception_type, file=file) + else: + print("%s: %s" % (str(exception_type), valuestr), file=file) diff --git a/run_tests.py b/run_tests.py index c6181a6..09a5cf1 100644 --- a/run_tests.py +++ b/run_tests.py @@ -2,7 +2,11 @@ import sys sys.path.insert(0, 'src') sys.path.insert(2, 'libs/common') -sys.path.insert(3, 'libs/micropython') +if sys.implementation.name == 'circuitpython': + sys.path.insert(3, 'libs/circuitpython') + sys.path.insert(4, 'libs/micropython') +else: + sys.path.insert(3, 'libs/micropython') import unittest