CircuitPython build

This commit is contained in:
Miguel Grinberg
2024-01-31 23:39:04 +00:00
parent bf519478cb
commit e44c271bae
13 changed files with 1527 additions and 1 deletion

View File

@@ -42,6 +42,15 @@ jobs:
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -eupy
tests-circuitpython:
name: tests-circuitpython
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -ecpy
coverage:
name: coverage
runs-on: ubuntu-latest

BIN
bin/circuitpython Executable file

Binary file not shown.

View File

@@ -0,0 +1,139 @@
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_ticks`
================================================================================
Work with intervals and deadlines in milliseconds
* Author(s): Jeff Epler
Implementation Notes
--------------------
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
"""
# imports
from micropython import const
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ticks.git"
_TICKS_PERIOD = const(1 << 29)
_TICKS_MAX = const(_TICKS_PERIOD - 1)
_TICKS_HALFPERIOD = const(_TICKS_PERIOD // 2)
# Get the correct implementation of ticks_ms. There are four possibilities:
#
# - supervisor.ticks_ms is present. This will be the case starting in CP7.0
#
# - time.ticks_ms is present. This is the case for MicroPython & for the "unix
# port" of CircuitPython, used for some automated testing.
#
# - time.monotonic_ns is present, and works. This is the case on most
# Express boards in CP6.x, and most host computer versions of Python.
#
# - Otherwise, time.monotonic is assumed to be present. This is the case
# on most non-express boards in CP6.x, and some old host computer versions
# of Python.
#
# Note that on microcontrollers, this time source becomes increasingly
# inaccurate when the board has not been reset in a long time, losing the
# ability to measure 1ms intervals after about 1 hour, and losing the
# ability to measure 128ms intervals after 6 days. The only solution is to
# either upgrade to a version with supervisor.ticks_ms, or to switch to a
# board with time.monotonic_ns.
try:
from supervisor import ticks_ms # pylint: disable=unused-import
except (ImportError, NameError):
import time
if _ticks_ms := getattr(time, "ticks_ms", None):
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return _ticks_ms() & _TICKS_MAX # pylint: disable=not-callable
else:
try:
from time import monotonic_ns as _monotonic_ns
_monotonic_ns() # Check that monotonic_ns is usable
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return (_monotonic_ns() // 1_000_000) & _TICKS_MAX
except (ImportError, NameError, NotImplementedError):
from time import monotonic as _monotonic
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return int(_monotonic() * 1000) & _TICKS_MAX
def ticks_add(ticks: int, delta: int) -> int:
"Add a delta to a base number of ticks, performing wraparound at 2**29ms."
return (ticks + delta) % _TICKS_PERIOD
def ticks_diff(ticks1: int, ticks2: int) -> int:
"""Compute the signed difference between two ticks values,
assuming that they are within 2**28 ticks"""
diff = (ticks1 - ticks2) & _TICKS_MAX
diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD
return diff
def ticks_less(ticks1: int, ticks2: int) -> bool:
"""Return true if ticks1 is before ticks2 and false otherwise,
assuming that they are within 2**28 ticks"""
return ticks_diff(ticks1, ticks2) < 0
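
A minimal deadline sketch using the functions above; `micropython.const` resolves natively on CircuitPython and MicroPython, while CPython would need a small stub for it:

from adafruit_ticks import ticks_ms, ticks_add, ticks_diff

deadline = ticks_add(ticks_ms(), 100)        # 100 ms from now, wraps at 2**29
while ticks_diff(deadline, ticks_ms()) > 0:  # signed diff is wraparound-safe
    pass                                     # ... do a small unit of work ...
print("deadline reached")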

View File

@@ -0,0 +1,41 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
from .core import *
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/Adafruit/Adafruit_CircuitPython_asyncio.git"
_attrs = {
"wait_for": "funcs",
"wait_for_ms": "funcs",
"gather": "funcs",
"Event": "event",
"ThreadSafeFlag": "event",
"Lock": "lock",
"open_connection": "stream",
"start_server": "stream",
"StreamReader": "stream",
"StreamWriter": "stream",
}
# Lazy loader, effectively does:
# global attr
# from .mod import attr
def __getattr__(attr):
mod = _attrs.get(attr, None)
if mod is None:
raise AttributeError(attr)
value = getattr(__import__(mod, None, None, True, 1), attr)
globals()[attr] = value
return value
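
The table above means, for example, that the first access to `asyncio.Event` runs `from .event import Event` and caches the class in the package namespace; a sketch, assuming the vendored package imports under the top-level name `asyncio`:

import asyncio

ev = asyncio.Event()    # first access: __getattr__("Event") imports .event
cls = asyncio.Event     # second access: found in globals(), no import runs
print(cls is type(ev))  # True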

View File

@@ -0,0 +1,430 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Core
====
"""
from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
try:
from traceback import print_exception
except:
from .traceback import print_exception
# Import TaskQueue and Task, preferring built-in C code over Python code
try:
from _asyncio import TaskQueue, Task
except ImportError:
from .task import TaskQueue, Task
################################################################################
# Exceptions
# Depending on the release of CircuitPython these errors may or may not
# exist in the C implementation of `_asyncio`. However, when they
# do exist, they must be preferred over the Python code.
try:
from _asyncio import CancelledError, InvalidStateError
except (ImportError, AttributeError):
class CancelledError(BaseException):
"""Injected into a task when calling `Task.cancel()`"""
pass
class InvalidStateError(Exception):
"""Can be raised in situations like setting a result value for a task object that already has a result value set."""
pass
class TimeoutError(Exception):
"""Raised when waiting for a task longer than the specified timeout."""
pass
# Used when calling Loop.call_exception_handler
_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
################################################################################
# Sleep functions
# "Yield" once, then raise StopIteration
class SingletonGenerator:
def __init__(self):
self.state = None
self.exc = StopIteration()
def __iter__(self):
return self
def __await__(self):
return self
def __next__(self):
if self.state is not None:
_task_queue.push_sorted(cur_task, self.state)
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
"""Sleep for *t* milliseconds.
This is a coroutine, and a MicroPython extension.
"""
assert sgen.state is None, "Check for a missing `await` in your code"
sgen.state = ticks_add(ticks(), max(0, t))
return sgen
# Pause task execution for the given time (in seconds)
def sleep(t):
"""Sleep for *t* seconds
This is a coroutine.
"""
return sleep_ms(int(t * 1000))
################################################################################
# "Never schedule" object
# Don't re-schedule the object that awaits _never().
# For internal use only. Some constructs, like `await event.wait()`,
# work by NOT re-scheduling the task which calls wait(), but by
# having some other task schedule it later.
class _NeverSingletonGenerator:
def __init__(self):
self.state = None
self.exc = StopIteration()
def __iter__(self):
return self
def __await__(self):
return self
def __next__(self):
if self.state is not None:
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
def _never(sgen=_NeverSingletonGenerator()):
# assert sgen.state is None, "Check for a missing `await` in your code"
sgen.state = False
return sgen
################################################################################
# Queue and poller for stream IO
class IOQueue:
def __init__(self):
self.poller = select.poll()
self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
def _enqueue(self, s, idx):
if id(s) not in self.map:
entry = [None, None, s]
entry[idx] = cur_task
self.map[id(s)] = entry
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
else:
sm = self.map[id(s)]
assert sm[idx] is None
assert sm[1 - idx] is not None
sm[idx] = cur_task
self.poller.modify(s, select.POLLIN | select.POLLOUT)
# Link task to this IOQueue so it can be removed if needed
cur_task.data = self
def _dequeue(self, s):
del self.map[id(s)]
self.poller.unregister(s)
async def queue_read(self, s):
self._enqueue(s, 0)
await _never()
async def queue_write(self, s):
self._enqueue(s, 1)
await _never()
def remove(self, task):
while True:
del_s = None
for k in self.map: # Iterate without allocating on the heap
q0, q1, s = self.map[k]
if q0 is task or q1 is task:
del_s = s
break
if del_s is not None:
self._dequeue(s)
else:
break
def wait_io_event(self, dt):
for s, ev in self.poller.ipoll(dt):
sm = self.map[id(s)]
# print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None:
# POLLIN or error
_task_queue.push_head(sm[0])
sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None:
# POLLOUT or error
_task_queue.push_head(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
elif sm[0] is None:
self.poller.modify(s, select.POLLOUT)
else:
self.poller.modify(s, select.POLLIN)
################################################################################
# Main run loop
# Ensure the awaitable is a task
def _promote_to_task(aw):
return aw if isinstance(aw, Task) else create_task(aw)
# Create and schedule a new task from a coroutine
def create_task(coro):
"""Create a new task from the given coroutine and schedule it to run.
Returns the corresponding `Task` object.
"""
if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
_task_queue.push_head(t)
return t
# Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
"""Run the given *main_task* until it completes."""
global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True:
# Wait until the head of _task_queue is ready to run
dt = 1
while dt > 0:
dt = -1
t = _task_queue.peek()
if t:
# A task waiting on _task_queue; "ph_key" is time to schedule task at
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
# No tasks can be woken so finished running
return
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _task_queue.pop_head()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
exc = t.data
if not exc:
t.coro.send(None)
else:
# If the task is finished and on the run queue and gets here, then it
# had an exception and was not await'ed on. Throwing into it now will
# raise StopIteration and the code below will catch this and run the
# call_exception_handler function.
t.data = None
t.coro.throw(exc)
except excs_all as er:
# Check the task is not on any event queue
assert t.data is None
# This task is done, check if it's the main task and then loop should stop
if t is main_task:
if isinstance(er, StopIteration):
return er.value
raise er
if t.state:
# Task was running but is now finished.
waiting = False
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
_task_queue.push_head(t.state.pop_head())
waiting = True
# "False" indicates that the task is complete and has been await'ed on.
t.state = False
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push_head(t)
# Save return value of coro to pass up to caller.
t.data = er
elif t.state is None:
# Task is already finished and nothing await'ed on the task,
# so call the exception handler.
_exc_context["exception"] = exc
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)
# Create a new task from a coroutine and run it until it finishes
def run(coro):
"""Create a new task from the given coroutine and run it until it completes.
Returns the value returned by *coro*.
"""
return run_until_complete(create_task(coro))
################################################################################
# Event loop wrapper
async def _stopper():
pass
_stop_task = None
class Loop:
"""Class representing the event loop"""
_exc_handler = None
def create_task(coro):
"""Create a task from the given *coro* and return the new `Task` object."""
return create_task(coro)
def run_forever():
"""Run the event loop until `Loop.stop()` is called."""
global _stop_task
_stop_task = Task(_stopper(), globals())
run_until_complete(_stop_task)
# TODO should keep running until .stop() is called, even if there are no tasks left
def run_until_complete(aw):
"""Run the given *awaitable* until it completes. If *awaitable* is not a task then
it will be promoted to one.
"""
return run_until_complete(_promote_to_task(aw))
def stop():
"""Stop the event loop"""
global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
# If stop() is called again, do nothing
_stop_task = None
def close():
"""Close the event loop."""
pass
def set_exception_handler(handler):
"""Set the exception handler to call when a Task raises an exception that is not
caught. The *handler* should accept two arguments: ``(loop, context)``
"""
Loop._exc_handler = handler
def get_exception_handler():
"""Get the current exception handler. Returns the handler, or ``None`` if no
custom handler is set.
"""
return Loop._exc_handler
def default_exception_handler(loop, context):
"""The default exception handler that is called."""
exc = context["exception"]
print_exception(None, exc, exc.__traceback__)
def call_exception_handler(context):
"""Call the current exception handler. The argument *context* is passed through
and is a dictionary containing keys:
``'message'``, ``'exception'``, ``'future'``
"""
(Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
def get_event_loop(runq_len=0, waitq_len=0):
"""Return the event loop used to schedule and run tasks. See `Loop`."""
return Loop
def current_task():
"""Return the `Task` object associated with the currently running task."""
return cur_task
def new_event_loop():
"""Reset the event loop and return it.
**NOTE**: Since MicroPython only has a single event loop, this function just resets
the loop's state; it does not create a new one.
"""
global _task_queue, _io_queue, _exc_context, cur_task
# TaskQueue of Task instances
_task_queue = TaskQueue()
# Task queue and poller for stream IO
_io_queue = IOQueue()
cur_task = None
_exc_context['exception'] = None
_exc_context['future'] = None
return Loop
# Initialise default event loop
new_event_loop()
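
A sketch of driving the loop defined above, assuming the vendored package imports as `asyncio` (`sleep_ms` is the MicroPython extension from this file):

import asyncio

async def blink(name, interval_ms, times):
    for i in range(times):
        await asyncio.sleep_ms(interval_ms)  # scheduled via _task_queue
        print(name, i)

async def main():
    task = asyncio.create_task(blink("fast", 10, 3))
    await blink("slow", 25, 2)
    await task  # re-join the detached task before the loop drains

asyncio.run(main())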

View File

@@ -0,0 +1,92 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Events
======
"""
from . import core
# Event class for primitive events that can be waited on, set, and cleared
class Event:
"""Create a new event which can be used to synchronize tasks. Events
start in the cleared state.
"""
def __init__(self):
self.state = False # False=unset; True=set
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
def is_set(self):
"""Returns ``True`` if the event is set, ``False`` otherwise."""
return self.state
def set(self):
"""Set the event. Any tasks waiting on the event will be scheduled to run.
"""
# Event becomes set, schedule any tasks waiting on it
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
while self.waiting.peek():
core._task_queue.push_head(self.waiting.pop_head())
self.state = True
def clear(self):
"""Clear the event."""
self.state = False
async def wait(self):
"""Wait for the event to be set. If the event is already set then it returns
immediately.
This is a coroutine.
"""
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
await core._never()
return True
# MicroPython-extension: This can be set from outside the asyncio event loop,
# such as other threads, IRQs or scheduler context. Implementation is a stream
# that asyncio will poll until a flag is set.
# Note: Unlike Event, this is self-clearing.
try:
import uio
class ThreadSafeFlag(uio.IOBase):
def __init__(self):
self._flag = 0
def ioctl(self, req, flags):
if req == 3: # MP_STREAM_POLL
return self._flag * flags
return None
def set(self):
self._flag = 1
async def wait(self):
if not self._flag:
yield core._io_queue.queue_read(self)
self._flag = 0
except ImportError:
pass
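
A sketch of the set/wait handshake, assuming the vendored package imports as `asyncio`:

import asyncio

async def waiter(ev):
    print("waiting")
    await ev.wait()         # parks this task on the event's TaskQueue
    print("released")

async def main():
    ev = asyncio.Event()
    task = asyncio.create_task(waiter(ev))
    await asyncio.sleep(0)  # let the waiter run as far as ev.wait()
    ev.set()                # moves the waiter back onto the run queue
    await task

asyncio.run(main())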

View File

@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2022 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Functions
=========
"""
from . import core
async def _run(waiter, aw):
try:
result = await aw
status = True
except BaseException as er:
result = None
status = er
if waiter.data is None:
# The waiter is still waiting, cancel it.
if waiter.cancel():
# Waiter was cancelled by us, change its CancelledError to an instance of
# CancelledError that contains the status and result of waiting on aw.
# If the wait_for task subsequently gets cancelled externally then this
# instance will be reset to a CancelledError instance without arguments.
waiter.data = core.CancelledError(status, result)
async def wait_for(aw, timeout, sleep=core.sleep):
"""Wait for the *aw* awaitable to complete, but cancel if it takes longer
than *timeout* seconds. If *aw* is not a task then a task will be created
from it.
If a timeout occurs, it cancels the task and raises ``asyncio.TimeoutError``:
this should be trapped by the caller.
Returns the return value of *aw*.
This is a coroutine.
"""
aw = core._promote_to_task(aw)
if timeout is None:
return await aw
# Run aw in a separate runner task that manages its exceptions.
runner_task = core.create_task(_run(core.cur_task, aw))
try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
status = er.args[0] if er.args else None
if status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
runner_task.cancel()
raise er
elif status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return er.args[1]
else:
# aw raised an exception, propagate it out to the caller.
raise status
# The sleep finished before aw, so cancel aw and raise TimeoutError.
runner_task.cancel()
await runner_task
raise core.TimeoutError
def wait_for_ms(aw, timeout):
"""Similar to `wait_for` but *timeout* is an integer in milliseconds.
This is a coroutine, and a MicroPython extension.
"""
return wait_for(aw, timeout, core.sleep_ms)
class _Remove:
@staticmethod
def remove(t):
pass
async def gather(*aws, return_exceptions=False):
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
are promoted to tasks.
Returns a list of return values of all *aws*
"""
if not aws:
return []
def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push_head(gather_task)
ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done
# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False
# Wait for a sub-task to need attention.
gather_task.data = _Remove
try:
await core._never()
except core.CancelledError as er:
cancel_all = True
state = er
# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state
# Return the list of return values of each sub-task.
return ts
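
A sketch of both helpers, assuming the vendored package imports as `asyncio`:

import asyncio

async def work(t, result):
    await asyncio.sleep(t)
    return result

async def main():
    # gather: runs the awaitables concurrently, results keep input order
    print(await asyncio.gather(work(0.01, "a"), work(0.02, "b")))  # ['a', 'b']
    # wait_for: cancels the awaitable and raises TimeoutError on timeout
    try:
        await asyncio.wait_for(work(1, "slow"), 0.05)
    except asyncio.TimeoutError:
        print("timed out")

asyncio.run(main())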

View File

@@ -0,0 +1,87 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Locks
=====
"""
from . import core
# Lock class for primitive mutex capability
class Lock:
"""Create a new lock which can be used to coordinate tasks. Locks start in
the unlocked state.
In addition to the methods below, locks can be used in an ``async with``
statement.
"""
def __init__(self):
# The state can take the following values:
# - 0: unlocked
# - 1: locked
# - <Task>: unlocked but this task has been scheduled to acquire the lock next
self.state = 0
# Queue of Tasks waiting to acquire this Lock
self.waiting = core.TaskQueue()
def locked(self):
"""Returns ``True`` if the lock is locked, otherwise ``False``."""
return self.state == 1
def release(self):
"""Release the lock. If any tasks are waiting on the lock then the next
one in the queue is scheduled to run and the lock remains locked. Otherwise,
no tasks are waiting and the lock becomes unlocked.
"""
if self.state != 1:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
# Task(s) waiting on lock, schedule next Task
self.state = self.waiting.pop_head()
core._task_queue.push_head(self.state)
else:
# No Task waiting so unlock
self.state = 0
async def acquire(self):
"""Wait for the lock to be in the unlocked state and then lock it in an
atomic way. Only one task can acquire the lock at any one time.
This is a coroutine.
"""
if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:
await core._never()
except core.CancelledError as er:
if self.state == core.cur_task:
# Cancelled while pending on resume, schedule next waiting Task
self.state = 1
self.release()
raise er
# Lock available, set it as locked
self.state = 1
return True
async def __aenter__(self):
return await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
return self.release()
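
A sketch of the ``async with`` usage mentioned in the docstring, assuming the vendored package imports as `asyncio`:

import asyncio

async def worker(name, lock):
    async with lock:               # __aenter__ -> acquire(), __aexit__ -> release()
        print(name, "has the lock")
        await asyncio.sleep(0.01)  # the other worker queues on lock.waiting here

async def main():
    lock = asyncio.Lock()
    await asyncio.gather(worker("a", lock), worker("b", lock))

asyncio.run(main())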

View File

@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
# This list of frozen files doesn't include task.py because that's provided by the C module.
freeze(
"..",
(
"uasyncio/__init__.py",
"uasyncio/core.py",
"uasyncio/event.py",
"uasyncio/funcs.py",
"uasyncio/lock.py",
"uasyncio/stream.py",
),
opt=3,
)

View File

@@ -0,0 +1,263 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Streams
=======
"""
from . import core
class Stream:
"""This represents a TCP stream connection. To minimise code this class
implements both a reader and a writer, and both ``StreamReader`` and
``StreamWriter`` alias to this class.
"""
def __init__(self, s, e={}):
self.s = s
self.e = e
self.out_buf = b""
def get_extra_info(self, v):
"""Get extra information about the stream, given by *v*. The valid
values for *v* are: ``peername``.
"""
return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self):
pass
async def wait_closed(self):
"""Wait for the stream to close.
This is a coroutine.
"""
# TODO yield?
self.s.close()
async def read(self, n):
"""Read up to *n* bytes and return them.
This is a coroutine.
"""
await core._io_queue.queue_read(self.s)
return self.s.read(n)
async def readinto(self, buf):
"""Read up to n bytes into *buf*, with n equal to the length of *buf*.
Return the number of bytes read into *buf*.
This is a coroutine, and a MicroPython extension.
"""
await core._io_queue.queue_read(self.s)
return self.s.readinto(buf)
async def readexactly(self, n):
"""Read exactly *n* bytes and return them as a bytes object.
Raises an ``EOFError`` exception if the stream ends before reading
*n* bytes.
This is a coroutine.
"""
r = b""
while n:
await core._io_queue.queue_read(self.s)
r2 = self.s.read(n)
if r2 is not None:
if not len(r2):
raise EOFError
r += r2
n -= len(r2)
return r
async def readline(self):
"""Read a line and return it.
This is a coroutine.
"""
l = b""
while True:
await core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
def write(self, buf):
"""Accumulate *buf* into the output buffer. The data is only flushed when
`Stream.drain` is called. It is recommended to call `Stream.drain`
immediately after calling this function.
"""
if not self.out_buf:
# Try to write immediately to the underlying stream.
ret = self.s.write(buf)
if ret == len(buf):
return
if ret is not None:
buf = buf[ret:]
self.out_buf += buf
async def drain(self):
"""Drain (write) all buffered output data out to the stream.
This is a coroutine.
"""
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
await core._io_queue.queue_write(self.s)
ret = self.s.write(mv[off:])
if ret is not None:
off += ret
self.out_buf = b""
# Stream can be used for both reading and writing to save code size
StreamReader = Stream
StreamWriter = Stream
# Create a TCP stream connection to a remote host
async def open_connection(host, port):
"""Open a TCP connection to the given *host* and *port*. The *host* address will
be resolved using `socket.getaddrinfo`, which is currently a blocking call.
Returns a pair of streams: a reader and a writer stream. Will raise a socket-specific
``OSError`` if the host could not be resolved or if the connection could not be made.
This is a coroutine.
"""
from uerrno import EINPROGRESS
import usocket as socket
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
try:
s.connect(ai[-1])
except OSError as er:
if er.errno != EINPROGRESS:
raise er
await core._io_queue.queue_write(s)
return ss, ss
# Class representing a TCP stream server, can be closed and used in "async with"
class Server:
"""This represents the server class returned from `start_server`. It can be used in
an ``async with`` statement to close the server upon exit.
"""
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
await self.wait_closed()
def close(self):
"""Close the server."""
self.task.cancel()
async def wait_closed(self):
"""Wait for the server to close.
This is a coroutine.
"""
await self.task
async def _serve(self, s, cb):
# Accept incoming connections
while True:
try:
await core._io_queue.queue_read(s)
except core.CancelledError:
# Shutdown server
s.close()
return
try:
s2, addr = s.accept()
except:
# Ignore a failed accept
continue
s2.setblocking(False)
s2s = Stream(s2, {"peername": addr})
core.create_task(cb(s2s, s2s))
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
"""Start a TCP server on the given *host* and *port*. The *cb* callback will be
called with each incoming, accepted connection, and be passed two arguments: the
reader and writer streams for the connection.
Returns a `Server` object.
This is a coroutine.
"""
import usocket as socket
# Create and bind server socket.
host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(host[-1])
s.listen(backlog)
# Create and return server object and task.
srv = Server()
srv.task = core.create_task(srv._serve(s, cb))
return srv
################################################################################
# Legacy uasyncio compatibility
async def stream_awrite(self, buf, off=0, sz=-1):
if off != 0 or sz != -1:
buf = memoryview(buf)
if sz == -1:
sz = len(buf)
buf = buf[off : off + sz]
self.write(buf)
await self.drain()
Stream.aclose = Stream.wait_closed
Stream.awrite = stream_awrite
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?
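
The helpers above import `usocket` and `uerrno`, so the following loopback sketch runs as-is on MicroPython or on CircuitPython's unix port; on CPython those names would need to be aliased to `socket` and `errno`, and 8123 is an arbitrary free port:

import asyncio

async def echo(reader, writer):
    writer.write(await reader.readline())
    await writer.drain()

async def main():
    server = await asyncio.start_server(echo, "127.0.0.1", 8123)
    reader, writer = await asyncio.open_connection("127.0.0.1", 8123)
    writer.write(b"hello\n")
    await writer.drain()
    print(await reader.readline())  # b'hello\n'
    server.close()
    await server.wait_closed()

asyncio.run(main())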

View File

@@ -0,0 +1,215 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Tasks
=====
"""
# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
# They can optionally be replaced by C implementations.
from . import core
# pairing-heap meld of 2 heaps; O(1)
def ph_meld(h1, h2):
if h1 is None:
return h2
if h2 is None:
return h1
lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
if lt:
if h1.ph_child is None:
h1.ph_child = h2
else:
h1.ph_child_last.ph_next = h2
h1.ph_child_last = h2
h2.ph_next = None
h2.ph_rightmost_parent = h1
return h1
else:
h1.ph_next = h2.ph_child
h2.ph_child = h1
if h1.ph_next is None:
h2.ph_child_last = h1
h1.ph_rightmost_parent = h2
return h2
# pairing-heap pairing operation; amortised O(log N)
def ph_pairing(child):
heap = None
while child is not None:
n1 = child
child = child.ph_next
n1.ph_next = None
if child is not None:
n2 = child
child = child.ph_next
n2.ph_next = None
n1 = ph_meld(n1, n2)
heap = ph_meld(heap, n1)
return heap
# pairing-heap delete of a node; stable, amortised O(log N)
def ph_delete(heap, node):
if node is heap:
child = heap.ph_child
node.ph_child = None
return ph_pairing(child)
# Find parent of node
parent = node
while parent.ph_next is not None:
parent = parent.ph_next
parent = parent.ph_rightmost_parent
# Replace node with pairing of its children
if node is parent.ph_child and node.ph_child is None:
parent.ph_child = node.ph_next
node.ph_next = None
return heap
elif node is parent.ph_child:
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
parent.ph_child = node
else:
n = parent.ph_child
while node is not n.ph_next:
n = n.ph_next
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
if node is None:
node = n
else:
n.ph_next = node
node.ph_next = next
if next is None:
node.ph_rightmost_parent = parent
parent.ph_child_last = node
return heap
# TaskQueue class based on the above pairing-heap functions.
class TaskQueue:
def __init__(self):
self.heap = None
def peek(self):
return self.heap
def push(self, v, key=None):
assert v.ph_child is None
assert v.ph_next is None
v.data = None
v.ph_key = key if key is not None else core.ticks()
self.heap = ph_meld(v, self.heap)
def pop(self):
v = self.heap
assert v.ph_next is None
self.heap = ph_pairing(v.ph_child)
v.ph_child = None
return v
def remove(self, v):
self.heap = ph_delete(self.heap, v)
# Compatibility aliases, remove after they are no longer used
push_head = push
push_sorted = push
pop_head = pop
# Task class representing a coroutine, can be waited on and cancelled.
class Task:
"""This object wraps a coroutine into a running task. Tasks can be waited on
using ``await task``, which will wait for the task to complete and return the
return value of the task.
Tasks should not be created directly, rather use ``create_task`` to create them.
"""
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True, a callable, or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Pairing heap
self.ph_child_last = None # Pairing heap
self.ph_next = None # Pairing heap
self.ph_rightmost_parent = None # Pairing heap
def __iter__(self):
if not self.state:
# Task finished, signal that it has been await'ed on.
self.state = False
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
elif type(self.state) is not TaskQueue:
# Task has state used for another purpose, so can't also wait on it.
raise RuntimeError("can't wait")
return self
# CircuitPython needs __await__().
__await__ = __iter__
def __next__(self):
if not self.state:
if self.data is None:
# Task finished but has already been sent to the loop's exception handler.
raise StopIteration
else:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.state.push(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
"""Whether the task is complete."""
return not self.state
def cancel(self):
"""Cancel the task by injecting a ``CancelledError`` into it. The task
may or may not ignore this exception.
"""
# Check if task is already finished.
if not self.state:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
raise RuntimeError("can't cancel self")
# If Task waits on another task then forward the cancel to the one it's waiting on.
while isinstance(self.data, Task):
self = self.data
# Reschedule Task as a cancelled task.
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push(self)
self.data = core.CancelledError
return True
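
A sketch of the cancellation path implemented by `Task.cancel()` above, assuming the vendored package imports as `asyncio`:

import asyncio

async def sleeper():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("cancel reached the coroutine")
        raise

async def main():
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)  # let sleeper start and park on the pairing heap
    task.cancel()           # brings it forward and injects CancelledError
    try:
        await task
    except asyncio.CancelledError:
        pass
    print("done:", task.done())  # True

asyncio.run(main())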

View File

@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
"""
Fallback traceback module if the system traceback is missing.
"""
try:
from typing import List
except ImportError:
pass
import sys
def _print_traceback(traceback, limit=None, file=sys.stderr) -> None:
if limit is None:
if hasattr(sys, "tracebacklimit"):
limit = sys.tracebacklimit
n = 0
while traceback is not None:
frame = traceback.tb_frame
line_number = traceback.tb_lineno
frame_code = frame.f_code
filename = frame_code.co_filename
name = frame_code.co_name
print(' File "%s", line %d, in %s' % (filename, line_number, name), file=file)
traceback = traceback.tb_next
n = n + 1
if limit is not None and n >= limit:
break
def print_exception(exception, value=None, traceback=None, limit=None, file=sys.stderr):
"""
Print exception information and stack trace to file.
"""
if traceback:
print("Traceback (most recent call last):", file=file)
_print_traceback(traceback, limit=limit, file=file)
if isinstance(exception, BaseException):
exception_type = type(exception).__name__
elif hasattr(exception, "__name__"):
exception_type = exception.__name__
else:
exception_type = type(value).__name__
valuestr = str(value)
if value is None or not valuestr:
print(exception_type, file=file)
else:
print("%s: %s" % (str(exception_type), valuestr), file=file)
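
A sketch matching the call shape core.py uses, `print_exception(None, exc, exc.__traceback__)`; the import path assumes the vendored package is importable as `asyncio`:

from asyncio.traceback import print_exception

try:
    1 / 0
except ZeroDivisionError as exc:
    # Prints the traceback lines, then "ZeroDivisionError: division by zero"
    print_exception(None, exc, exc.__traceback__)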

View File

@@ -2,7 +2,11 @@ import sys
sys.path.insert(0, 'src')
sys.path.insert(2, 'libs/common')
sys.path.insert(3, 'libs/micropython')
if sys.implementation.name == 'circuitpython':
sys.path.insert(3, 'libs/circuitpython')
sys.path.insert(4, 'libs/micropython')
else:
sys.path.insert(3, 'libs/micropython')
import unittest