Upgrade uasyncio release used in tests

Miguel Grinberg
2022-12-09 17:12:57 +00:00
parent 818f98d9a4
commit 3d6815119c
7 changed files with 222 additions and 127 deletions

View File

@@ -41,7 +41,7 @@ class SingletonGenerator:
def __next__(self):
if self.state is not None:
_task_queue.push_sorted(cur_task, self.state)
_task_queue.push(cur_task, self.state)
self.state = None
return None
else:
@@ -115,11 +115,11 @@ class IOQueue:
# print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None:
# POLLIN or error
_task_queue.push_head(sm[0])
_task_queue.push(sm[0])
sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None:
# POLLOUT or error
_task_queue.push_head(sm[1])
_task_queue.push(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
@@ -142,7 +142,7 @@ def create_task(coro):
if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
_task_queue.push_head(t)
_task_queue.push(t)
return t
@@ -167,7 +167,7 @@ def run_until_complete(main_task=None):
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _task_queue.pop_head()
t = _task_queue.pop()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
@@ -175,6 +175,10 @@ def run_until_complete(main_task=None):
if not exc:
t.coro.send(None)
else:
# If the task is finished and on the run queue and gets here, then it
# had an exception and was not await'ed on. Throwing into it now will
# raise StopIteration and the code below will catch this and run the
# call_exception_handler function.
t.data = None
t.coro.throw(exc)
except excs_all as er:
@@ -185,22 +189,37 @@ def run_until_complete(main_task=None):
if isinstance(er, StopIteration):
return er.value
raise er
# Schedule any other tasks waiting on the completion of this task
if t.state:
# Task was running but is now finished.
waiting = False
if hasattr(t, "waiting"):
while t.waiting.peek():
_task_queue.push_head(t.waiting.pop_head())
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
t.waiting = None # Free waiting queue head
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
_task_queue.push(t.state.pop())
waiting = True
# "False" indicates that the task is complete and has been await'ed on.
t.state = False
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push_head(t)
# Indicate task is done by setting coro to the task object itself
t.coro = t
# Save return value of coro to pass up to caller
_task_queue.push(t)
# Save return value of coro to pass up to caller.
t.data = er
elif t.state is None:
# Task is already finished and nothing await'ed on the task,
# so call the exception handler.
_exc_context["exception"] = exc
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)
# Create a new task from a coroutine and run it until it finishes
@@ -237,7 +256,7 @@ class Loop:
def stop():
global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
_task_queue.push(_stop_task)
# If stop() is called again, do nothing
_stop_task = None
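Note on the new behaviour above: a task that ends with an uncaught exception and that nothing has await'ed is pushed back onto the run queue, and the exception handler only fires if the exception is still unretrieved on that second pass. A minimal sketch of observing this, assuming a stock uasyncio build (handler name and delay are illustrative):

import uasyncio

def exc_handler(loop, context):
    # Called only if no other task retrieved the exception in the meantime.
    print("uncaught:", context["exception"], "in", context["future"])

async def boom():
    raise ValueError("oops")

async def main():
    uasyncio.get_event_loop().set_exception_handler(exc_handler)
    uasyncio.create_task(boom())   # detached task, never await'ed
    await uasyncio.sleep_ms(10)    # let the scheduler run the failed task twice

uasyncio.run(main())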

View File

@@ -17,7 +17,7 @@ class Event:
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
while self.waiting.peek():
core._task_queue.push_head(self.waiting.pop_head())
core._task_queue.push(self.waiting.pop())
self.state = True
def clear(self):
@@ -26,7 +26,7 @@ class Event:
async def wait(self):
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
self.waiting.push(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
yield
@@ -36,27 +36,29 @@ class Event:
# MicroPython-extension: This can be set from outside the asyncio event loop,
# such as other threads, IRQs or scheduler context. Implementation is a stream
# that asyncio will poll until a flag is set.
# Note: Unlike Event, this is self-clearing.
# Note: Unlike Event, this is self-clearing after a wait().
try:
import uio
class ThreadSafeFlag(uio.IOBase):
def __init__(self):
self._flag = 0
self.state = 0
def ioctl(self, req, flags):
if req == 3: # MP_STREAM_POLL
return self._flag * flags
return self.state * flags
return None
def set(self):
self._flag = 1
self.state = 1
def clear(self):
self.state = 0
async def wait(self):
if not self._flag:
if not self.state:
yield core._io_queue.queue_read(self)
self._flag = 0
self.state = 0
except ImportError:
pass
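For reference, a usage sketch of ThreadSafeFlag with its new clear() method and self-clearing wait(); using machine.Timer as the interrupt source is illustrative and port-dependent:

import uasyncio
from machine import Timer

flag = uasyncio.ThreadSafeFlag()

def on_tick(t):
    flag.set()                 # safe from IRQ / other-thread context

async def consumer():
    while True:
        await flag.wait()      # returns once set, then clears itself
        print("tick")

Timer(0).init(period=500, mode=Timer.PERIODIC, callback=on_tick)
uasyncio.run(consumer())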

View File

@@ -1,49 +1,51 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
# MIT license; Copyright (c) 2019-2022 Damien P. George
from . import core
def _run(waiter, aw):
try:
result = await aw
status = True
except BaseException as er:
result = None
status = er
if waiter.data is None:
# The waiter is still waiting, cancel it.
if waiter.cancel():
# Waiter was cancelled by us, change its CancelledError to an instance of
# CancelledError that contains the status and result of waiting on aw.
# If the wait_for task subsequently gets cancelled externally then this
# instance will be reset to a CancelledError instance without arguments.
waiter.data = core.CancelledError(status, result)
async def wait_for(aw, timeout, sleep=core.sleep):
aw = core._promote_to_task(aw)
if timeout is None:
return await aw
def runner(waiter, aw):
nonlocal status, result
try:
result = await aw
s = True
except BaseException as er:
s = er
if status is None:
# The waiter is still waiting, set status for it and cancel it.
status = s
waiter.cancel()
# Run aw in a separate runner task that manages its exceptions.
status = None
result = None
runner_task = core.create_task(runner(core.cur_task, aw))
runner_task = core.create_task(_run(core.cur_task, aw))
try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
if status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return result
elif status is None:
status = er.value
if status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
status = True
runner_task.cancel()
raise er
elif status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return er.args[1]
else:
# aw raised an exception, propagate it out to the caller.
raise status
# The sleep finished before aw, so cancel aw and raise TimeoutError.
status = True
runner_task.cancel()
await runner_task
raise core.TimeoutError
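The visible wait_for() semantics are unchanged by the move to the _run() helper; a short usage sketch (names and timings illustrative):

import uasyncio

async def slow():
    await uasyncio.sleep(10)
    return "done"

async def main():
    try:
        print(await uasyncio.wait_for(slow(), 1))   # or wait_for_ms(slow(), 1000)
    except uasyncio.TimeoutError:
        print("timed out; the runner task cancelled slow()")

uasyncio.run(main())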
@@ -53,22 +55,75 @@ def wait_for_ms(aw, timeout):
return wait_for(aw, timeout, core.sleep_ms)
class _Remove:
@staticmethod
def remove(t):
pass
async def gather(*aws, return_exceptions=False):
if not aws:
return []
def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push(gather_task)
ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done
# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False
# Wait for a sub-task to need attention.
gather_task.data = _Remove
try:
# TODO handle cancel of gather itself
# if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except Exception as er:
if return_exceptions:
ts[i] = er
yield
except core.CancelledError as er:
cancel_all = True
state = er
# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
raise er
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state
# Return the list of return values of each sub-task.
return ts
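A usage sketch of the rewritten gather(), showing both the ordered result list and return_exceptions=True (sub-task bodies are illustrative):

import uasyncio

async def mul(a, b):
    await uasyncio.sleep_ms(10)
    return a * b

async def boom():
    raise ValueError("oops")

async def main():
    # Results come back in argument order once every sub-task finishes.
    print(await uasyncio.gather(mul(2, 3), mul(4, 5)))   # [6, 20]
    # With return_exceptions=True a failing sub-task's exception is returned
    # in its slot instead of cancelling the whole gather.
    print(await uasyncio.gather(boom(), mul(6, 7), return_exceptions=True))

uasyncio.run(main())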

View File

@@ -22,8 +22,8 @@ class Lock:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
# Task(s) waiting on lock, schedule next Task
self.state = self.waiting.pop_head()
core._task_queue.push_head(self.state)
self.state = self.waiting.pop()
core._task_queue.push(self.state)
else:
# No Task waiting so unlock
self.state = 0
@@ -31,7 +31,7 @@ class Lock:
async def acquire(self):
if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
self.waiting.push(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:
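Lock behaviour is unchanged by the queue-API rename; a typical usage sketch:

import uasyncio

lock = uasyncio.Lock()

async def worker(n):
    async with lock:                # acquire() queues us if the lock is held
        print("worker", n, "has the lock")
        await uasyncio.sleep_ms(50)
    # release() hands the lock straight to the next waiting task, if any

async def main():
    await uasyncio.gather(worker(1), worker(2), worker(3))

uasyncio.run(main())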

View File

@@ -1,13 +1,15 @@
# This list of frozen files doesn't include task.py because that's provided by the C module.
freeze(
"..",
# This list of package files doesn't include task.py because that's provided
# by the C module.
package(
"uasyncio",
(
"uasyncio/__init__.py",
"uasyncio/core.py",
"uasyncio/event.py",
"uasyncio/funcs.py",
"uasyncio/lock.py",
"uasyncio/stream.py",
"__init__.py",
"core.py",
"event.py",
"funcs.py",
"lock.py",
"stream.py",
),
base_path="..",
opt=3,
)
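The manifest now declares uasyncio with package() instead of individual freeze() entries. Another manifest would typically pull it in with include(); the path below is a placeholder, not the real location in this tree:

# Illustrative only: including this package manifest from a port/board manifest.
include("path/to/uasyncio/manifest.py")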

View File

@@ -26,9 +26,21 @@ class Stream:
# TODO yield?
self.s.close()
async def read(self, n):
async def read(self, n=-1):
r = b""
while True:
yield core._io_queue.queue_read(self.s)
return self.s.read(n)
r2 = self.s.read(n)
if r2 is not None:
if n >= 0:
return r2
if not len(r2):
return r
r += r2
async def readinto(self, buf):
yield core._io_queue.queue_read(self.s)
return self.s.readinto(buf)
async def readexactly(self, n):
r = b""
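read() now takes the CPython-style default n=-1 and accumulates chunks until EOF, and readinto() is new. A client sketch (host and request are illustrative):

import uasyncio

async def fetch(host):
    reader, writer = await uasyncio.open_connection(host, 80)
    writer.write(b"GET / HTTP/1.0\r\nHost: " + host.encode() + b"\r\n\r\n")
    await writer.drain()
    body = await reader.read(-1)    # new default: keep reading until EOF
    await reader.wait_closed()
    return body

uasyncio.run(fetch("example.com"))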
@@ -52,9 +64,19 @@ class Stream:
return l
def write(self, buf):
if not self.out_buf:
# Try to write immediately to the underlying stream.
ret = self.s.write(buf)
if ret == len(buf):
return
if ret is not None:
buf = buf[ret:]
self.out_buf += buf
async def drain(self):
if not self.out_buf:
# Drain must always yield, so a tight loop of write+drain can't block the scheduler.
return await core.sleep_ms(0)
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
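write() now attempts an immediate non-blocking write and only buffers the remainder, while drain() always yields at least once so a tight write-and-drain loop cannot starve the scheduler. The intended pattern, sketched:

async def send_all(writer, chunks):
    for chunk in chunks:
        writer.write(chunk)     # immediate partial write, leftover goes to out_buf
        await writer.drain()    # flush out_buf; always yields to the scheduler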
@@ -75,8 +97,8 @@ async def open_connection(host, port):
from uerrno import EINPROGRESS
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
try:
@@ -103,16 +125,7 @@ class Server:
async def wait_closed(self):
await self.task
async def _serve(self, cb, host, port, backlog):
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(ai[-1])
s.listen(backlog)
self.task = core.cur_task
async def _serve(self, s, cb):
# Accept incoming connections
while True:
try:
@@ -134,9 +147,20 @@ class Server:
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
s = Server()
core.create_task(s._serve(cb, host, port, backlog))
return s
import usocket as socket
# Create and bind server socket.
host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(host[-1])
s.listen(backlog)
# Create and return server object and task.
srv = Server()
srv.task = core.create_task(srv._serve(s, cb))
return srv
################################################################################
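start_server() now performs the (still blocking) socket setup itself and runs the accept loop in the returned server's task. A minimal server sketch (address and port illustrative):

import uasyncio

async def handle(reader, writer):
    line = await reader.readline()
    writer.write(b"you said: " + line)
    await writer.drain()
    await writer.wait_closed()

async def main():
    server = await uasyncio.start_server(handle, "0.0.0.0", 8080)
    await server.wait_closed()      # serve until the accept task is cancelled

uasyncio.run(main())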

View File

@@ -99,19 +99,18 @@ class TaskQueue:
def peek(self):
return self.heap
def push_sorted(self, v, key):
def push(self, v, key=None):
assert v.ph_child is None
assert v.ph_next is None
v.data = None
v.ph_key = key
v.ph_child = None
v.ph_next = None
v.ph_key = key if key is not None else core.ticks()
self.heap = ph_meld(v, self.heap)
def push_head(self, v):
self.push_sorted(v, core.ticks())
def pop_head(self):
def pop(self):
v = self.heap
self.heap = ph_pairing(self.heap.ph_child)
assert v.ph_next is None
self.heap = ph_pairing(v.ph_child)
v.ph_child = None
return v
def remove(self, v):
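push_sorted() and push_head() collapse into one push() whose key defaults to "now", and pop_head() becomes pop(). A sketch of the unified API, assuming the pure-Python task module is importable:

from uasyncio import core
from uasyncio.task import TaskQueue, Task

async def nop():
    pass

q = TaskQueue()
t1, t2 = Task(nop()), Task(nop())
q.push(t1)                                      # key defaults to core.ticks(): run ASAP
q.push(t2, core.ticks_add(core.ticks(), 100))   # explicit key: run ~100 ms later
assert q.pop() is t1                            # smallest key comes out first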
@@ -123,6 +122,7 @@ class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True, a callable, or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Pairing heap
self.ph_child_last = None # Pairing heap
@@ -130,30 +130,33 @@ class Task:
self.ph_rightmost_parent = None # Pairing heap
def __iter__(self):
if self.coro is self:
# Signal that the completed-task has been await'ed on.
self.waiting = None
elif not hasattr(self, "waiting"):
# Lazily allocated head of linked list of Tasks waiting on completion of this task.
self.waiting = TaskQueue()
if not self.state:
# Task finished, signal that it has been await'ed on.
self.state = False
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
elif type(self.state) is not TaskQueue:
# Task has state used for another purpose, so can't also wait on it.
raise RuntimeError("can't wait")
return self
def __next__(self):
if self.coro is self:
if not self.state:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.waiting.push_head(core.cur_task)
self.state.push(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
return self.coro is self
return not self.state
def cancel(self):
# Check if task is already finished.
if self.coro is self:
if not self.state:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
@@ -165,20 +168,10 @@ class Task:
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
self.data = core.CancelledError
return True
def throw(self, value):
# This task raised an exception which was uncaught; handle that now.
# Set the data because it was cleared by the main scheduling loop.
self.data = value
if not hasattr(self, "waiting"):
# Nothing await'ed on the task so call the exception handler.
core._exc_context["exception"] = value
core._exc_context["future"] = self
core.Loop.call_exception_handler(core._exc_context)
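Completion bookkeeping moves from the lazily-created waiting attribute to the single state field (True while running, a TaskQueue once await'ed, None or False when finished), and done()/cancel() test it directly. A behavioural sketch:

import uasyncio

async def worker():
    await uasyncio.sleep_ms(50)
    return 42

async def main():
    t = uasyncio.create_task(worker())
    print(t.done())                 # False: state is True while the task runs
    print(await t)                  # 42: awaiting parks us on the task's state queue
    print(t.done())                 # True: state is now falsey

    t2 = uasyncio.create_task(worker())
    t2.cancel()                     # schedules CancelledError into t2
    try:
        await t2
    except uasyncio.CancelledError:
        print("t2 cancelled")

uasyncio.run(main())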