31 Commits

Author SHA1 Message Date
Miguel Grinberg
46b120bc87 Release 1.2.2 2023-03-03 07:19:47 +00:00
Miguel Grinberg
ddb3b8f442 Return headers as lowercase byte sequences as required by ASGI 2023-03-03 07:15:09 +00:00
Miguel Grinberg
9398c96075 Add CPU timing to benchmark 2023-02-28 23:30:58 +00:00
Miguel Grinberg
4d432a7d6c More robust timeout handling (Fixes #106) 2023-02-28 18:31:54 +00:00
Miguel Grinberg
d0d358f94a Add a socket read timeout to abort incomplete requests (Fixes #99) 2023-02-22 00:42:20 +00:00
Miguel Grinberg
680cd9c023 Async example of static file serving 2023-02-21 15:18:04 +00:00
dependabot[bot]
ec72d54203 Bump werkzeug from 2.2.1 to 2.2.3 in /examples/benchmark (#102) #nolog
Bumps [werkzeug](https://github.com/pallets/werkzeug) from 2.2.1 to 2.2.3.
- [Release notes](https://github.com/pallets/werkzeug/releases)
- [Changelog](https://github.com/pallets/werkzeug/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/werkzeug/compare/2.2.1...2.2.3)

---
updated-dependencies:
- dependency-name: werkzeug
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-02-16 10:02:41 +00:00
Eric Welch
c00b24c943 Fixing broken links to examples in documentation (#101) 2023-02-15 16:07:17 +00:00
dependabot[bot]
878a911afc Bump starlette from 0.19.1 to 0.25.0 in /examples/benchmark (#100) 2023-02-14 23:27:49 +00:00
Miguel Grinberg
ecd84ecb7b Update unittest library for MicroPython 2023-02-07 00:03:53 +00:00
Miguel Grinberg
fcaeee6905 Add @after_error_handler decorator (Fixes #97) 2023-02-06 23:53:11 +00:00
Miguel Grinberg
427a4d49de Correct path in benchmark test #nolog 2023-01-13 14:58:16 +00:00
Miguel Grinberg
f56c826149 Update tox.ini #nolog 2023-01-13 11:39:23 +00:00
Miguel Grinberg
2aa90d4245 Add scrollbar to documentation's left sidebar 2023-01-13 10:27:51 +00:00
William Wheeler
8139498023 Documentation typo (#90) 2022-12-20 07:24:49 +00:00
Miguel Grinberg
3d6815119c Upgrade uasyncio release used in tests 2022-12-09 17:12:57 +00:00
Miguel Grinberg
818f98d9a4 New build of micropython 2022-12-09 12:15:35 +00:00
Miguel Grinberg
dd15d90239 Remove 3.6, add 3.11 2022-12-09 11:20:07 +00:00
dependabot[bot]
d42388d6fe Bump certifi from 2022.6.15 to 2022.12.7 in /examples/benchmark (#88) #nolog
Bumps [certifi](https://github.com/certifi/python-certifi) from 2022.6.15 to 2022.12.7.
- [Release notes](https://github.com/certifi/python-certifi/releases)
- [Commits](https://github.com/certifi/python-certifi/compare/2022.06.15...2022.12.07)

---
updated-dependencies:
- dependency-name: certifi
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2022-12-09 11:07:23 +00:00
Miguel Grinberg
1abe8edc56 Version 1.2.2.dev0 2022-12-06 12:38:26 +00:00
Miguel Grinberg
e69c2dc42f Release 1.2.1 2022-12-06 12:37:51 +00:00
Miguel Grinberg
5a589afd5e Addressed error when deleting a user session in async app (Fixes #86) 2022-12-06 12:01:16 +00:00
Miguel Grinberg
c841cbedda Add asyncio file upload example 2022-11-16 19:10:44 +00:00
Diego Pomares
24d74fb848 Error handling invokes parent exceptions (Fixes #74) 2022-11-08 00:27:11 +00:00
Diego Pomares
4a9b92b800 Fix typos in documentation (#77) 2022-10-21 10:35:22 +01:00
Diego Pomares
c443599089 Add missing exception argument to error handler example in documentation (#73) 2022-10-15 12:44:52 +01:00
Miguel Grinberg
6554f29ddc Remove unused file #nolog 2022-10-08 12:26:18 +01:00
Miguel Grinberg
211ad953ae New Jinja and uTemplate examples with Bootstrap 2022-10-08 12:21:00 +01:00
Miguel Grinberg
63f43e1e7e Version 1.2.1.dev0 2022-09-25 12:19:46 +01:00
Miguel Grinberg
cb2a23285e Release 1.2.0 2022-09-25 12:19:31 +01:00
Miguel Grinberg
b133dcc343 URL encode/decode unit tests 2022-09-24 20:15:22 +01:00
52 changed files with 1297 additions and 770 deletions

View File

@@ -21,7 +21,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python: ['3.6', '3.7', '3.8', '3.9', '3.10']
python: ['3.7', '3.8', '3.9', '3.10', '3.11']
fail-fast: false
runs-on: ${{ matrix.os }}
steps:

View File

@@ -1,5 +1,40 @@
# Microdot change log
**Release 1.2.2** - 2023-03-03
- Add a socket read timeout to abort incomplete requests [#99](https://github.com/miguelgrinberg/microdot/issues/99) ([commit](https://github.com/miguelgrinberg/microdot/commit/d0d358f94a63f8565d6406feff0c6e7418cc7f81))
- More robust timeout handling [#106](https://github.com/miguelgrinberg/microdot/issues/106) ([commit](https://github.com/miguelgrinberg/microdot/commit/4d432a7d6cd88b874a8b825fb62891ed22881f74))
- Add @after_error_handler decorator [#97](https://github.com/miguelgrinberg/microdot/issues/97) ([commit](https://github.com/miguelgrinberg/microdot/commit/fcaeee69052b5681706f65b022e667baeee30d4d))
- Return headers as lowercase byte sequences as required by ASGI ([commit](https://github.com/miguelgrinberg/microdot/commit/ddb3b8f442d3683df04554104edaf8acd9c68148))
- Async example of static file serving ([commit](https://github.com/miguelgrinberg/microdot/commit/680cd9c023352f0ff03d67f1041ea174b7b7385b))
- Fixing broken links to examples in documentation [#101](https://github.com/miguelgrinberg/microdot/issues/101) ([commit](https://github.com/miguelgrinberg/microdot/commit/c00b24c9436e1b8f3d4c9bb6f2adfca988902e91)) (thanks **Eric Welch**!)
- Add scrollbar to documentation's left sidebar ([commit](https://github.com/miguelgrinberg/microdot/commit/2aa90d42451dc64c84efcc4f40a1b6c8d1ef1e8d))
- Documentation typo [#90](https://github.com/miguelgrinberg/microdot/issues/90) ([commit](https://github.com/miguelgrinberg/microdot/commit/81394980234f24aac834faf8e2e8225231e9014b)) (thanks **William Wheeler**!)
- Add CPU timing to benchmark ([commit](https://github.com/miguelgrinberg/microdot/commit/9398c960752f87bc32d7c4349cbf594e5d678e99))
- Upgrade uasyncio release used in tests ([commit](https://github.com/miguelgrinberg/microdot/commit/3d6815119ca1ec989f704f626530f938c857a8e5))
- Update unittest library for MicroPython ([commit](https://github.com/miguelgrinberg/microdot/commit/ecd84ecb7bd3c29d5af96739442b908badeab804))
- New build of micropython for unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/818f98d9a4e531e01c0f913813425ab2b40c289d))
- Remove 3.6, add 3.11 to builds ([commit](https://github.com/miguelgrinberg/microdot/commit/dd15d90239b73b5fd413515c9cd4ac23f6d42f67))
**Release 1.2.1** - 2022-12-06
- Error handling invokes parent exceptions [#74](https://github.com/miguelgrinberg/microdot/issues/74) ([commit](https://github.com/miguelgrinberg/microdot/commit/24d74fb8483b04e8abe6e303e06f0a310f32700b)) (thanks **Diego Pomares**!)
- Addressed error when deleting a user session in async app [#86](https://github.com/miguelgrinberg/microdot/issues/86) ([commit](https://github.com/miguelgrinberg/microdot/commit/5a589afd5e519e94e84fc1ee69033f2dad51c3ea))
- Add asyncio file upload example ([commit](https://github.com/miguelgrinberg/microdot/commit/c841cbedda40f59a9d87f6895fdf9fd954f854a2))
- New Jinja and uTemplate examples with Bootstrap ([commit](https://github.com/miguelgrinberg/microdot/commit/211ad953aeedb4c7f73fe210424aa173b4dc7fee))
- Fix typos in documentation [#77](https://github.com/miguelgrinberg/microdot/issues/77) ([commit](https://github.com/miguelgrinberg/microdot/commit/4a9b92b800d3fd87110f7bc9f546c10185ee13bc)) (thanks **Diego Pomares**!)
- Add missing exception argument to error handler example in documentation [#73](https://github.com/miguelgrinberg/microdot/issues/73) ([commit](https://github.com/miguelgrinberg/microdot/commit/c443599089f2127d1cb052dfba8a05c1969d65e3)) (thanks **Diego Pomares**!)
**Release 1.2.0** - 2022-09-25
- Use a case insensitive dict for headers ([commit #1](https://github.com/miguelgrinberg/microdot/commit/b0fd6c432371ca5cb10d07ff84c4deed7aa0ce2e) [commit #2](https://github.com/miguelgrinberg/microdot/commit/a8515c97b030f942fa6ca85cbe1772291468fb0d))
- urlencode() helper function ([commit #1](https://github.com/miguelgrinberg/microdot/commit/672512e086384e808489305502e6ebebcc5a888f) [commit #2](https://github.com/miguelgrinberg/microdot/commit/b133dcc34368853ee685396a1bcb50360e807813))
- Added `request.url` attribute with the complete URL of the request ([commit](https://github.com/miguelgrinberg/microdot/commit/1547e861ee28d43d10fe4c4ed1871345d4b81086))
- Do not log HTTPException occurrences ([commit](https://github.com/miguelgrinberg/microdot/commit/cbefb6bf3a3fdcff8b7a8bacad3449be18e46e3b))
- Cache user session for performance ([commit](https://github.com/miguelgrinberg/microdot/commit/01947b101ebe198312c88d73872e3248024918f0))
- File upload example ([commit](https://github.com/miguelgrinberg/microdot/commit/8ebe81c09b604ddc1123e78ad6bc87ceda5f8597))
- Minor documentation styling fixes ([commit](https://github.com/miguelgrinberg/microdot/commit/4f263c63ab7bb1ce0dd48d8e00f3c6891e1bf07e))
**Release 1.1.1** - 2022-09-18
- Make WebSocket internals consistent between TLS and non-TLS [#61](https://github.com/miguelgrinberg/microdot/issues/61) ([commit](https://github.com/miguelgrinberg/microdot/commit/5693b812ceb2c0d51ec3c991adf6894a87e6fcc7))

Binary file not shown.

View File

@@ -1,3 +1,8 @@
.py.class, .py.function, .py.method, .py.property {
margin-top: 20px;
}
div.sphinxsidebar {
max-height: 100%;
overflow-y: auto;
}

View File

@@ -23,7 +23,7 @@ Asynchronous Support with Asyncio
| MicroPython: `uasyncio <https://github.com/micropython/micropython/tree/master/extmod/uasyncio>`_
* - Examples
- | `hello_async.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello_async.py>`_
- | `hello_async.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello/hello_async.py>`_
Microdot can be extended to use an asynchronous programming model based on the
``asyncio`` package. When the :class:`Microdot <microdot_asyncio.Microdot>`
@@ -68,8 +68,8 @@ Using the uTemplate Engine
- | `utemplate <https://github.com/pfalcon/utemplate/tree/master/utemplate>`_
* - Examples
- | `hello_utemplate.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello_utemplate.py>`_
| `hello_utemplate_async.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello_utemplate_async.py>`_
- | `hello.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/templates/utemplate/hello.py>`_
| `hello_utemplate_async.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello/hello_utemplate_async.py>`_
The :func:`render_template <microdot_utemplate.render_template>` function is
used to render HTML templates with the uTemplate engine. The first argument is
@@ -110,7 +110,7 @@ Using the Jinja Engine
- | `Jinja2 <https://jinja.palletsprojects.com/>`_
* - Examples
- | `hello_jinja.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello_jinja.py>`_
- | `hello.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/templates/jinja/hello.py>`_
The :func:`render_template <microdot_jinja.render_template>` function is used
to render HTML templates with the Jinja engine. The first argument is the
@@ -156,7 +156,7 @@ Maintaing Secure User Sessions
`hmac <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
* - Examples
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login.py>`_
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/sessions/login.py>`_
The session extension provides a secure way for the application to maintain
user sessions. The session is stored as a signed cookie in the client's
@@ -244,7 +244,7 @@ Example::
ws.send(message)
.. note::
An unsupported *microsoft_websocket_alt.py* module, with the same
An unsupported *microdot_websocket_alt.py* module, with the same
interface, is also provided. This module uses the native WebSocket support
in MicroPython that powers the WebREPL, and may provide slightly better
performance for MicroPython low-end boards. This module is not compatible
@@ -276,9 +276,9 @@ This extension has the same interface as the synchronous WebSocket extension,
but the ``receive()`` and ``send()`` methods are asynchronous.
.. note::
An unsupported *microsoft_asgi_websocket.py* module, with the same
An unsupported *microdot_asgi_websocket.py* module, with the same
interface, is also provided. This module must be used instead of
*microsoft_asyncio_websocket.py* when the ASGI support is used. The
*microdot_asyncio_websocket.py* when the ASGI support is used. The
`echo_asgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/websocket/echo_asgi.py>`_
example shows how to use this module.
@@ -297,7 +297,7 @@ HTTPS Support
* - Examples
- | `hello_tls.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/tls/hello_tls.py>`_
| `hello_asyncio_tls.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/tls/hello_asyncio_tls.py>`_
| `hello_async_tls.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/tls/hello_async_tls.py>`_
The ``run()`` function accepts an optional ``ssl`` argument, through which an
initialized ``SSLContext`` object can be passed. MicroPython does not currently
@@ -423,7 +423,7 @@ Using a WSGI Web Server
- | A WSGI web server, such as `Gunicorn <https://gunicorn.org/>`_.
* - Examples
- | `hello_wsgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello_wsgi.py>`_
- | `hello_wsgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello/hello_wsgi.py>`_
The ``microdot_wsgi`` module provides an extended ``Microdot`` class that
@@ -468,7 +468,7 @@ Using an ASGI Web Server
- | An ASGI web server, such as `Uvicorn <https://uvicorn.org/>`_.
* - Examples
- | `hello_asgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello_asgi.py>`_
- | `hello_asgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello/hello_asgi.py>`_
The ``microdot_asgi`` module provides an extended ``Microdot`` class that
implements the ASGI protocol and can be used with a compliant ASGI server such

View File

@@ -283,7 +283,7 @@ handled::
def start_timer(request):
request.g.start_time = time.time()
@ap.after_request
@app.after_request
def end_timer(request, response):
duration = time.time() - request.g.start_time
print(f'Request took {duration:0.2f} seconds')
@@ -293,6 +293,12 @@ The function can return a modified response object to replace the original. If
the function does not return a value, then the original response object is
used.
The after request handlers are only invoked for successful requests. The
:func:`after_error_request() <microdot.Microdot.after_error_request>`
decorator can be used to register a function that is called after an error
occurs. The function receives the request and the error response and is
expected to return an updated response object.
.. note::
The :ref:`request.g <The "g" Object>` object is a special object that allows
the before and after request handlers, as well sa the route function to
@@ -314,7 +320,7 @@ automatically handled by Microdot are:
While the above errors are fully complaint with the HTTP specification, the
application might want to provide custom responses for them. The
:func:`errorhandler() <microdot.Microdot.errorhandler>` decorator registers
a functions to respond to specific error codes. The following example shows a
functions to respond to specific error codes. The following example shows a
custom error handler for 404 errors::
@app.errorhandler(404)
@@ -322,14 +328,18 @@ custom error handler for 404 errors::
return {'error': 'resource not found'}, 404
The ``errorhandler()`` decorator has a second form, in which it takes an
exception class as an argument. Microdot will then invoke the handler when an
exception of that class is raised. The next example provides a custom response
for division by zero errors::
exception class as an argument. Microdot will then invoke the handler when the
exception is an instance of the given class is raised. The next example
provides a custom response for division by zero errors::
@app.errorhandler(ZeroDivisionError)
def division_by_zero(request):
def division_by_zero(request, exception):
return {'error': 'division by zero'}, 500
When the raised exception class does not have an error handler defined, but
one or more of its base classes do, Microdot makes an attempt to invoke the
most specific handler.
Mounting a Sub-Application
^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -481,7 +491,7 @@ Accessing the Raw Request Body
For cases in which neither JSON nor form data is expected, the
:attr:`body <microdot.Request.body>` request attribute returns the entire body
of the request as a byte sequence.
of the request as a byte sequence.
If the expected body is too large to fit in memory, the application can use the
:attr:`stream <microdot.Request.stream>` request attribute to read the body
@@ -645,9 +655,9 @@ File Responses
The :func:`send_file <microdot.Response.send_file>` function builds a response
object for a file::
from microdot import send_file
@app.get('/')
def index(request):
return send_file('/static/index.html')

View File

@@ -1,27 +0,0 @@
from microdot import Microdot
from microdot_auth import BasicAuth
app = Microdot()
basic_auth = BasicAuth()
USERS = {
'susan': 'hello',
'david': 'bye',
}
@basic_auth.callback
def verify_password(request, username, password):
if username in USERS and USERS[username] == password:
request.g.user = username
return True
@app.route('/')
@basic_auth
def index(request):
return f'Hello, {request.g.user}!'
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,60 +0,0 @@
from microdot import Microdot, redirect
from microdot_session import set_session_secret_key
from microdot_login import LoginAuth
app = Microdot()
set_session_secret_key('top-secret')
login_auth = LoginAuth()
USERS = {
'susan': 'hello',
'david': 'bye',
}
@login_auth.callback
def check_user(request, user_id):
request.g.user = user_id
return True
@app.route('/')
@login_auth
def index(request):
return f'''
<h1>Login Auth Example</h1>
<p>Hello, {request.g.user}!</p>
<form method="POST" action="/logout">
<button type="submit">Logout</button>
</form>
''', {'Content-Type': 'text/html'}
@app.route('/login', methods=['GET', 'POST'])
def login(request):
if request.method == 'GET':
return '''
<h1>Login Auth Example</h1>
<form method="POST">
<input name="username" placeholder="username">
<input name="password" type="password" placeholder="password">
<button type="submit">Login</button>
</form>
''', {'Content-Type': 'text/html'}
username = request.form['username']
password = request.form['password']
if USERS.get(username) == password:
login_auth.login_user(request, username)
return login_auth.redirect_to_next(request)
else:
return redirect('/login')
@app.post('/logout')
def logout(request):
login_auth.logout_user(request)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,27 +0,0 @@
from microdot import Microdot
from microdot_auth import TokenAuth
app = Microdot()
token_auth = TokenAuth()
TOKENS = {
'hello': 'susan',
'bye': 'david',
}
@token_auth.callback
def verify_token(request, token):
if token in TOKENS:
request.g.user = TOKENS[token]
return True
@app.route('/')
@token_auth
def index(request):
return f'Hello, {request.g.user}!'
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,7 +1,7 @@
aiofiles==0.8.0
anyio==3.6.1
blinker==1.5
certifi==2022.6.15
certifi==2022.12.7
charset-normalizer==2.1.0
click==8.1.3
fastapi==0.79.0
@@ -24,10 +24,10 @@ pydantic==1.9.1
quart==0.18.0
requests==2.28.1
sniffio==1.2.0
starlette==0.19.1
starlette==0.25.0
toml==0.10.2
typing_extensions==4.3.0
urllib3==1.26.11
uvicorn==0.18.2
Werkzeug==2.2.1
Werkzeug==2.2.3
wsproto==1.1.0

View File

@@ -1,14 +0,0 @@
curl -X GET http://localhost:5000/ <-- microdot
{"ram": 8429568}%
curl -X GET http://localhost:5000/ <-- microdot_asyncio
{"ram": 12410880}%
curl -X GET http://localhost:8000/ <-- microdot_wsgi
{"ram": 9101312}%
curl -X GET http://localhost:8000/ <-- microdot_asgi
{"ram": 18620416}%
curl -X GET http://localhost:5000/ <-- flask app.run
{"ram":25460736}
curl -X GET http://localhost:5000/ <-- flask run
{"ram":26210304}
curl -X GET http://localhost:5000/ <-- quart run
{"ram":31748096}%

View File

@@ -1,6 +1,7 @@
import os
import subprocess
import time
from timeit import timeit
import requests
import psutil
import humanize
@@ -76,19 +77,23 @@ apps = [
for app, env, name in apps:
p = subprocess.Popen(
app.split() if isinstance(app, str) else app,
env={'PATH': os.environ['PATH'], **env},
env={'PATH': os.environ['PATH'] + ':../../bin', **env},
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
time.sleep(1)
tm = 0
if not name.startswith('baseline'):
r = requests.get('http://localhost:5000')
r.raise_for_status()
def req():
r = requests.get('http://localhost:5000')
r.raise_for_status()
tm = timeit(req, number=1000)
proc = psutil.Process(p.pid)
mem = proc.memory_info().rss
for child in proc.children(recursive=True):
mem += child.memory_info().rss
bar = '*' * (mem // (1024 * 1024))
print(f'{name:<28}{humanize.naturalsize(mem):>10} {bar}')
print(f'{name:<28}{tm:10.2f}s {humanize.naturalsize(mem):>10} {bar}')
p.terminate()
time.sleep(1)

View File

@@ -0,0 +1,19 @@
from microdot_asyncio import Microdot
from microdot import send_file
app = Microdot()
@app.route('/')
async def index(request):
return send_file('static/index.html')
@app.route('/static/<path:path>')
async def static(request, path):
if '..' in path:
# directory traversal is not allowed
return 'Not found', 404
return send_file('static/' + path)
app.run(debug=True)

View File

@@ -0,0 +1,19 @@
from microdot import Microdot, Response
from microdot_jinja import render_template
app = Microdot()
Response.default_content_type = 'text/html'
@app.route('/')
def index(req):
return render_template('page1.html', page='Page 1')
@app.route('/page2')
def page2(req):
return render_template('page2.html', page='Page 2')
if __name__ == '__main__':
app.run()

View File

@@ -10,7 +10,7 @@ def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return render_template('index_jinja.html', name=name)
return render_template('index.html', name=name)
if __name__ == '__main__':

View File

@@ -0,0 +1,48 @@
<!--
This is based on the Bootstrap 5 starter template from the documentation:
https://getbootstrap.com/docs/5.0/getting-started/introduction/#starter-template
-->
<!doctype html>
<html lang="en">
<head>
<!-- Required meta tags -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- Bootstrap CSS -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
<title>Microdot + Jinja + Bootstrap</title>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container">
<a class="navbar-brand" href="/">Microdot + Jinja + Bootstrap</a>
</div>
</nav>
<br>
<div class="container">
<svg xmlns="http://www.w3.org/2000/svg" style="display: none;">
<symbol id="info-fill" fill="currentColor" viewBox="0 0 16 16">
<path d="M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16zm.93-9.412-1 4.705c-.07.34.029.533.304.533.194 0 .487-.07.686-.246l-.088.416c-.287.346-.92.598-1.465.598-.703 0-1.002-.422-.808-1.319l.738-3.468c.064-.293.006-.399-.287-.47l-.451-.081.082-.381 2.29-.287zM8 5.5a1 1 0 1 1 0-2 1 1 0 0 1 0 2z"/>
</symbol>
</svg>
<div class="alert alert-primary d-flex align-items-center" role="alert">
<svg class="bi flex-shrink-0 me-2" width="24" height="24" role="img" aria-label="Info:"><use xlink:href="#info-fill"/></svg>
<div>This example demonstrates how to create an application that uses <a href="https://getbootstrap.com" class="alert-link">Bootstrap</a> styling. The page layout is defined in a base template that is inherited by several pages.</div>
</div>
{% block content %}{% endblock %}
</div>
<!-- Optional JavaScript; choose one of the two! -->
<!-- Option 1: Bootstrap Bundle with Popper -->
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.bundle.min.js" integrity="sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM" crossorigin="anonymous"></script>
<!-- Option 2: Separate Popper and Bootstrap JS -->
<!--
<script src="https://cdn.jsdelivr.net/npm/@popperjs/core@2.9.2/dist/umd/popper.min.js" integrity="sha384-IQsoLXl5PILFhosVNubq5LC7Qb9DXgDA9i+tQ8Zj3iwWAwPtgFTxbJ8NT4GN1R8p" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.min.js" integrity="sha384-cVKIPhGWiC2Al4u+LWgxfKTRIcfu0JTxR+EQDz/bgldoEyl4H0zUF0QKbrJ0EcQF" crossorigin="anonymous"></script>
-->
</body>
</html>

View File

@@ -0,0 +1,6 @@
{% extends 'base.html' %}
{% block content %}
<h2>This is {{ page }}</h2>
<p>Go to <a href="/page2">Page 2</a>.</p>
{% endblock %}

View File

@@ -0,0 +1,6 @@
{% extends 'base.html' %}
{% block content %}
<h2>This is {{ page }}</h2>
<p>Go back <a href="/">Page 1</a>.</p>
{% endblock %}

View File

@@ -0,0 +1,19 @@
from microdot import Microdot, Response
from microdot_utemplate import render_template
app = Microdot()
Response.default_content_type = 'text/html'
@app.route('/')
def index(req):
return render_template('page1.html', page='Page 1')
@app.route('/page2')
def page2(req):
return render_template('page2.html', page='Page 2')
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -10,7 +10,7 @@ def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return render_template('index_utemplate.html', name=name)
return render_template('index.html', name=name)
if __name__ == '__main__':

View File

@@ -0,0 +1,14 @@
</div>
<!-- Optional JavaScript; choose one of the two! -->
<!-- Option 1: Bootstrap Bundle with Popper -->
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.bundle.min.js" integrity="sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM" crossorigin="anonymous"></script>
<!-- Option 2: Separate Popper and Bootstrap JS -->
<!--
<script src="https://cdn.jsdelivr.net/npm/@popperjs/core@2.9.2/dist/umd/popper.min.js" integrity="sha384-IQsoLXl5PILFhosVNubq5LC7Qb9DXgDA9i+tQ8Zj3iwWAwPtgFTxbJ8NT4GN1R8p" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.min.js" integrity="sha384-cVKIPhGWiC2Al4u+LWgxfKTRIcfu0JTxR+EQDz/bgldoEyl4H0zUF0QKbrJ0EcQF" crossorigin="anonymous"></script>
-->
</body>
</html>

View File

@@ -0,0 +1,33 @@
<!--
This is based on the Bootstrap 5 starter template from the documentation:
https://getbootstrap.com/docs/5.0/getting-started/introduction/#starter-template
-->
<!doctype html>
<html lang="en">
<head>
<!-- Required meta tags -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- Bootstrap CSS -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
<title>Microdot + uTemplate + Bootstrap</title>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container">
<a class="navbar-brand" href="/">Microdot + uTemplate + Bootstrap</a>
</div>
</nav>
<br>
<div class="container">
<svg xmlns="http://www.w3.org/2000/svg" style="display: none;">
<symbol id="info-fill" fill="currentColor" viewBox="0 0 16 16">
<path d="M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16zm.93-9.412-1 4.705c-.07.34.029.533.304.533.194 0 .487-.07.686-.246l-.088.416c-.287.346-.92.598-1.465.598-.703 0-1.002-.422-.808-1.319l.738-3.468c.064-.293.006-.399-.287-.47l-.451-.081.082-.381 2.29-.287zM8 5.5a1 1 0 1 1 0-2 1 1 0 0 1 0 2z"/>
</symbol>
</svg>
<div class="alert alert-primary d-flex align-items-center" role="alert">
<svg class="bi flex-shrink-0 me-2" width="24" height="24" role="img" aria-label="Info:"><use xlink:href="#info-fill"/></svg>
<div>This example demonstrates how to create an application that uses <a href="https://getbootstrap.com" class="alert-link">Bootstrap</a> styling. The page layout is defined in a base template that is inherited by several pages.</div>
</div>

View File

@@ -0,0 +1,7 @@
{% args page %}
{% include 'base_header.html' %}
<h2>This is {{ page }}</h2>
<p>Go to <a href="/page2">Page 2</a>.</p>
{% include 'base_footer.html' %}

View File

@@ -0,0 +1,7 @@
{% args page %}
{% include 'base_header.html' %}
<h2>This is {{ page }}</h2>
<p>Go back <a href="/">Page 1</a>.</p>
{% include 'base_footer.html' %}

View File

@@ -1,6 +1,7 @@
from microdot import Microdot, send_file
from microdot import Microdot, send_file, Request
app = Microdot()
Request.max_content_length = 1024 * 1024 # 1MB (change as needed)
@app.get('/')
@@ -30,4 +31,4 @@ def upload(request):
if __name__ == '__main__':
app.run()
app.run(debug=True)

View File

@@ -0,0 +1,34 @@
from microdot_asyncio import Microdot, send_file, Request
app = Microdot()
Request.max_content_length = 1024 * 1024 # 1MB (change as needed)
@app.get('/')
async def index(request):
return send_file('index.html')
@app.post('/upload')
async def upload(request):
# obtain the filename and size from request headers
filename = request.headers['Content-Disposition'].split(
'filename=')[1].strip('"')
size = int(request.headers['Content-Length'])
# sanitize the filename
filename = filename.replace('/', '_')
# write the file to the files directory in 1K chunks
with open('files/' + filename, 'wb') as f:
while size > 0:
chunk = await request.stream.read(min(size, 1024))
f.write(chunk)
size -= len(chunk)
print('Successfully saved file: ' + filename)
return ''
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -41,7 +41,7 @@ class SingletonGenerator:
def __next__(self):
if self.state is not None:
_task_queue.push_sorted(cur_task, self.state)
_task_queue.push(cur_task, self.state)
self.state = None
return None
else:
@@ -115,11 +115,11 @@ class IOQueue:
# print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None:
# POLLIN or error
_task_queue.push_head(sm[0])
_task_queue.push(sm[0])
sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None:
# POLLOUT or error
_task_queue.push_head(sm[1])
_task_queue.push(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
@@ -142,7 +142,7 @@ def create_task(coro):
if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
_task_queue.push_head(t)
_task_queue.push(t)
return t
@@ -167,7 +167,7 @@ def run_until_complete(main_task=None):
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _task_queue.pop_head()
t = _task_queue.pop()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
@@ -175,6 +175,10 @@ def run_until_complete(main_task=None):
if not exc:
t.coro.send(None)
else:
# If the task is finished and on the run queue and gets here, then it
# had an exception and was not await'ed on. Throwing into it now will
# raise StopIteration and the code below will catch this and run the
# call_exception_handler function.
t.data = None
t.coro.throw(exc)
except excs_all as er:
@@ -185,22 +189,37 @@ def run_until_complete(main_task=None):
if isinstance(er, StopIteration):
return er.value
raise er
# Schedule any other tasks waiting on the completion of this task
waiting = False
if hasattr(t, "waiting"):
while t.waiting.peek():
_task_queue.push_head(t.waiting.pop_head())
if t.state:
# Task was running but is now finished.
waiting = False
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
t.waiting = None # Free waiting queue head
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push_head(t)
# Indicate task is done by setting coro to the task object itself
t.coro = t
# Save return value of coro to pass up to caller
t.data = er
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
_task_queue.push(t.state.pop())
waiting = True
# "False" indicates that the task is complete and has been await'ed on.
t.state = False
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push(t)
# Save return value of coro to pass up to caller.
t.data = er
elif t.state is None:
# Task is already finished and nothing await'ed on the task,
# so call the exception handler.
_exc_context["exception"] = exc
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)
# Create a new task from a coroutine and run it until it finishes
@@ -237,7 +256,7 @@ class Loop:
def stop():
global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
_task_queue.push(_stop_task)
# If stop() is called again, do nothing
_stop_task = None

View File

@@ -17,7 +17,7 @@ class Event:
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
while self.waiting.peek():
core._task_queue.push_head(self.waiting.pop_head())
core._task_queue.push(self.waiting.pop())
self.state = True
def clear(self):
@@ -26,7 +26,7 @@ class Event:
async def wait(self):
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
self.waiting.push(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
yield
@@ -36,27 +36,29 @@ class Event:
# MicroPython-extension: This can be set from outside the asyncio event loop,
# such as other threads, IRQs or scheduler context. Implementation is a stream
# that asyncio will poll until a flag is set.
# Note: Unlike Event, this is self-clearing.
# Note: Unlike Event, this is self-clearing after a wait().
try:
import uio
class ThreadSafeFlag(uio.IOBase):
def __init__(self):
self._flag = 0
self.state = 0
def ioctl(self, req, flags):
if req == 3: # MP_STREAM_POLL
return self._flag * flags
return self.state * flags
return None
def set(self):
self._flag = 1
self.state = 1
def clear(self):
self.state = 0
async def wait(self):
if not self._flag:
if not self.state:
yield core._io_queue.queue_read(self)
self._flag = 0
self.state = 0
except ImportError:
pass

View File

@@ -1,49 +1,51 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
# MIT license; Copyright (c) 2019-2022 Damien P. George
from . import core
def _run(waiter, aw):
try:
result = await aw
status = True
except BaseException as er:
result = None
status = er
if waiter.data is None:
# The waiter is still waiting, cancel it.
if waiter.cancel():
# Waiter was cancelled by us, change its CancelledError to an instance of
# CancelledError that contains the status and result of waiting on aw.
# If the wait_for task subsequently gets cancelled externally then this
# instance will be reset to a CancelledError instance without arguments.
waiter.data = core.CancelledError(status, result)
async def wait_for(aw, timeout, sleep=core.sleep):
aw = core._promote_to_task(aw)
if timeout is None:
return await aw
def runner(waiter, aw):
nonlocal status, result
try:
result = await aw
s = True
except BaseException as er:
s = er
if status is None:
# The waiter is still waiting, set status for it and cancel it.
status = s
waiter.cancel()
# Run aw in a separate runner task that manages its exceptions.
status = None
result = None
runner_task = core.create_task(runner(core.cur_task, aw))
runner_task = core.create_task(_run(core.cur_task, aw))
try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
if status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return result
elif status is None:
status = er.value
if status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
status = True
runner_task.cancel()
raise er
elif status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return er.args[1]
else:
# aw raised an exception, propagate it out to the caller.
raise status
# The sleep finished before aw, so cancel aw and raise TimeoutError.
status = True
runner_task.cancel()
await runner_task
raise core.TimeoutError
@@ -53,22 +55,75 @@ def wait_for_ms(aw, timeout):
return wait_for(aw, timeout, core.sleep_ms)
class _Remove:
@staticmethod
def remove(t):
pass
async def gather(*aws, return_exceptions=False):
if not aws:
return []
def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push(gather_task)
ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
# TODO handle cancel of gather itself
# if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except Exception as er:
if return_exceptions:
ts[i] = er
else:
raise er
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done
# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False
# Wait for the a sub-task to need attention.
gather_task.data = _Remove
try:
yield
except core.CancelledError as er:
cancel_all = True
state = er
# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state
# Return the list of return values of each sub-task.
return ts

View File

@@ -22,8 +22,8 @@ class Lock:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
# Task(s) waiting on lock, schedule next Task
self.state = self.waiting.pop_head()
core._task_queue.push_head(self.state)
self.state = self.waiting.pop()
core._task_queue.push(self.state)
else:
# No Task waiting so unlock
self.state = 0
@@ -31,7 +31,7 @@ class Lock:
async def acquire(self):
if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
self.waiting.push(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:

View File

@@ -1,13 +1,15 @@
# This list of frozen files doesn't include task.py because that's provided by the C module.
freeze(
"..",
# This list of package files doesn't include task.py because that's provided
# by the C module.
package(
"uasyncio",
(
"uasyncio/__init__.py",
"uasyncio/core.py",
"uasyncio/event.py",
"uasyncio/funcs.py",
"uasyncio/lock.py",
"uasyncio/stream.py",
"__init__.py",
"core.py",
"event.py",
"funcs.py",
"lock.py",
"stream.py",
),
base_path="..",
opt=3,
)

View File

@@ -26,9 +26,21 @@ class Stream:
# TODO yield?
self.s.close()
async def read(self, n):
async def read(self, n=-1):
r = b""
while True:
yield core._io_queue.queue_read(self.s)
r2 = self.s.read(n)
if r2 is not None:
if n >= 0:
return r2
if not len(r2):
return r
r += r2
async def readinto(self, buf):
yield core._io_queue.queue_read(self.s)
return self.s.read(n)
return self.s.readinto(buf)
async def readexactly(self, n):
r = b""
@@ -52,9 +64,19 @@ class Stream:
return l
def write(self, buf):
if not self.out_buf:
# Try to write immediately to the underlying stream.
ret = self.s.write(buf)
if ret == len(buf):
return
if ret is not None:
buf = buf[ret:]
self.out_buf += buf
async def drain(self):
if not self.out_buf:
# Drain must always yield, so a tight loop of write+drain can't block the scheduler.
return await core.sleep_ms(0)
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
@@ -75,8 +97,8 @@ async def open_connection(host, port):
from uerrno import EINPROGRESS
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
try:
@@ -103,16 +125,7 @@ class Server:
async def wait_closed(self):
await self.task
async def _serve(self, cb, host, port, backlog):
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(ai[-1])
s.listen(backlog)
self.task = core.cur_task
async def _serve(self, s, cb):
# Accept incoming connections
while True:
try:
@@ -134,9 +147,20 @@ class Server:
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
s = Server()
core.create_task(s._serve(cb, host, port, backlog))
return s
import usocket as socket
# Create and bind server socket.
host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(host[-1])
s.listen(backlog)
# Create and return server object and task.
srv = Server()
srv.task = core.create_task(srv._serve(s, cb))
return srv
################################################################################

View File

@@ -99,19 +99,18 @@ class TaskQueue:
def peek(self):
return self.heap
def push_sorted(self, v, key):
def push(self, v, key=None):
assert v.ph_child is None
assert v.ph_next is None
v.data = None
v.ph_key = key
v.ph_child = None
v.ph_next = None
v.ph_key = key if key is not None else core.ticks()
self.heap = ph_meld(v, self.heap)
def push_head(self, v):
self.push_sorted(v, core.ticks())
def pop_head(self):
def pop(self):
v = self.heap
self.heap = ph_pairing(self.heap.ph_child)
assert v.ph_next is None
self.heap = ph_pairing(v.ph_child)
v.ph_child = None
return v
def remove(self, v):
@@ -123,6 +122,7 @@ class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True, a callable, or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
@@ -130,30 +130,33 @@ class Task:
self.ph_rightmost_parent = None # Paring heap
def __iter__(self):
if self.coro is self:
# Signal that the completed-task has been await'ed on.
self.waiting = None
elif not hasattr(self, "waiting"):
# Lazily allocated head of linked list of Tasks waiting on completion of this task.
self.waiting = TaskQueue()
if not self.state:
# Task finished, signal that is has been await'ed on.
self.state = False
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
elif type(self.state) is not TaskQueue:
# Task has state used for another purpose, so can't also wait on it.
raise RuntimeError("can't wait")
return self
def __next__(self):
if self.coro is self:
if not self.state:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.waiting.push_head(core.cur_task)
self.state.push(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
return self.coro is self
return not self.state
def cancel(self):
# Check if task is already finished.
if self.coro is self:
if not self.state:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
@@ -165,20 +168,10 @@ class Task:
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
self.data = core.CancelledError
return True
def throw(self, value):
# This task raised an exception which was uncaught; handle that now.
# Set the data because it was cleared by the main scheduling loop.
self.data = value
if not hasattr(self, "waiting"):
# Nothing await'ed on the task so call the exception handler.
core._exc_context["exception"] = value
core._exc_context["future"] = self
core.Loop.call_exception_handler(core._exc_context)

View File

@@ -1,12 +1,18 @@
import io
import os
import sys
try:
import traceback
except ImportError:
traceback = None
class SkipTest(Exception):
pass
class AssertRaisesContext:
def __init__(self, exc):
self.expected = exc
@@ -14,29 +20,98 @@ class AssertRaisesContext:
return self
def __exit__(self, exc_type, exc_value, tb):
self.exception = exc_value
if exc_type is None:
assert False, "%r not raised" % self.expected
if issubclass(exc_type, self.expected):
# store exception for later retrieval
self.exception = exc_value
return True
return False
class TestCase:
# These are used to provide required context to things like subTest
__current_test__ = None
__test_result__ = None
def fail(self, msg=''):
class SubtestContext:
def __init__(self, msg=None, params=None):
self.msg = msg
self.params = params
def __enter__(self):
pass
def __exit__(self, *exc_info):
if exc_info[0] is not None:
# Exception raised
global __test_result__, __current_test__
test_details = __current_test__
if self.msg:
test_details += (f" [{self.msg}]",)
if self.params:
detail = ", ".join(f"{k}={v}" for k, v in self.params.items())
test_details += (f" ({detail})",)
_handle_test_exception(test_details, __test_result__, exc_info, False)
# Suppress the exception as we've captured it above
return True
class NullContext:
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, traceback):
pass
class TestCase:
def __init__(self):
pass
def addCleanup(self, func, *args, **kwargs):
if not hasattr(self, "_cleanups"):
self._cleanups = []
self._cleanups.append((func, args, kwargs))
def doCleanups(self):
if hasattr(self, "_cleanups"):
while self._cleanups:
func, args, kwargs = self._cleanups.pop()
func(*args, **kwargs)
def subTest(self, msg=None, **params):
return SubtestContext(msg=msg, params=params)
def skipTest(self, reason):
raise SkipTest(reason)
def fail(self, msg=""):
assert False, msg
def assertEqual(self, x, y, msg=''):
def assertEqual(self, x, y, msg=""):
if not msg:
msg = "%r vs (expected) %r" % (x, y)
assert x == y, msg
def assertNotEqual(self, x, y, msg=''):
def assertNotEqual(self, x, y, msg=""):
if not msg:
msg = "%r not expected to be equal %r" % (x, y)
assert x != y, msg
def assertAlmostEqual(self, x, y, places=None, msg='', delta=None):
def assertLessEqual(self, x, y, msg=None):
if msg is None:
msg = "%r is expected to be <= %r" % (x, y)
assert x <= y, msg
def assertGreaterEqual(self, x, y, msg=None):
if msg is None:
msg = "%r is expected to be >= %r" % (x, y)
assert x >= y, msg
def assertAlmostEqual(self, x, y, places=None, msg="", delta=None):
if x == y:
return
if delta is not None and places is not None:
@@ -46,18 +121,18 @@ class TestCase:
if abs(x - y) <= delta:
return
if not msg:
msg = '%r != %r within %r delta' % (x, y, delta)
msg = "%r != %r within %r delta" % (x, y, delta)
else:
if places is None:
places = 7
if round(abs(y-x), places) == 0:
if round(abs(y - x), places) == 0:
return
if not msg:
msg = '%r != %r within %r places' % (x, y, places)
msg = "%r != %r within %r places" % (x, y, places)
assert False, msg
def assertNotAlmostEqual(self, x, y, places=None, msg='', delta=None):
def assertNotAlmostEqual(self, x, y, places=None, msg="", delta=None):
if delta is not None and places is not None:
raise TypeError("specify delta or places not both")
@@ -65,53 +140,53 @@ class TestCase:
if not (x == y) and abs(x - y) > delta:
return
if not msg:
msg = '%r == %r within %r delta' % (x, y, delta)
msg = "%r == %r within %r delta" % (x, y, delta)
else:
if places is None:
places = 7
if not (x == y) and round(abs(y-x), places) != 0:
if not (x == y) and round(abs(y - x), places) != 0:
return
if not msg:
msg = '%r == %r within %r places' % (x, y, places)
msg = "%r == %r within %r places" % (x, y, places)
assert False, msg
def assertIs(self, x, y, msg=''):
def assertIs(self, x, y, msg=""):
if not msg:
msg = "%r is not %r" % (x, y)
assert x is y, msg
def assertIsNot(self, x, y, msg=''):
def assertIsNot(self, x, y, msg=""):
if not msg:
msg = "%r is %r" % (x, y)
assert x is not y, msg
def assertIsNone(self, x, msg=''):
def assertIsNone(self, x, msg=""):
if not msg:
msg = "%r is not None" % x
assert x is None, msg
def assertIsNotNone(self, x, msg=''):
def assertIsNotNone(self, x, msg=""):
if not msg:
msg = "%r is None" % x
assert x is not None, msg
def assertTrue(self, x, msg=''):
def assertTrue(self, x, msg=""):
if not msg:
msg = "Expected %r to be True" % x
assert x, msg
def assertFalse(self, x, msg=''):
def assertFalse(self, x, msg=""):
if not msg:
msg = "Expected %r to be False" % x
assert not x, msg
def assertIn(self, x, y, msg=''):
def assertIn(self, x, y, msg=""):
if not msg:
msg = "Expected %r to be in %r" % (x, y)
assert x in y, msg
def assertIsInstance(self, x, y, msg=''):
def assertIsInstance(self, x, y, msg=""):
assert isinstance(x, y), msg
def assertRaises(self, exc, func=None, *args, **kwargs):
@@ -120,12 +195,15 @@ class TestCase:
try:
func(*args, **kwargs)
assert False, "%r not raised" % exc
except Exception as e:
if isinstance(e, exc):
return
raise
assert False, "%r not raised" % exc
def assertWarns(self, warn):
return NullContext()
def skip(msg):
@@ -133,92 +211,252 @@ def skip(msg):
# We just replace original fun with _inner
def _inner(self):
raise SkipTest(msg)
return _inner
return _decor
def skipIf(cond, msg):
if not cond:
return lambda x: x
return skip(msg)
def skipUnless(cond, msg):
if cond:
return lambda x: x
return skip(msg)
def expectedFailure(test):
def test_exp_fail(*args, **kwargs):
try:
test(*args, **kwargs)
except:
pass
else:
assert False, "unexpected success"
return test_exp_fail
class TestSuite:
def __init__(self):
self.tests = []
def __init__(self, name=""):
self._tests = []
self.name = name
def addTest(self, cls):
self.tests.append(cls)
self._tests.append(cls)
def run(self, result):
for c in self._tests:
_run_suite(c, result, self.name)
return result
def _load_module(self, mod):
for tn in dir(mod):
c = getattr(mod, tn)
if isinstance(c, object) and isinstance(c, type) and issubclass(c, TestCase):
self.addTest(c)
elif tn.startswith("test") and callable(c):
self.addTest(c)
class TestRunner:
def run(self, suite):
def run(self, suite: TestSuite):
res = TestResult()
for c in suite.tests:
run_class(c, res)
suite.run(res)
res.printErrors()
print("----------------------------------------------------------------------")
print("Ran %d tests\n" % res.testsRun)
if res.failuresNum > 0 or res.errorsNum > 0:
print("FAILED (failures=%d, errors=%d)" % (res.failuresNum, res.errorsNum))
else:
msg = "OK"
if res.skippedNum > 0:
msg += " (%d skipped)" % res.skippedNum
msg += " (skipped=%d)" % res.skippedNum
print(msg)
return res
TextTestRunner = TestRunner
class TestResult:
def __init__(self):
self.errorsNum = 0
self.failuresNum = 0
self.skippedNum = 0
self.testsRun = 0
self.errors = []
self.failures = []
self.skipped = []
self._newFailures = 0
def wasSuccessful(self):
return self.errorsNum == 0 and self.failuresNum == 0
# TODO: Uncompliant
def run_class(c, test_result):
o = c()
def printErrors(self):
print()
self.printErrorList(self.errors)
self.printErrorList(self.failures)
def printErrorList(self, lst):
sep = "----------------------------------------------------------------------"
for c, e in lst:
detail = " ".join((str(i) for i in c))
print("======================================================================")
print(f"FAIL: {detail}")
print(sep)
print(e)
def __repr__(self):
# Format is compatible with CPython.
return "<unittest.result.TestResult run=%d errors=%d failures=%d>" % (
self.testsRun,
self.errorsNum,
self.failuresNum,
)
def __add__(self, other):
self.errorsNum += other.errorsNum
self.failuresNum += other.failuresNum
self.skippedNum += other.skippedNum
self.testsRun += other.testsRun
self.errors.extend(other.errors)
self.failures.extend(other.failures)
self.skipped.extend(other.skipped)
return self
def _capture_exc(exc, exc_traceback):
buf = io.StringIO()
if hasattr(sys, "print_exception"):
sys.print_exception(exc, buf)
elif traceback is not None:
traceback.print_exception(None, exc, exc_traceback, file=buf)
return buf.getvalue()
def _handle_test_exception(
current_test: tuple, test_result: TestResult, exc_info: tuple, verbose=True
):
exc = exc_info[1]
traceback = exc_info[2]
ex_str = _capture_exc(exc, traceback)
if isinstance(exc, AssertionError):
test_result.failuresNum += 1
test_result.failures.append((current_test, ex_str))
if verbose:
print(" FAIL")
else:
test_result.errorsNum += 1
test_result.errors.append((current_test, ex_str))
if verbose:
print(" ERROR")
test_result._newFailures += 1
def _run_suite(c, test_result: TestResult, suite_name=""):
if isinstance(c, TestSuite):
c.run(test_result)
return
if isinstance(c, type):
o = c()
else:
o = c
set_up_class = getattr(o, "setUpClass", lambda: None)
tear_down_class = getattr(o, "tearDownClass", lambda: None)
set_up = getattr(o, "setUp", lambda: None)
tear_down = getattr(o, "tearDown", lambda: None)
for name in dir(o):
if name.startswith("test"):
print("%s (%s) ..." % (name, c.__qualname__), end="")
m = getattr(o, name)
set_up()
try:
test_result.testsRun += 1
m()
print(" ok")
except SkipTest as e:
print(" skipped:", e.args[0])
test_result.skippedNum += 1
except:
exceptions = []
try:
suite_name += "." + c.__qualname__
except AttributeError:
pass
def run_one(test_function):
global __test_result__, __current_test__
print("%s (%s) ..." % (name, suite_name), end="")
set_up()
__test_result__ = test_result
test_container = f"({suite_name})"
__current_test__ = (name, test_container)
try:
test_result._newFailures = 0
test_result.testsRun += 1
test_function()
# No exception occurred, test passed
if test_result._newFailures:
print(" FAIL")
test_result.failuresNum += 1
# Uncomment to investigate failure in detail
#raise
continue
finally:
tear_down()
else:
print(" ok")
except SkipTest as e:
reason = e.args[0]
print(" skipped:", reason)
test_result.skippedNum += 1
test_result.skipped.append((name, c, reason))
except Exception as ex:
_handle_test_exception(
current_test=(name, c), test_result=test_result, exc_info=(type(ex), ex, None)
)
# Uncomment to investigate failure in detail
# raise
finally:
__test_result__ = None
__current_test__ = None
tear_down()
try:
o.doCleanups()
except AttributeError:
pass
set_up_class()
try:
if hasattr(o, "runTest"):
name = str(o)
run_one(o.runTest)
return
for name in dir(o):
if name.startswith("test"):
m = getattr(o, name)
if not callable(m):
continue
run_one(m)
if callable(o):
name = o.__name__
run_one(o)
finally:
tear_down_class()
return exceptions
def main(module="__main__"):
def test_cases(m):
for tn in dir(m):
c = getattr(m, tn)
if isinstance(c, object) and isinstance(c, type) and issubclass(c, TestCase):
yield c
# This supports either:
#
# >>> import mytest
# >>> unitttest.main(mytest)
#
# >>> unittest.main("mytest")
#
# Or, a script that ends with:
# if __name__ == "__main__":
# unittest.main()
# e.g. run via `mpremote run mytest.py`
def main(module="__main__", testRunner=None):
if testRunner is None:
testRunner = TestRunner()
elif isinstance(testRunner, type):
testRunner = testRunner()
m = __import__(module)
suite = TestSuite()
for c in test_cases(m):
suite.addTest(c)
runner = TestRunner()
result = runner.run(suite)
# Terminate with non zero return code in case of failures
sys.exit(result.failuresNum > 0)
if isinstance(module, str):
module = __import__(module)
suite = TestSuite(module.__name__)
suite._load_module(module)
return testRunner.run(suite)

View File

@@ -1,6 +1,6 @@
[metadata]
name = microdot
version = 1.1.2.dev0
version = 1.2.2
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = The impossibly small web framework for MicroPython
@@ -22,19 +22,22 @@ zip_safe = False
include_package_data = True
package_dir =
= src
py_modules =
microdot
microdot_asyncio
microdot_utemplate
microdot_jinja
microdot_session
microdot_auth
microdot_login
microdot_websocket
microdot_websocket_alt
microdot_asyncio_websocket
microdot_test_client
microdot_asyncio_test_client
microdot_wsgi
microdot_asgi
microdot_asgi_websocket
packages = find:
[options.packages.find]
where = src
#py_modules =
# microdot
# microdot_asyncio
# microdot_utemplate
# microdot_jinja
# microdot_session
# microdot_websocket
# microdot_websocket_alt
# microdot_asyncio_websocket
# microdot_test_client
# microdot_asyncio_test_client
# microdot_wsgi
# microdot_asgi
# microdot_asgi_websocket

View File

@@ -43,11 +43,13 @@ try:
except ImportError:
import re
socket_timeout_error = OSError
try:
import usocket as socket
except ImportError:
try:
import socket
socket_timeout_error = socket.timeout
except ImportError: # pragma: no cover
socket = None
@@ -92,8 +94,9 @@ def urldecode_bytes(s):
def urlencode(s):
return s.replace(' ', '+').replace('%', '%25').replace('?', '%3F').replace(
'#', '%23').replace('&', '%26').replace('+', '%2B')
return s.replace('+', '%2B').replace(' ', '+').replace(
'%', '%25').replace('?', '%3F').replace('#', '%23').replace(
'&', '%26').replace('=', '%3D')
class NoCaseDict(dict):
@@ -144,6 +147,39 @@ class NoCaseDict(dict):
return super().get(self.keymap.get(kl, kl), default)
def mro(cls): # pragma: no cover
"""Return the method resolution order of a class.
This is a helper function that returns the method resolution order of a
class. It is used by Microdot to find the best error handler to invoke for
the raised exception.
In CPython, this function returns the ``__mro__`` attribute of the class.
In MicroPython, this function implements a recursive depth-first scanning
of the class hierarchy.
"""
if hasattr(cls, 'mro'):
return cls.__mro__
def _mro(cls):
m = [cls]
for base in cls.__bases__:
m += _mro(base)
return m
mro_list = _mro(cls)
# If a class appears multiple times (due to multiple inheritance) remove
# all but the last occurence. This matches the method resolution order
# of MicroPython, but not CPython.
mro_pruned = []
for i in range(len(mro_list)):
base = mro_list.pop(0)
if base not in mro_list:
mro_pruned.append(base)
return mro_pruned
class MultiDict(dict):
"""A subclass of dictionary that can hold multiple values for the same
key. It is used to hold key/value pairs decoded from query strings and
@@ -266,6 +302,11 @@ class Request():
#: Request.max_readline = 16 * 1024 # 16KB lines allowed
max_readline = 2 * 1024
#: Specify a suggested read timeout to use when reading the request. Set to
#: 0 to disable the use of a timeout. This timeout should be considered a
#: suggestion only, as some platforms may not support it.
socket_read_timeout = 0.1
class G:
pass
@@ -438,6 +479,9 @@ class Request():
return response
return 'Hello, World!'
Note that the function is not called if the request handler raises an
exception and an error response is returned instead.
"""
self.after_request_handlers.append(f)
return f
@@ -712,6 +756,7 @@ class Microdot():
self.url_map = []
self.before_request_handlers = []
self.after_request_handlers = []
self.after_error_request_handlers = []
self.error_handlers = {}
self.shutdown_requested = False
self.debug = False
@@ -873,6 +918,24 @@ class Microdot():
self.after_request_handlers.append(f)
return f
def after_error_request(self, f):
"""Decorator to register a function to run after an error response is
generated. The decorated function must take two arguments, the request
and response objects. The return value of the function must be an
updated response object. The handler is invoked for error responses
generated by Microdot, as well as those returned by application-defined
error handlers.
Example::
@app.after_error_request
def func(request, response):
# ...
return response
"""
self.after_error_request_handlers.append(f)
return f
def errorhandler(self, status_code_or_exception_class):
"""Decorator to register a function as an error handler. Error handler
functions for numeric HTTP status codes must accept a single argument,
@@ -913,6 +976,8 @@ class Microdot():
self.before_request_handlers.append(handler)
for handler in subapp.after_request_handlers:
self.after_request_handlers.append(handler)
for handler in subapp.after_error_request_handlers:
self.after_error_request_handlers.append(handler)
for status_code, handler in subapp.error_handlers.items():
self.error_handlers[status_code] = handler
@@ -1027,6 +1092,9 @@ class Microdot():
return f
def handle_request(self, sock, addr):
if Request.socket_read_timeout and \
hasattr(sock, 'settimeout'): # pragma: no cover
sock.settimeout(Request.socket_read_timeout)
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
@@ -1037,6 +1105,9 @@ class Microdot():
try:
req = Request.create(self, stream, addr, sock)
res = self.dispatch_request(req)
except socket_timeout_error as exc: # pragma: no cover
if exc.errno and exc.errno not in [60, 110]:
print_exception(exc) # not a timeout
except Exception as exc: # pragma: no cover
print_exception(exc)
try:
@@ -1060,6 +1131,7 @@ class Microdot():
status_code=res.status_code))
def dispatch_request(self, req):
after_request_handled = False
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
@@ -1092,6 +1164,7 @@ class Microdot():
res = handler(req, res) or res
for handler in req.after_request_handlers:
res = handler(req, res) or res
after_request_handled = True
elif f in self.error_handlers:
res = self.error_handlers[f](req)
else:
@@ -1103,10 +1176,18 @@ class Microdot():
res = exc.reason, exc.status_code
except Exception as exc:
print_exception(exc)
exc_class = None
res = None
if exc.__class__ in self.error_handlers:
exc_class = exc.__class__
else:
for c in mro(exc.__class__)[1:]:
if c in self.error_handlers:
exc_class = c
break
if exc_class:
try:
res = self.error_handlers[exc.__class__](req, exc)
res = self.error_handlers[exc_class](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
@@ -1124,6 +1205,9 @@ class Microdot():
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
if not after_request_handled:
for handler in self.after_error_request_handlers:
res = handler(req, res) or res
return res

View File

@@ -93,10 +93,10 @@ class Microdot(BaseMicrodot):
header_list = []
for name, value in res.headers.items():
if not isinstance(value, list):
header_list.append((name, value))
header_list.append((name.lower().encode(), value.encode()))
else:
for v in value:
header_list.append((name, v))
header_list.append((name.lower().encode(), v.encode()))
if scope['type'] != 'http': # pragma: no cover
return

View File

@@ -17,6 +17,7 @@ except ImportError:
import io
from microdot import Microdot as BaseMicrodot
from microdot import mro
from microdot import NoCaseDict
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
@@ -346,6 +347,7 @@ class Microdot(BaseMicrodot):
status_code=res.status_code))
async def dispatch_request(self, req):
after_request_handled = False
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
@@ -380,7 +382,9 @@ class Microdot(BaseMicrodot):
res = await self._invoke_handler(
handler, req, res) or res
for handler in req.after_request_handlers:
res = await handler(req, res) or res
res = await self._invoke_handler(
handler, req, res) or res
after_request_handled = True
elif f in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[f], req)
@@ -393,11 +397,19 @@ class Microdot(BaseMicrodot):
res = exc.reason, exc.status_code
except Exception as exc:
print_exception(exc)
exc_class = None
res = None
if exc.__class__ in self.error_handlers:
exc_class = exc.__class__
else:
for c in mro(exc.__class__)[1:]:
if c in self.error_handlers:
exc_class = c
break
if exc_class:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
self.error_handlers[exc_class], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
@@ -415,6 +427,10 @@ class Microdot(BaseMicrodot):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
if not after_request_handled:
for handler in self.after_error_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
return res
async def _invoke_handler(self, f_or_coro, *args, **kwargs):

View File

@@ -1,65 +0,0 @@
from microdot import abort
class BaseAuth:
def __init__(self, header='Authorization', scheme=None):
self.auth_callback = None
self.error_callback = self.auth_failed
self.header = header
self.scheme = scheme.lower()
def callback(self, f):
"""Decorator to configure the authentication callback.
Microdot calls the authentication callback to allow the application to
check user credentials.
"""
self.auth_callback = f
def errorhandler(self, f):
"""Decorator to configure the error callback.
Microdot calls the error callback to allow the application to generate
a custom error response. The default error response is to call
``abort(401)``.
"""
self.error_callback = f
def auth_failed(self):
abort(401)
def __call__(self, func):
def wrapper(request, *args, **kwargs):
auth = request.headers.get(self.header)
if not auth:
return self.error_callback()
if self.header == 'Authorization':
if ' ' not in auth:
return self.error_callback()
scheme, auth = auth.split(' ', 1)
if scheme.lower() != self.scheme:
return self.error_callback()
if not self.auth_callback(request, *self._get_auth_args(auth)):
return self.error_callback()
return func(request, *args, **kwargs)
return wrapper
class BasicAuth(BaseAuth):
def __init__(self):
super().__init__(scheme='Basic')
def _get_auth_args(self, auth):
import binascii
username, password = binascii.a2b_base64(auth).decode('utf-8').split(
':', 1)
return (username, password)
class TokenAuth(BaseAuth):
def __init__(self, header='Authorization', scheme='Bearer'):
super().__init__(header=header, scheme=scheme)
def _get_auth_args(self, token):
return (token,)

View File

@@ -1,46 +0,0 @@
from microdot import redirect, urlencode
from microdot_session import get_session, update_session
class LoginAuth:
def __init__(self, login_url='/login'):
super().__init__()
self.login_url = login_url
self.user_callback = self._accept_user
def callback(self, f):
self.user_callback = f
def login_user(self, request, user_id):
session = get_session(request)
session['user_id'] = user_id
update_session(request, session)
return session
def logout_user(self, request):
session = get_session(request)
session.pop('user_id', None)
update_session(request, session)
return session
def redirect_to_next(self, request, default_url='/'):
next_url = request.args.get('next', default_url)
if not next_url.startswith('/'):
next_url = default_url
return redirect(next_url)
def __call__(self, func):
def wrapper(request, *args, **kwargs):
session = get_session(request)
if 'user_id' not in session:
return redirect(self.login_url + '?next=' + urlencode(
request.url))
if not self.user_callback(request, session['user_id']):
return redirect(self.login_url + '?next=' + urlencode(
request.url))
return func(request, *args, **kwargs)
return wrapper
def _accept_user(self, request, user_id):
return True

View File

@@ -279,6 +279,39 @@ class TestMicrodot(unittest.TestCase):
self.assertEqual(res.headers['Content-Length'], '3')
self.assertEqual(res.text, 'baz')
def test_after_error_request(self):
app = Microdot()
@app.after_error_request
def after_error_request_one(req, res):
res.headers['X-One'] = '1'
@app.after_error_request
def after_error_request_two(req, res):
res.set_cookie('foo', 'bar')
return res
@app.route('/foo')
def foo(req):
return 'foo'
client = TestClient(app)
res = client.get('/foo')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertFalse('X-One' in res.headers)
self.assertFalse('Set-Cookie' in res.headers)
res = client.get('/bar')
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.headers['Set-Cookie'], ['foo=bar'])
self.assertEqual(res.headers['X-One'], '1')
self.assertEqual(client.cookies['foo'], 'bar')
def test_400(self):
self._mock()
@@ -465,6 +498,90 @@ class TestMicrodot(unittest.TestCase):
'text/plain; charset=UTF-8')
self.assertEqual(res.text, '501')
def test_exception_handler_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
def handle_lookup_error(req, exc):
return exc.__class__.__name__, 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_redundant_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
def handle_lookup_error(req, exc):
return 'LookupError', 501
@app.errorhandler(IndexError)
def handle_index_error(req, exc):
return 'IndexError', 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_multiple_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(Exception)
def handle_generic_exception(req, exc):
return 'Exception', 501
@app.errorhandler(LookupError)
def handle_lookup_error(req, exc):
return 'LookupError', 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'LookupError')
def test_exception_handler_no_viable_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(RuntimeError)
def handle_runtime_error(req, exc):
return 'RuntimeError', 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 500)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'Internal server error')
def test_abort(self):
app = Microdot()
@@ -577,7 +694,11 @@ class TestMicrodot(unittest.TestCase):
@subapp.after_request
def after(req, res):
return res.body + b':after'
res.body += b':after'
@subapp.after_error_request
def after_error(req, res):
res.body += b':errorafter'
@subapp.errorhandler(404)
def not_found(req):
@@ -596,7 +717,7 @@ class TestMicrodot(unittest.TestCase):
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, '404')
self.assertEqual(res.text, '404:errorafter')
res = client.get('/sub/app')
self.assertEqual(res.status_code, 200)

View File

@@ -83,10 +83,10 @@ class TestMicrodotASGI(unittest.TestCase):
if packet['type'] == 'http.response.start':
self.assertEqual(packet['status'], 200)
expected_headers = [
('Content-Length', '8'),
('Content-Type', 'text/plain; charset=UTF-8'),
('Set-Cookie', 'foo=foo'),
('Set-Cookie', 'bar=bar; HttpOnly')
(b'content-length', b'8'),
(b'content-type', b'text/plain; charset=UTF-8'),
(b'set-cookie', b'foo=foo'),
(b'set-cookie', b'bar=bar; HttpOnly')
]
self.assertEqual(len(packet['headers']), len(expected_headers))
for header in expected_headers:

View File

@@ -314,6 +314,39 @@ class TestMicrodotAsync(unittest.TestCase):
self.assertEqual(res.headers['Content-Length'], '3')
self.assertEqual(res.text, 'baz')
def test_after_error_request(self):
app = Microdot()
@app.after_error_request
def after_error_request_one(req, res):
res.headers['X-One'] = '1'
@app.after_error_request
def after_error_request_two(req, res):
res.set_cookie('foo', 'bar')
return res
@app.route('/foo')
def foo(req):
return 'foo'
client = TestClient(app)
res = self._run(client.get('/foo'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertFalse('X-One' in res.headers)
self.assertFalse('Set-Cookie' in res.headers)
res = self._run(client.get('/bar'))
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.headers['Set-Cookie'], ['foo=bar'])
self.assertEqual(res.headers['X-One'], '1')
self.assertEqual(client.cookies['foo'], 'bar')
def test_400(self):
self._mock()
@@ -500,6 +533,90 @@ class TestMicrodotAsync(unittest.TestCase):
'text/plain; charset=UTF-8')
self.assertEqual(res.text, '501')
def test_exception_handler_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
async def handle_lookup_error(req, exc):
return exc.__class__.__name__, 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_redundant_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
async def handle_lookup_error(req, exc):
return 'LookupError', 501
@app.errorhandler(IndexError)
async def handle_index_error(req, exc):
return 'IndexError', 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_multiple_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(Exception)
async def handle_generic_exception(req, exc):
return 'Exception', 501
@app.errorhandler(LookupError)
async def handle_lookup_error(req, exc):
return 'LookupError', 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'LookupError')
def test_exception_handler_no_viable_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(RuntimeError)
async def handle_runtime_error(req, exc):
return 'RuntimeError', 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 500)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'Internal server error')
def test_abort(self):
app = Microdot()

View File

@@ -1,113 +0,0 @@
import binascii
import unittest
from microdot import Microdot
from microdot_auth import BasicAuth, TokenAuth
from microdot_test_client import TestClient
class TestAuth(unittest.TestCase):
def test_basic_auth(self):
app = Microdot()
basic_auth = BasicAuth()
@basic_auth.callback
def authenticate(request, username, password):
if username == 'foo' and password == 'bar':
request.g.user = {'username': username}
return True
@app.route('/')
@basic_auth
def index(request):
return request.g.user['username']
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:bar').decode()})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'foo')
res = client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:baz').decode()})
self.assertEqual(res.status_code, 401)
def test_token_auth(self):
app = Microdot()
token_auth = TokenAuth()
@token_auth.callback
def authenticate(request, token):
if token == 'foo':
request.g.user = 'user'
return True
@app.route('/')
@token_auth
def index(request):
return request.g.user
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Basic foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Bearer foo'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_token_auth_custom_header(self):
app = Microdot()
token_auth = TokenAuth(header='X-Auth-Token')
@token_auth.callback
def authenticate(request, token):
if token == 'foo':
request.g.user = 'user'
return True
@app.route('/')
@token_auth
def index(request):
return request.g.user
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Basic foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Bearer foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'X-Token-Auth': 'Bearer foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'X-Auth-Token': 'foo'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = client.get('/', headers={'x-auth-token': 'foo'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
@token_auth.errorhandler
def error_handler():
return {'status_code': 403}, 403
res = client.get('/')
self.assertEqual(res.status_code, 403)
self.assertEqual(res.json, {'status_code': 403})

View File

@@ -1,134 +0,0 @@
import unittest
from microdot import Microdot
from microdot_login import LoginAuth
from microdot_session import set_session_secret_key, with_session
from microdot_test_client import TestClient
set_session_secret_key('top-secret!')
class TestLogin(unittest.TestCase):
def test_login_auth(self):
app = Microdot()
login_auth = LoginAuth()
@app.get('/')
@login_auth
def index(request):
return 'ok'
@app.post('/login')
def login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
@app.post('/logout')
def logout(request):
login_auth.logout_user(request)
return 'ok'
client = TestClient(app)
res = client.get('/?foo=bar')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
res = client.post('/login?next=/%3Ffoo=bar')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'ok')
res = client.post('/logout')
self.assertEqual(res.status_code, 200)
res = client.get('/')
self.assertEqual(res.status_code, 302)
def test_login_auth_with_session(self):
app = Microdot()
login_auth = LoginAuth(login_url='/foo')
@app.get('/')
@login_auth
@with_session
def index(request, session):
return session['user_id']
@app.post('/foo')
def login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/foo?next=/')
res = client.post('/foo')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_login_auth_user_callback(self):
app = Microdot()
login_auth = LoginAuth()
@login_auth.callback
def check_user(request, user_id):
request.g.user_id = user_id
return user_id == 'user'
@app.get('/')
@login_auth
def index(request):
return request.g.user_id
@app.post('/good-login')
def good_login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
@app.post('/bad-login')
def bad_login(request):
login_auth.login_user(request, 'foo')
return login_auth.redirect_to_next(request)
client = TestClient(app)
res = client.post('/good-login')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = client.post('/bad-login')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = client.get('/')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/')
def test_login_auth_bad_redirect(self):
app = Microdot()
login_auth = LoginAuth()
@app.get('/')
@login_auth
def index(request):
return 'ok'
@app.post('/login')
def login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
client = TestClient(app)
res = client.post('/login?next=http://example.com')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')

View File

@@ -1,22 +1,24 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
import unittest
from microdot import Microdot
from microdot_asyncio import Microdot as MicrodotAsync
from microdot_session import set_session_secret_key, get_session, \
update_session, delete_session, with_session
from microdot_test_client import TestClient
from microdot_asyncio_test_client import TestClient as TestClientAsync
set_session_secret_key('top-secret!')
class TestSession(unittest.TestCase):
def setUp(self):
self.app = Microdot()
self.client = TestClient(self.app)
def tearDown(self):
pass
def test_session(self):
@self.app.get('/')
app = Microdot()
client = TestClient(app)
@app.get('/')
def index(req):
session = get_session(req)
session2 = get_session(req)
@@ -24,52 +26,106 @@ class TestSession(unittest.TestCase):
self.assertEqual(session['foo'], 'bar')
return str(session.get('name'))
@self.app.get('/with')
@app.get('/with')
@with_session
def session_context_manager(req, session):
return str(session.get('name'))
@self.app.post('/set')
@app.post('/set')
def set_session(req):
update_session(req, {'name': 'joe'})
return 'OK'
@self.app.post('/del')
@app.post('/del')
def del_session(req):
delete_session(req)
return 'OK'
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.text, 'None')
res = self.client.get('/with')
res = client.get('/with')
self.assertEqual(res.text, 'None')
res = self.client.post('/set')
res = client.post('/set')
self.assertEqual(res.text, 'OK')
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.text, 'joe')
res = self.client.get('/with')
res = client.get('/with')
self.assertEqual(res.text, 'joe')
res = self.client.post('/del')
res = client.post('/del')
self.assertEqual(res.text, 'OK')
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.text, 'None')
res = self.client.get('/with')
res = client.get('/with')
self.assertEqual(res.text, 'None')
def _run(self, coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
def test_session_async(self):
app = MicrodotAsync()
client = TestClientAsync(app)
@app.get('/')
async def index(req):
session = get_session(req)
session2 = get_session(req)
session2['foo'] = 'bar'
self.assertEqual(session['foo'], 'bar')
return str(session.get('name'))
@app.get('/with')
@with_session
async def session_context_manager(req, session):
return str(session.get('name'))
@app.post('/set')
async def set_session(req):
update_session(req, {'name': 'joe'})
return 'OK'
@app.post('/del')
async def del_session(req):
delete_session(req)
return 'OK'
res = self._run(client.get('/'))
self.assertEqual(res.text, 'None')
res = self._run(client.get('/with'))
self.assertEqual(res.text, 'None')
res = self._run(client.post('/set'))
self.assertEqual(res.text, 'OK')
res = self._run(client.get('/'))
self.assertEqual(res.text, 'joe')
res = self._run(client.get('/with'))
self.assertEqual(res.text, 'joe')
res = self._run(client.post('/del'))
self.assertEqual(res.text, 'OK')
res = self._run(client.get('/'))
self.assertEqual(res.text, 'None')
res = self._run(client.get('/with'))
self.assertEqual(res.text, 'None')
def test_session_no_secret_key(self):
set_session_secret_key(None)
app = Microdot()
client = TestClient(app)
@self.app.get('/')
@app.get('/')
def index(req):
self.assertRaises(ValueError, get_session, req)
self.assertRaises(ValueError, update_session, req, {})
return ''
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.status_code, 200)
set_session_secret_key('top-secret!')

11
tests/test_urlencode.py Normal file
View File

@@ -0,0 +1,11 @@
import unittest
from microdot import urlencode, urldecode_str, urldecode_bytes
class TestURLEncode(unittest.TestCase):
def test_urlencode(self):
self.assertEqual(urlencode('?foo=bar&x'), '%3Ffoo%3Dbar%26x')
def test_urldecode(self):
self.assertEqual(urldecode_str('%3Ffoo%3Dbar%26x'), '?foo=bar&x')
self.assertEqual(urldecode_bytes(b'%3Ffoo%3Dbar%26x'), '?foo=bar&x')

View File

@@ -1,4 +1,4 @@
FROM ubuntu:20.04
FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive

View File

@@ -1,6 +1,10 @@
#!/bin/bash
# this script updates the micropython binary in the /bin directory that is
# used to run unit tests under GitHub Actions builds
docker build -t micropython .
docker create -it --name dummy-micropython micropython
docker cp dummy-micropython:/usr/local/bin/micropython ../bin/micropython
docker rm dummy-micropython
DOCKER=${DOCKER:-docker}
$DOCKER build -t micropython .
$DOCKER create -it --name dummy-micropython micropython
$DOCKER cp dummy-micropython:/usr/local/bin/micropython ../bin/micropython
$DOCKER rm dummy-micropython

View File

@@ -1,15 +1,15 @@
[tox]
envlist=flake8,py36,py37,py38,py39,py310,upy,benchmark
envlist=flake8,py37,py38,py39,py310,py311,upy,benchmark
skipsdist=True
skip_missing_interpreters=True
[gh-actions]
python =
3.6: py36
3.7: py37
3.8: py38
3.9: py39
3.10: py310
3.11: py311
pypy3: pypy3
[testenv]
@@ -31,11 +31,11 @@ commands=
flake8 --ignore=W503 --exclude src/utemplate,tests/libs src tests examples
[testenv:upy]
whitelist_externals=sh
allowlist_externals=sh
commands=sh -c "bin/micropython run_tests.py"
[testenv:upy-mac]
whitelist_externals=micropython
allowlist_externals=micropython
commands=micropython run_tests.py
deps=