extmod/uasyncio: Fix gather cancelling and handling of exceptions.
The following fixes are made: - cancelling a gather now cancels all sub-tasks of the gather (previously it would only cancel the first) - if any sub-task of a gather raises an exception then the gather finishes (previously it would only finish if the first sub-task raised) Fixes issues #5798, #7807, #7901. Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
@@ -53,22 +53,68 @@ def wait_for_ms(aw, timeout):
|
|||||||
return wait_for(aw, timeout, core.sleep_ms)
|
return wait_for(aw, timeout, core.sleep_ms)
|
||||||
|
|
||||||
|
|
||||||
|
class _Remove:
|
||||||
|
@staticmethod
|
||||||
|
def remove(t):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
async def gather(*aws, return_exceptions=False):
|
async def gather(*aws, return_exceptions=False):
|
||||||
|
def done(t, er):
|
||||||
|
nonlocal state
|
||||||
|
if type(state) is not int:
|
||||||
|
# A sub-task already raised an exception, so do nothing.
|
||||||
|
return
|
||||||
|
elif not return_exceptions and not isinstance(er, StopIteration):
|
||||||
|
# A sub-task raised an exception, indicate that to the gather task.
|
||||||
|
state = er
|
||||||
|
else:
|
||||||
|
state -= 1
|
||||||
|
if state:
|
||||||
|
# Still some sub-tasks running.
|
||||||
|
return
|
||||||
|
# Gather waiting is done, schedule the main gather task.
|
||||||
|
core._task_queue.push_head(gather_task)
|
||||||
|
|
||||||
ts = [core._promote_to_task(aw) for aw in aws]
|
ts = [core._promote_to_task(aw) for aw in aws]
|
||||||
for i in range(len(ts)):
|
for i in range(len(ts)):
|
||||||
try:
|
if ts[i].state is not True:
|
||||||
# TODO handle cancel of gather itself
|
# Task is not running, gather not currently supported for this case.
|
||||||
# if ts[i].coro:
|
raise RuntimeError("can't gather")
|
||||||
# iter(ts[i]).waiting.push_head(cur_task)
|
# Register the callback to call when the task is done.
|
||||||
# try:
|
ts[i].state = done
|
||||||
# yield
|
|
||||||
# except CancelledError as er:
|
# Set the state for execution of the gather.
|
||||||
# # cancel all waiting tasks
|
gather_task = core.cur_task
|
||||||
# raise er
|
state = len(ts)
|
||||||
ts[i] = await ts[i]
|
cancel_all = False
|
||||||
except (core.CancelledError, Exception) as er:
|
|
||||||
if return_exceptions:
|
# Wait for the a sub-task to need attention.
|
||||||
ts[i] = er
|
gather_task.data = _Remove
|
||||||
else:
|
try:
|
||||||
raise er
|
yield
|
||||||
|
except core.CancelledError as er:
|
||||||
|
cancel_all = True
|
||||||
|
state = er
|
||||||
|
|
||||||
|
# Clean up tasks.
|
||||||
|
for i in range(len(ts)):
|
||||||
|
if ts[i].state is done:
|
||||||
|
# Sub-task is still running, deregister the callback and cancel if needed.
|
||||||
|
ts[i].state = True
|
||||||
|
if cancel_all:
|
||||||
|
ts[i].cancel()
|
||||||
|
elif isinstance(ts[i].data, StopIteration):
|
||||||
|
# Sub-task ran to completion, get its return value.
|
||||||
|
ts[i] = ts[i].data.value
|
||||||
|
else:
|
||||||
|
# Sub-task had an exception with return_exceptions==True, so get its exception.
|
||||||
|
ts[i] = ts[i].data
|
||||||
|
|
||||||
|
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
|
||||||
|
# return_exceptions==False, so reraise the exception here.
|
||||||
|
if state is not 0:
|
||||||
|
raise state
|
||||||
|
|
||||||
|
# Return the list of return values of each sub-task.
|
||||||
return ts
|
return ts
|
||||||
|
|||||||
@@ -27,9 +27,22 @@ async def task(id):
|
|||||||
return id
|
return id
|
||||||
|
|
||||||
|
|
||||||
async def gather_task():
|
async def task_loop(id):
|
||||||
|
print("task_loop start", id)
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(0.02)
|
||||||
|
print("task_loop loop", id)
|
||||||
|
|
||||||
|
|
||||||
|
async def task_raise(id):
|
||||||
|
print("task_raise start", id)
|
||||||
|
await asyncio.sleep(0.02)
|
||||||
|
raise ValueError(id)
|
||||||
|
|
||||||
|
|
||||||
|
async def gather_task(t0, t1):
|
||||||
print("gather_task")
|
print("gather_task")
|
||||||
await asyncio.gather(task(1), task(2))
|
await asyncio.gather(t0, t1)
|
||||||
print("gather_task2")
|
print("gather_task2")
|
||||||
|
|
||||||
|
|
||||||
@@ -37,19 +50,49 @@ async def main():
|
|||||||
# Simple gather with return values
|
# Simple gather with return values
|
||||||
print(await asyncio.gather(factorial("A", 2), factorial("B", 3), factorial("C", 4)))
|
print(await asyncio.gather(factorial("A", 2), factorial("B", 3), factorial("C", 4)))
|
||||||
|
|
||||||
|
print("====")
|
||||||
|
|
||||||
# Test return_exceptions, where one task is cancelled and the other finishes normally
|
# Test return_exceptions, where one task is cancelled and the other finishes normally
|
||||||
tasks = [asyncio.create_task(task(1)), asyncio.create_task(task(2))]
|
tasks = [asyncio.create_task(task(1)), asyncio.create_task(task(2))]
|
||||||
tasks[0].cancel()
|
tasks[0].cancel()
|
||||||
print(await asyncio.gather(*tasks, return_exceptions=True))
|
print(await asyncio.gather(*tasks, return_exceptions=True))
|
||||||
|
|
||||||
# Cancel a multi gather
|
print("====")
|
||||||
# TODO doesn't work, Task should not forward cancellation from gather to sub-task
|
|
||||||
# but rather CancelledError should cancel the gather directly, which will then cancel
|
# Test return_exceptions, where one task raises an exception and the other finishes normally.
|
||||||
# all sub-tasks explicitly
|
tasks = [asyncio.create_task(task(1)), asyncio.create_task(task_raise(2))]
|
||||||
# t = asyncio.create_task(gather_task())
|
print(await asyncio.gather(*tasks, return_exceptions=True))
|
||||||
# await asyncio.sleep(0.01)
|
|
||||||
# t.cancel()
|
print("====")
|
||||||
# await asyncio.sleep(0.01)
|
|
||||||
|
# Test case where one task raises an exception and other task keeps running.
|
||||||
|
tasks = [asyncio.create_task(task_loop(1)), asyncio.create_task(task_raise(2))]
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
except ValueError as er:
|
||||||
|
print(repr(er))
|
||||||
|
print(tasks[0].done(), tasks[1].done())
|
||||||
|
for t in tasks:
|
||||||
|
t.cancel()
|
||||||
|
await asyncio.sleep(0.04)
|
||||||
|
|
||||||
|
print("====")
|
||||||
|
|
||||||
|
# Test case where both tasks raise an exception.
|
||||||
|
tasks = [asyncio.create_task(task_raise(1)), asyncio.create_task(task_raise(2))]
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
except ValueError as er:
|
||||||
|
print(repr(er))
|
||||||
|
print(tasks[0].done(), tasks[1].done())
|
||||||
|
|
||||||
|
print("====")
|
||||||
|
|
||||||
|
# Cancel a multi gather.
|
||||||
|
t = asyncio.create_task(gather_task(task(1), task(2)))
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
t.cancel()
|
||||||
|
await asyncio.sleep(0.04)
|
||||||
|
|
||||||
|
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|||||||
@@ -8,6 +8,27 @@ Task B: factorial(3) = 6
|
|||||||
Task C: Compute factorial(4)...
|
Task C: Compute factorial(4)...
|
||||||
Task C: factorial(4) = 24
|
Task C: factorial(4) = 24
|
||||||
[2, 6, 24]
|
[2, 6, 24]
|
||||||
|
====
|
||||||
start 2
|
start 2
|
||||||
end 2
|
end 2
|
||||||
[CancelledError(), 2]
|
[CancelledError(), 2]
|
||||||
|
====
|
||||||
|
start 1
|
||||||
|
task_raise start 2
|
||||||
|
end 1
|
||||||
|
[1, ValueError(2,)]
|
||||||
|
====
|
||||||
|
task_loop start 1
|
||||||
|
task_raise start 2
|
||||||
|
task_loop loop 1
|
||||||
|
ValueError(2,)
|
||||||
|
False True
|
||||||
|
====
|
||||||
|
task_raise start 1
|
||||||
|
task_raise start 2
|
||||||
|
ValueError(1,)
|
||||||
|
True True
|
||||||
|
====
|
||||||
|
gather_task
|
||||||
|
start 1
|
||||||
|
start 2
|
||||||
|
|||||||
53
tests/extmod/uasyncio_gather_notimpl.py
Normal file
53
tests/extmod/uasyncio_gather_notimpl.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
# Test uasyncio.gather() function, features that are not implemented.
|
||||||
|
|
||||||
|
try:
|
||||||
|
import uasyncio as asyncio
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
import asyncio
|
||||||
|
except ImportError:
|
||||||
|
print("SKIP")
|
||||||
|
raise SystemExit
|
||||||
|
|
||||||
|
|
||||||
|
def custom_handler(loop, context):
|
||||||
|
print(repr(context["exception"]))
|
||||||
|
|
||||||
|
|
||||||
|
async def task(id):
|
||||||
|
print("task start", id)
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
print("task end", id)
|
||||||
|
return id
|
||||||
|
|
||||||
|
|
||||||
|
async def gather_task(t0, t1):
|
||||||
|
print("gather_task start")
|
||||||
|
await asyncio.gather(t0, t1)
|
||||||
|
print("gather_task end")
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.set_exception_handler(custom_handler)
|
||||||
|
|
||||||
|
# Test case where can't wait on a task being gathered.
|
||||||
|
tasks = [asyncio.create_task(task(1)), asyncio.create_task(task(2))]
|
||||||
|
gt = asyncio.create_task(gather_task(tasks[0], tasks[1]))
|
||||||
|
await asyncio.sleep(0) # let the gather start
|
||||||
|
try:
|
||||||
|
await tasks[0] # can't await because this task is part of the gather
|
||||||
|
except RuntimeError as er:
|
||||||
|
print(repr(er))
|
||||||
|
await gt
|
||||||
|
|
||||||
|
print("====")
|
||||||
|
|
||||||
|
# Test case where can't gather on a task being waited.
|
||||||
|
tasks = [asyncio.create_task(task(1)), asyncio.create_task(task(2))]
|
||||||
|
asyncio.create_task(gather_task(tasks[0], tasks[1]))
|
||||||
|
await tasks[0] # wait on this task before the gather starts
|
||||||
|
await tasks[1]
|
||||||
|
|
||||||
|
|
||||||
|
asyncio.run(main())
|
||||||
14
tests/extmod/uasyncio_gather_notimpl.py.exp
Normal file
14
tests/extmod/uasyncio_gather_notimpl.py.exp
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
task start 1
|
||||||
|
task start 2
|
||||||
|
gather_task start
|
||||||
|
RuntimeError("can't wait",)
|
||||||
|
task end 1
|
||||||
|
task end 2
|
||||||
|
gather_task end
|
||||||
|
====
|
||||||
|
task start 1
|
||||||
|
task start 2
|
||||||
|
gather_task start
|
||||||
|
RuntimeError("can't gather",)
|
||||||
|
task end 1
|
||||||
|
task end 2
|
||||||
Reference in New Issue
Block a user