Do not silence exceptions that occur in the SSE task
This commit is contained in:
@@ -61,7 +61,12 @@ def sse_response(request, event_function, *args, **kwargs):
|
|||||||
sse = SSE()
|
sse = SSE()
|
||||||
|
|
||||||
async def sse_task_wrapper():
|
async def sse_task_wrapper():
|
||||||
|
try:
|
||||||
await event_function(request, sse, *args, **kwargs)
|
await event_function(request, sse, *args, **kwargs)
|
||||||
|
except Exception as exc:
|
||||||
|
# the SSE task raised an exception so we need to pass it to the
|
||||||
|
# main route so that it is re-raised there
|
||||||
|
sse.queue.append(exc)
|
||||||
sse.event.set()
|
sse.event.set()
|
||||||
|
|
||||||
task = asyncio.create_task(sse_task_wrapper())
|
task = asyncio.create_task(sse_task_wrapper())
|
||||||
@@ -79,7 +84,11 @@ def sse_response(request, event_function, *args, **kwargs):
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
await sse.event.wait()
|
await sse.event.wait()
|
||||||
sse.event.clear()
|
sse.event.clear()
|
||||||
if event is None:
|
if isinstance(event, Exception):
|
||||||
|
# if the event is an exception we re-raise it here so that it
|
||||||
|
# can be handled appropriately
|
||||||
|
raise event
|
||||||
|
elif event is None:
|
||||||
raise StopAsyncIteration
|
raise StopAsyncIteration
|
||||||
return event
|
return event
|
||||||
|
|
||||||
|
|||||||
@@ -42,3 +42,15 @@ class TestWebSocket(unittest.TestCase):
|
|||||||
'data: [42, "foo", "bar"]\n\n'
|
'data: [42, "foo", "bar"]\n\n'
|
||||||
'data: foo\n\n'
|
'data: foo\n\n'
|
||||||
'data: foo\n\n'))
|
'data: foo\n\n'))
|
||||||
|
|
||||||
|
def test_sse_exception(self):
|
||||||
|
app = Microdot()
|
||||||
|
|
||||||
|
@app.route('/sse')
|
||||||
|
@with_sse
|
||||||
|
async def handle_sse(request, sse):
|
||||||
|
await sse.send('foo')
|
||||||
|
await sse.send(1 / 0)
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
self.assertRaises(ZeroDivisionError, self._run, client.get('/sse'))
|
||||||
|
|||||||
Reference in New Issue
Block a user