49 Commits

Author SHA1 Message Date
Miguel Grinberg
e373d037f0 login tests 2024-04-28 00:32:22 +01:00
Miguel Grinberg
4321d93a82 auth documentation 2024-04-28 00:32:20 +01:00
Miguel Grinberg
68b6cb9862 Example updates 2024-04-28 00:32:06 +01:00
Miguel Grinberg
c7f9b3ff3b lint issues 2024-04-28 00:32:06 +01:00
Miguel Grinberg
261dd2f980 documentation 2024-04-28 00:32:06 +01:00
Miguel Grinberg
f204416e36 remember tests 2024-04-28 00:32:06 +01:00
Miguel Grinberg
7bc66ce3bb auth examples 2024-04-28 00:32:06 +01:00
Miguel Grinberg
43f2227140 remember cookie 2024-04-28 00:32:06 +01:00
Miguel Grinberg
b0cddde6ec Basic, token and login authentication 2024-04-28 00:32:06 +01:00
Miguel Grinberg
dac6df7a7a use codecov token for coverage uploads #nolog 2024-04-28 00:31:53 +01:00
dependabot[bot]
5d6e838f3c Bump gunicorn from 21.2.0 to 22.0.0 in /examples/benchmark (#224) #nolog
Bumps [gunicorn](https://github.com/benoitc/gunicorn) from 21.2.0 to 22.0.0.
- [Release notes](https://github.com/benoitc/gunicorn/releases)
- [Commits](https://github.com/benoitc/gunicorn/compare/21.2.0...22.0.0)

---
updated-dependencies:
- dependency-name: gunicorn
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-04-17 23:52:37 +01:00
dependabot[bot]
563bfdc8f5 Bump idna from 3.6 to 3.7 in /examples/benchmark (#223) #nolog
Bumps [idna](https://github.com/kjd/idna) from 3.6 to 3.7.
- [Release notes](https://github.com/kjd/idna/releases)
- [Changelog](https://github.com/kjd/idna/blob/master/HISTORY.rst)
- [Commits](https://github.com/kjd/idna/compare/v3.6...v3.7)

---
updated-dependencies:
- dependency-name: idna
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-04-12 15:16:52 +01:00
Miguel Grinberg
679d8e63b8 Fix docs build #nolog 2024-03-24 19:56:43 +00:00
Miguel Grinberg
4cb155ee41 Improved cookie support in the test client 2024-03-24 19:45:22 +00:00
Miguel Grinberg
dea79c5ce2 Make Session class more reusable 2024-03-23 16:29:36 +00:00
Carlo Colombo
6b1fd61917 removed outdated import from documentation (Fixes #216) 2024-03-15 11:03:10 +00:00
Miguel Grinberg
f6876c0d15 Use @wraps on decorated functions 2024-03-14 00:16:35 +00:00
Hamsanger
904d5fcaa2 Add event ID to the SSE implementation (#213) 2024-03-10 23:52:43 +00:00
Miguel Grinberg
a0ea439def Add roadmap details to readme 2024-03-10 17:15:14 +00:00
Miguel Grinberg
a1801d9a53 Version 2.0.6.dev0 2024-03-09 10:48:18 +00:00
Miguel Grinberg
14f2c9d345 Release 2.0.5 2024-03-09 10:48:09 +00:00
Miguel Grinberg
d0a4cf8fa7 Handle 0 as an integer argument (Fixes #212) 2024-03-06 20:34:00 +00:00
Miguel Grinberg
901f4e55b8 Version 2.0.5.dev0 2024-02-20 23:15:01 +00:00
Miguel Grinberg
53b28f9938 Release 2.0.4 2024-02-20 23:12:31 +00:00
Miguel Grinberg
f6cba2c0f7 More URLPattern unit tests 2024-02-20 23:09:24 +00:00
Miguel Grinberg
38262c56d3 Do not use regexes for parsing simple URLs (Fixes #207) 2024-02-18 15:05:41 +00:00
dependabot[bot]
a3363c7b8c Bump fastapi from 0.104.1 to 0.109.1 in /examples/benchmark (#203) #nolog
Bumps [fastapi](https://github.com/tiangolo/fastapi) from 0.104.1 to 0.109.1.
- [Release notes](https://github.com/tiangolo/fastapi/releases)
- [Commits](https://github.com/tiangolo/fastapi/compare/0.104.1...0.109.1)

---
updated-dependencies:
- dependency-name: fastapi
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-02-05 17:43:31 +00:00
Miguel Grinberg
e44c271bae Circuitpython build 2024-01-31 23:43:17 +00:00
Miguel Grinberg
bf519478cb Added documentation on using alternative utemplate loaders 2024-01-14 12:47:28 +00:00
dependabot[bot]
8d1ca808cb Bump jinja2 from 3.1.2 to 3.1.3 in /examples/benchmark (#199) #nolog
Bumps [jinja2](https://github.com/pallets/jinja) from 3.1.2 to 3.1.3.
- [Release notes](https://github.com/pallets/jinja/releases)
- [Changelog](https://github.com/pallets/jinja/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/jinja/compare/3.1.2...3.1.3)

---
updated-dependencies:
- dependency-name: jinja2
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-01-11 22:56:46 +00:00
Miguel Grinberg
1f804f869c Version 2.0.4.dev0 2024-01-07 10:50:36 +00:00
Miguel Grinberg
7a6026006f Release 2.0.3 2024-01-07 10:50:28 +00:00
Miguel Grinberg
6712c47400 Pass keyword arguments to thread executor in the correct way (Fixes #195) 2024-01-07 10:44:16 +00:00
Miguel Grinberg
c8c91e8345 Update uasyncio to include new TLS support 2024-01-04 20:41:05 +00:00
Miguel Grinberg
5d188e8c0d Add a limit to WebSocket message size (Fixes #193) 2024-01-03 00:04:02 +00:00
Miguel Grinberg
b80b6b64d0 Documentation improvements 2023-12-28 12:59:19 +00:00
Miguel Grinberg
28007ea583 Version 2.0.3.dev0 2023-12-28 12:11:32 +00:00
Miguel Grinberg
300f8563ed Release 2.0.2 2023-12-28 12:10:46 +00:00
Miguel Grinberg
1fc11193da Support binary data in the SSE extension 2023-12-28 12:04:17 +00:00
Miguel Grinberg
79452a4699 Upgrade micropython tests to use v1.22, initial circuitpython work 2023-12-27 20:39:20 +00:00
Miguel Grinberg
84842e39c3 Improvements to migration guide 2023-12-26 20:00:07 +00:00
Miguel Grinberg
2a3c889717 typo in documentation #nolog 2023-12-26 17:07:20 +00:00
Tak Tran
ad368be993 Remove spurious async in documentation example (#187) 2023-12-23 14:08:12 +00:00
Miguel Grinberg
3df56c6ffe Version 2.0.2.dev0 2023-12-23 12:50:03 +00:00
Miguel Grinberg
c2e18004f7 Release 2.0.1 2023-12-23 12:49:52 +00:00
Miguel Grinberg
bd18ceb442 Addressed some inadvertent mistakes in the template extensions 2023-12-23 12:48:27 +00:00
Miguel Grinberg
f38d6d760a Updated readme and change log #nolog 2023-12-23 00:04:22 +00:00
Miguel Grinberg
dee4914bdd Version 2.0.1.dev0 2023-12-22 20:42:53 +00:00
Miguel Grinberg
92e571ee32 Release 2.0.0 2023-12-22 20:41:15 +00:00
71 changed files with 3058 additions and 349 deletions

View File

@@ -16,6 +16,7 @@ jobs:
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -eflake8
- run: tox -edocs
tests:
name: tests
strategy:
@@ -41,6 +42,15 @@ jobs:
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -eupy
tests-circuitpython:
name: tests-circuitpython
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -ecpy
coverage:
name: coverage
runs-on: ubuntu-latest
@@ -54,6 +64,7 @@ jobs:
with:
files: ./coverage.xml
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
benchmark:
name: benchmark
runs-on: ubuntu-latest

View File

@@ -1,5 +1,44 @@
# Microdot change log
**Release 2.0.5** - 2024-03-09
- Correct handling of 0 as an integer argument (regression from #207) [#212](https://github.com/miguelgrinberg/microdot/issues/212) ([commit](https://github.com/miguelgrinberg/microdot/commit/d0a4cf8fa7dfb1da7466157b18d3329a8cf9a5df))
**Release 2.0.4** - 2024-02-20
- Do not use regexes for parsing simple URLs [#207](https://github.com/miguelgrinberg/microdot/issues/207) ([commit #1](https://github.com/miguelgrinberg/microdot/commit/38262c56d34784401659639b482a4a1224e1e59a) [commit #2](https://github.com/miguelgrinberg/microdot/commit/f6cba2c0f7e18e2f32b5adb779fb037b6c473eab))
- Added documentation on using alternative uTemplate loaders ([commit](https://github.com/miguelgrinberg/microdot/commit/bf519478cbc6e296785241cd7d01edb23c317cd3))
- Added CircuitPython builds ([commit](https://github.com/miguelgrinberg/microdot/commit/e44c271bae88f4327d3eda16d8780ac264d1ebab))
**Release 2.0.3** - 2024-01-07
- Add a limit to WebSocket message size [#193](https://github.com/miguelgrinberg/microdot/issues/193) ([commit](https://github.com/miguelgrinberg/microdot/commit/5d188e8c0ddef6ce633ca702dbdd4a90f2799597))
- Pass keyword arguments to thread executor in the correct way [#195](https://github.com/miguelgrinberg/microdot/issues/195) ([commit](https://github.com/miguelgrinberg/microdot/commit/6712c47400d7c426c88032f65ab74466524eccab))
- Update uasyncio library used in tests to include new TLS support ([commit](https://github.com/miguelgrinberg/microdot/commit/c8c91e83457d24320f22c9a74e80b15e06b072ca))
- Documentation improvements ([commit](https://github.com/miguelgrinberg/microdot/commit/b80b6b64d02d21400ca8a5077f5ed1127cc202ae))
**Release 2.0.2** - 2023-12-28
- Support binary data in the SSE extension ([commit](https://github.com/miguelgrinberg/microdot/commit/1fc11193da0d298f5539e2ad218836910a13efb2))
- Upgrade micropython tests to use v1.22 + initial CircuitPython testing work ([commit](https://github.com/miguelgrinberg/microdot/commit/79452a46992351ccad2c0317c20bf50be0d76641))
- Improvements to migration guide ([commit](https://github.com/miguelgrinberg/microdot/commit/84842e39c360a8b3ddf36feac8af201fb19bbb0b))
- Remove spurious async in documentation example [#187](https://github.com/miguelgrinberg/microdot/issues/187) ([commit](https://github.com/miguelgrinberg/microdot/commit/ad368be993e2e3007579f1d3880e36d60c71da92)) (thanks **Tak Tran**!)
**Release 2.0.1** - 2023-12-23
- Addressed some inadvertent mistakes in the template extensions ([commit](https://github.com/miguelgrinberg/microdot/commit/bd18ceb4424e9dfb52b1e6d498edd260aa24fc53))
**Release 2.0.0** - 2023-12-22
- Major redesign [#186](https://github.com/miguelgrinberg/microdot/issues/186) ([commit](https://github.com/miguelgrinberg/microdot/commit/20ea305fe793eb206b52af9eb5c5f3c1e9f57dbb))
- Code reorganization as a `microdot` package
- Asyncio is now the core implementation
- New support for Server-Sent Events (SSE)
- Several extensions redesigned
- Support for "partitioned" cookies
- [Cross-compiling and freezing](https://microdot.readthedocs.io/en/stable/freezing.html) guidance
- A [Migration Guide](https://microdot.readthedocs.io/en/stable/migrating.html) to help transition to version 2 from older releases
**Release 1.3.4** - 2023-11-08
- Handle change in `wait_closed()` behavior in Python 3.12 [#177](https://github.com/miguelgrinberg/microdot/issues/177) ([commit](https://github.com/miguelgrinberg/microdot/commit/5550b20cdd347d59e2aa68f6ebf9e9abffaff9fc))

View File

@@ -32,5 +32,25 @@ describes the backwards incompatible changes that were made.
## Resources
- Documentation: [Latest](https://microdot.readthedocs.io/en/latest/) [Stable](https://microdot.readthedocs.io/en/stable/) [Legacy v1.x](https://microdot.readthedocs.io/en/v1/)
- [Change Log](https://github.com/miguelgrinberg/microdot/blob/main/CHANGES.md)
- Documentation
- [Latest](https://microdot.readthedocs.io/en/latest/)
- [Stable (v2)](https://microdot.readthedocs.io/en/stable/)
- [Legacy (v1)](https://microdot.readthedocs.io/en/v1/) ([Code](https://github.com/miguelgrinberg/microdot/tree/v1))
## Roadmap
The following features are planned for future releases of Microdot, both for
MicroPython and CPython:
- Support for forms encoded in `multipart/form-data` format
- Authentication support, similar to [Flask-Login](https://github.com/maxcountryman/flask-login) for Flask
- OpenAPI integration, similar to [APIFairy](https://github.com/miguelgrinberg/apifairy) for Flask
In addition to the above, the following extensions are also under consideration,
but only for CPython:
- Database integration through [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy)
- Socket.IO support through [python-socketio](https://github.com/miguelgrinberg/python-socketio)
Do you have other ideas to propose? Let's [discuss them](https://github.com/miguelgrinberg/microdot/discussions/new?category=ideas)!

BIN
bin/circuitpython Executable file

Binary file not shown.

Binary file not shown.

View File

@@ -1,8 +1,8 @@
API Reference
=============
``microdot`` module
-------------------
Core API
--------
.. autoclass:: microdot.Microdot
:members:
@@ -14,51 +14,73 @@ API Reference
:members:
``websocket`` extension
-----------------------
WebSocket
---------
.. automodule:: microdot.websocket
:members:
``utemplate`` templating extension
----------------------------------
Server-Sent Events (SSE)
------------------------
.. automodule:: microdot.sse
:members:
Templates (uTemplate)
---------------------
.. automodule:: microdot.utemplate
:members:
``jinja`` templating extension
------------------------------
Templates (Jinja)
-----------------
.. automodule:: microdot.jinja
:members:
``session`` extension
---------------------
User Sessions
-------------
.. automodule:: microdot.session
:members:
``cors`` extension
------------------
Authentication
--------------
.. automodule:: microdot.auth
:inherited-members:
:special-members: __call__
:members:
User Logins
-----------
.. automodule:: microdot.login
:inherited-members:
:special-members: __call__
:members:
Cross-Origin Resource Sharing (CORS)
------------------------------------
.. automodule:: microdot.cors
:members:
``test_client`` extension
-------------------------
Test Client
-----------
.. automodule:: microdot.test_client
:members:
``asgi`` extension
------------------
ASGI
----
.. autoclass:: microdot.asgi.Microdot
:members:
:exclude-members: shutdown, run
``wsgi`` extension
-------------------
WSGI
----
.. autoclass:: microdot.wsgi.Microdot
:members:

View File

@@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions
described in this section are maintained as part of the Microdot project in
the same source code repository.
WebSocket Support
~~~~~~~~~~~~~~~~~
WebSocket
~~~~~~~~~
.. list-table::
:align: left
@@ -39,8 +39,8 @@ Example::
message = await ws.receive()
await ws.send(message)
Server-Sent Events Support
~~~~~~~~~~~~~~~~~~~~~~~~~~
Server-Sent Events
~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -78,8 +78,8 @@ Example::
the SSE object. For bidirectional communication with the client, use the
WebSocket extension.
Rendering Templates
~~~~~~~~~~~~~~~~~~~
Templates
~~~~~~~~~
Many web applications use HTML templates for rendering content to clients.
Microdot includes extensions to render templates with the
@@ -129,11 +129,25 @@ are the asynchronous versions of these two methods.
The default location from where templates are loaded is the *templates*
subdirectory. This location can be changed with the
:func:`init_templates <microdot.utemplate.init_templates>` function::
:func:`Template.initialize <microdot.utemplate.Template.initialize>` class
method::
from microdot.utemplate import init_templates
Template.initialize('my_templates')
init_templates('my_templates')
By default templates are automatically compiled the first time they are
rendered, or when their last modified timestamp is more recent than the
compiledo file's timestamp. This loading behavior can be changed by switching
to a different template loader. For example, if the templates are pre-compiled,
the timestamp check and compile steps can be removed by switching to the
"compiled" template loader::
from utemplate import compiled
from microdot.utemplate import Template
Template.initialize(loader_class=compiled.Loader)
Consult the `uTemplate documentation <https://github.com/pfalcon/utemplate>`_
for additional information regarding template loaders.
Using the Jinja Engine
^^^^^^^^^^^^^^^^^^^^^^
@@ -174,24 +188,22 @@ method, which returns a generator instead of a string.
The default location from where templates are loaded is the *templates*
subdirectory. This location can be changed with the
:func:`init_templates <microdot.utemplate.init_templates>` function::
:func:`Template.initialize <microdot.jinja.Template.initialize>` class method::
from microdot.jinja import init_templates
Template.initialize('my_templates')
init_templates('my_templates')
The ``init_templates()`` function also accepts ``enable_async`` argument, which
The ``initialize()`` method also accepts ``enable_async`` argument, which
can be set to ``True`` if asynchronous rendering of templates is desired. If
this option is enabled, then the
:func:`render_async() <microdot.utemplate.Template.render_async>` and
:func:`generate_async() <microdot.utemplate.Template.generate_async>` methods
:func:`render_async() <microdot.jinja.Template.render_async>` and
:func:`generate_async() <microdot.jinja.Template.generate_async>` methods
must be used.
.. note::
The Jinja extension is not compatible with MicroPython.
Maintaining Secure User Sessions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Secure User Sessions
~~~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -258,6 +270,117 @@ The :func:`save() <microdot.session.SessionDict.save>` and
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
and destroy the user session respectively.
Authentication
~~~~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `auth.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
* - Required external dependencies
- | None
* - Examples
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
The authentication extension provides helper classes for two commonly used
authentication patterns, described below.
Basic Authentication
^^^^^^^^^^^^^^^^^^^^
`Basic Authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_
is a method of authentication that is part of the HTTP specification. It allows
clients to authenticate to a server using a username and a password. Web
browsers have native support for Basic Authentication and will automatically
prompt the user for a username and a password when a protected resource is
accessed.
To use Basic Authentication, create an instance of the :class:`BasicAuth <microdot.auth.BasicAuth>`
class::
from microdot.auth import BasicAuth
auth = BasicAuth(app)
Next, create an authentication function. The function must accept a request
object and a username and password pair provided by the user. If the
credentials are valid, the function must return an object that represents the
user. Decorate the function with ``@auth.authenticate``::
@auth.authenticate
async def verify_user(request, username, password):
user = await load_user_from_database(username)
if user and user.verify_password(password):
return user
If the authentication function cannot validate the user provided credentials it
must return ``None``.
To protect a route with authentication, add the ``auth`` instance as a
decorator::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
While running an authenticated request, the user object returned by the
authenticaction function is accessible as ``request.g.current_user``.
Token Authentication
^^^^^^^^^^^^^^^^^^^^
To set up token authentication, create an instance of :class:`TokenAuth <microdot.auth.TokenAuth>`::
from microdot.auth import TokenAuth
auth = TokenAuth()
Then add a function that verifies the token and returns the user it belongs to,
or ``None`` if the token is invalid or expired::
@auth.authenticate
async def verify_token(request, token):
return load_user_from_token(token)
As with Basic authentication, the ``auth`` instance is used as a decorator to
protect your routes::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
User Logins
~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `login.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
| `session.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/session.py>`_
* - Required external dependencies
- | CPython: `PyJWT <https://pyjwt.readthedocs.io/>`_
| MicroPython: `jwt.py <https://github.com/micropython/micropython-lib/blob/master/python-ecosys/pyjwt/jwt.py>`_,
`hmac.py <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
* - Examples
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login/login.py>`_
Cross-Origin Resource Sharing (CORS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -293,8 +416,8 @@ Example::
cors = CORS(app, allowed_origins=['https://example.com'],
allow_credentials=True)
Testing with the Test Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Client
~~~~~~~~~~~
.. list-table::
:align: left
@@ -330,8 +453,8 @@ Example::
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
class for more details.
Deploying on a Production Web Server
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Production Deployments
~~~~~~~~~~~~~~~~~~~~~~
The ``Microdot`` class creates its own simple web server. This is enough for an
application deployed with MicroPython, but when using CPython it may be useful

View File

@@ -25,14 +25,15 @@ and incorporated into a custom MicroPython firmware.
Use the following guidelines to know what files to copy:
- For a minimal setup with only the base web server functionality, copy
* For a minimal setup with only the base web server functionality, copy
`microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_
into your project.
- For a configuration that includes one or more optional extensions, create a
* For a configuration that includes one or more optional extensions, create a
*microdot* directory in your device and copy the following files:
- `__init__.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/__init__.py>`_
- `microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_
- any needed `extensions <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot>`_.
* `__init__.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/__init__.py>`_
* `microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_
* any needed `extensions <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot>`_.
Getting Started
@@ -171,10 +172,8 @@ configure the web server.
app.run(port=4443, debug=True, ssl=sslctx)
.. note::
The ``ssl`` argument can only be used with CPython at this time, because
MicroPython's asyncio module does not currently support SSL certificates or
TLS encryption. Work on this is
`in progress <https://github.com/micropython/micropython/pull/11897>`_.
When using CPython, the certificate and key files must be given in PEM
format. When using MicroPython, these files must be given in DER format.
Defining Routes
~~~~~~~~~~~~~~~
@@ -297,7 +296,7 @@ match and the route will not be called.
A special type ``path`` can be used to capture the remainder of the path as a
single argument. The difference between an argument of type ``path`` and one of
type ``string`` is that the latter stops capturing when a ``/`` appears in the
URL.
URL::
@app.get('/tests/<path:path>')
async def get_test(request, path):
@@ -462,7 +461,7 @@ the sub-applications to build the larger combined application::
from customers import customers_app
from orders import orders_app
async def create_app():
def create_app():
app = Microdot()
app.mount(customers_app, url_prefix='/customers')
app.mount(orders_app, url_prefix='/orders')

View File

@@ -39,7 +39,7 @@ extension.
Any applications built using the asyncio extension will need to update their
imports from this::
from microdot.asyncio import Microdot
from microdot_asyncio import Microdot
to this::
@@ -94,7 +94,7 @@ as a single string::
Streamed templates also have an asynchronous version::
return await Template('index.html').generate_async(title='Home')
return Template('index.html').generate_async(title='Home')
Class-based user sessions
~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -138,5 +138,8 @@ deployed with standard WSGI servers such as Gunicorn.
WebSocket support when using the WSGI extension is enabled when using a
compatible web server. At this time only Gunicorn is supported for WebSocket.
Given that WebSocket support is asynchronous, it would be better to switch to
the ASGI extension, which has full support for WebSocket as defined in the ASGI
specification.
As before, the WSGI extension is not available under MicroPython.

1
examples/auth/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate basic and token authentication.

View File

@@ -0,0 +1,31 @@
from hashlib import sha1
from microdot import Microdot
from microdot.auth import BasicAuth
def create_hash(password):
return sha1(password).hexdigest()
USERS = {
'susan': create_hash(b'hello'),
'david': create_hash(b'bye'),
}
app = Microdot()
auth = BasicAuth()
@auth.authenticate
async def check_credentials(request, username, password):
if username in USERS and USERS[username] == create_hash(password.encode()):
return username
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -0,0 +1,26 @@
from microdot import Microdot
from microdot.auth import TokenAuth
app = Microdot()
auth = TokenAuth()
TOKENS = {
'susan-token': 'susan',
'david-token': 'david',
}
@auth.authenticate
async def check_token(request, token):
if token in TOKENS:
return TOKENS[token]
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -9,9 +9,7 @@ aiofiles==23.2.1
annotated-types==0.6.0
# via pydantic
anyio==3.7.1
# via
# fastapi
# starlette
# via starlette
blinker==1.7.0
# via
# flask
@@ -28,13 +26,13 @@ click==8.1.7
# pip-tools
# quart
# uvicorn
fastapi==0.104.1
fastapi==0.109.1
# via -r requirements.in
flask==3.0.0
# via
# -r requirements.in
# quart
gunicorn==21.2.0
gunicorn==22.0.0
# via -r requirements.in
h11==0.14.0
# via
@@ -51,7 +49,7 @@ hypercorn==0.15.0
# via quart
hyperframe==6.0.1
# via h2
idna==3.6
idna==3.7
# via
# anyio
# requests
@@ -59,7 +57,7 @@ itsdangerous==2.1.2
# via
# flask
# quart
jinja2==3.1.2
jinja2==3.1.3
# via
# flask
# quart
@@ -90,7 +88,7 @@ requests==2.31.0
# via -r requirements.in
sniffio==1.3.0
# via anyio
starlette==0.27.0
starlette==0.35.1
# via fastapi
typing-extensions==4.9.0
# via

1
examples/login/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate user logins.

122
examples/login/login.py Normal file
View File

@@ -0,0 +1,122 @@
from hashlib import sha1
from microdot import Microdot, redirect
from microdot.session import Session
from microdot.login import Login
class User:
def __init__(self, id, username, password):
self.id = id
self.username = username
self.password_hash = self.create_hash(password)
def create_hash(self, password):
# note: to keep this example simple, passwords are hashed with the SHA1
# algorithm. In a real application, you should use a stronger
# algorithm, such as bcrypt.
return sha1(password.encode()).hexdigest()
def check_password(self, password):
return self.create_hash(password) == self.password_hash
USERS = {
'user001': User('user001', 'susan', 'hello'),
'user002': User('user002', 'david', 'bye'),
}
app = Microdot()
Session(app, secret_key='top-secret!')
auth = Login()
@auth.id_to_user
async def get_user(user_id):
print('get_user', user_id)
return USERS.get(user_id)
@app.route('/login', methods=['GET', 'POST'])
async def login(request):
if request.method == 'GET':
return '''
<!doctype html>
<html>
<body>
<h1>Please Login</h1>
<form method="POST">
<p>
Username<br>
<input name="username" autofocus>
</p>
<p>
Password:<br>
<input name="password" type="password">
<br>
</p>
<p>
<input name="remember_me" type="checkbox"> Remember me
<br>
</p>
<p>
<button type="submit">Login</button>
</p>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
username = request.form['username']
password = request.form['password']
remember_me = bool(request.form.get('remember_me'))
for user in USERS.values():
if user.username == username:
if user.check_password(password):
return await auth.login_user(request, user,
remember=remember_me)
return redirect('/login')
@app.route('/')
@auth
async def index(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>
<a href="/fresh">Click here</a> to access the fresh login page.
</p>
<form method="POST" action="/logout">
<button type="submit">Logout</button>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.get('/fresh')
@auth.fresh
async def fresh(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>This page requires a fresh login session.</p>
<p><a href="/">Go back</a> to the main page.</p>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.post('/logout')
@auth
async def logout(request):
await auth.logout_user(request)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,3 +1,7 @@
# This is a simple example that demonstrates how to use the user session, but
# is not intended as a complete login solution. See the login subdirectory for
# a more complete example.
from microdot import Microdot, Response, redirect
from microdot.session import Session, with_session

View File

@@ -1,7 +1,7 @@
from microdot import Microdot, Response
from microdot.jinja import template, init_templates
from microdot.jinja import Template
init_templates('templates', enable_async=True)
Template.initialize('templates', enable_async=True)
app = Microdot()
Response.default_content_type = 'text/html'
@@ -11,7 +11,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return await template('index.html').render_async(name=name)
return await Template('index.html').render_async(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response
from microdot.jinja import template
from microdot.jinja import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -7,12 +7,12 @@ Response.default_content_type = 'text/html'
@app.route('/')
async def index(req):
return template('page1.html').render(page='Page 1')
return Template('page1.html').render(page='Page 1')
@app.route('/page2')
async def page2(req):
return template('page2.html').render(page='Page 2')
return Template('page2.html').render(page='Page 2')
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response
from microdot.jinja import template
from microdot.jinja import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').render(name=name)
return Template('index.html').render(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.asgi import Microdot, Response
from microdot.jinja import template
from microdot.jinja import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').render(name=name)
return Template('index.html').render(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.wsgi import Microdot, Response
from microdot.jinja import template
from microdot.jinja import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').render(name=name)
return Template('index.html').render(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response
from microdot.jinja import template
from microdot.jinja import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').generate(name=name)
return Template('index.html').generate(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response
from microdot.utemplate import template
from microdot.utemplate import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return await template('index.html').render_async(name=name)
return await Template('index.html').render_async(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response
from microdot.utemplate import template
from microdot.utemplate import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -7,12 +7,12 @@ Response.default_content_type = 'text/html'
@app.route('/')
async def index(req):
return template('page1.html').render(page='Page 1')
return Template('page1.html').render(page='Page 1')
@app.route('/page2')
async def page2(req):
return template('page2.html').render(page='Page 2')
return Template('page2.html').render(page='Page 2')
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response
from microdot.utemplate import template
from microdot.utemplate import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').render(name=name)
return Template('index.html').render(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.asgi import Microdot, Response
from microdot.utemplate import template
from microdot.utemplate import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').render(name=name)
return Template('index.html').render(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.wsgi import Microdot, Response
from microdot.utemplate import template
from microdot.utemplate import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').render(name=name)
return Template('index.html').render(name=name)
if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response
from microdot.utemplate import template
from microdot.utemplate import Template
app = Microdot()
Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return template('index.html').generate(name=name)
return Template('index.html').generate(name=name)
if __name__ == '__main__':

View File

@@ -1,4 +1,5 @@
import ssl
import sys
from microdot import Microdot
app = Microdot()
@@ -31,6 +32,7 @@ async def shutdown(request):
return 'The server is shutting down...'
ext = 'der' if sys.implementation.name == 'micropython' else 'pem'
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.load_cert_chain('cert.pem', 'key.pem')
sslctx.load_cert_chain('cert.' + ext, 'key.' + ext)
app.run(port=4443, debug=True, ssl=sslctx)

View File

@@ -0,0 +1,139 @@
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_ticks`
================================================================================
Work with intervals and deadlines in milliseconds
* Author(s): Jeff Epler
Implementation Notes
--------------------
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
"""
# imports
from micropython import const
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ticks.git"
_TICKS_PERIOD = const(1 << 29)
_TICKS_MAX = const(_TICKS_PERIOD - 1)
_TICKS_HALFPERIOD = const(_TICKS_PERIOD // 2)
# Get the correct implementation of ticks_ms. There are three possibilities:
#
# - supervisor.ticks_ms is present. This will be the case starting in CP7.0
#
# - time.ticks_ms is present. This is the case for MicroPython & for the "unix
# port" of CircuitPython, used for some automated testing.
#
# - time.monotonic_ns is present, and works. This is the case on most
# Express boards in CP6.x, and most host computer versions of Python.
#
# - Otherwise, time.monotonic is assumed to be present. This is the case
# on most non-express boards in CP6.x, and some old host computer versions
# of Python.
#
# Note that on microcontrollers, this time source becomes increasingly
# inaccurate when the board has not been reset in a long time, losing the
# ability to measure 1ms intervals after about 1 hour, and losing the
# ability to meausre 128ms intervals after 6 days. The only solution is to
# either upgrade to a version with supervisor.ticks_ms, or to switch to a
# board with time.monotonic_ns.
try:
from supervisor import ticks_ms # pylint: disable=unused-import
except (ImportError, NameError):
import time
if _ticks_ms := getattr(time, "ticks_ms", None):
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return _ticks_ms() & _TICKS_MAX # pylint: disable=not-callable
else:
try:
from time import monotonic_ns as _monotonic_ns
_monotonic_ns() # Check that monotonic_ns is usable
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return (_monotonic_ns() // 1_000_000) & _TICKS_MAX
except (ImportError, NameError, NotImplementedError):
from time import monotonic as _monotonic
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return int(_monotonic() * 1000) & _TICKS_MAX
def ticks_add(ticks: int, delta: int) -> int:
"Add a delta to a base number of ticks, performing wraparound at 2**29ms."
return (ticks + delta) % _TICKS_PERIOD
def ticks_diff(ticks1: int, ticks2: int) -> int:
"""Compute the signed difference between two ticks values,
assuming that they are within 2**28 ticks"""
diff = (ticks1 - ticks2) & _TICKS_MAX
diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD
return diff
def ticks_less(ticks1: int, ticks2: int) -> bool:
"""Return true if ticks1 is before ticks2 and false otherwise,
assuming that they are within 2**28 ticks"""
return ticks_diff(ticks1, ticks2) < 0

View File

@@ -0,0 +1,41 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
from .core import *
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/Adafruit/Adafruit_CircuitPython_asyncio.git"
_attrs = {
"wait_for": "funcs",
"wait_for_ms": "funcs",
"gather": "funcs",
"Event": "event",
"ThreadSafeFlag": "event",
"Lock": "lock",
"open_connection": "stream",
"start_server": "stream",
"StreamReader": "stream",
"StreamWriter": "stream",
}
# Lazy loader, effectively does:
# global attr
# from .mod import attr
def __getattr__(attr):
mod = _attrs.get(attr, None)
if mod is None:
raise AttributeError(attr)
value = getattr(__import__(mod, None, None, True, 1), attr)
globals()[attr] = value
return value

View File

@@ -0,0 +1,430 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Core
====
"""
from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
try:
from traceback import print_exception
except:
from .traceback import print_exception
# Import TaskQueue and Task, preferring built-in C code over Python code
try:
from _asyncio import TaskQueue, Task
except ImportError:
from .task import TaskQueue, Task
################################################################################
# Exceptions
# Depending on the release of CircuitPython these errors may or may not
# exist in the C implementation of `_asyncio`. However, when they
# do exist, they must be preferred over the Python code.
try:
from _asyncio import CancelledError, InvalidStateError
except (ImportError, AttributeError):
class CancelledError(BaseException):
"""Injected into a task when calling `Task.cancel()`"""
pass
class InvalidStateError(Exception):
"""Can be raised in situations like setting a result value for a task object that already has a result value set."""
pass
class TimeoutError(Exception):
"""Raised when waiting for a task longer than the specified timeout."""
pass
# Used when calling Loop.call_exception_handler
_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
################################################################################
# Sleep functions
# "Yield" once, then raise StopIteration
class SingletonGenerator:
def __init__(self):
self.state = None
self.exc = StopIteration()
def __iter__(self):
return self
def __await__(self):
return self
def __next__(self):
if self.state is not None:
_task_queue.push_sorted(cur_task, self.state)
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
"""Sleep for *t* milliseconds.
This is a coroutine, and a MicroPython extension.
"""
assert sgen.state is None, "Check for a missing `await` in your code"
sgen.state = ticks_add(ticks(), max(0, t))
return sgen
# Pause task execution for the given time (in seconds)
def sleep(t):
"""Sleep for *t* seconds
This is a coroutine.
"""
return sleep_ms(int(t * 1000))
################################################################################
# "Never schedule" object"
# Don't re-schedule the object that awaits _never().
# For internal use only. Some constructs, like `await event.wait()`,
# work by NOT re-scheduling the task which calls wait(), but by
# having some other task schedule it later.
class _NeverSingletonGenerator:
def __init__(self):
self.state = None
self.exc = StopIteration()
def __iter__(self):
return self
def __await__(self):
return self
def __next__(self):
if self.state is not None:
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
def _never(sgen=_NeverSingletonGenerator()):
# assert sgen.state is None, "Check for a missing `await` in your code"
sgen.state = False
return sgen
################################################################################
# Queue and poller for stream IO
class IOQueue:
def __init__(self):
self.poller = select.poll()
self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
def _enqueue(self, s, idx):
if id(s) not in self.map:
entry = [None, None, s]
entry[idx] = cur_task
self.map[id(s)] = entry
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
else:
sm = self.map[id(s)]
assert sm[idx] is None
assert sm[1 - idx] is not None
sm[idx] = cur_task
self.poller.modify(s, select.POLLIN | select.POLLOUT)
# Link task to this IOQueue so it can be removed if needed
cur_task.data = self
def _dequeue(self, s):
del self.map[id(s)]
self.poller.unregister(s)
async def queue_read(self, s):
self._enqueue(s, 0)
await _never()
async def queue_write(self, s):
self._enqueue(s, 1)
await _never()
def remove(self, task):
while True:
del_s = None
for k in self.map: # Iterate without allocating on the heap
q0, q1, s = self.map[k]
if q0 is task or q1 is task:
del_s = s
break
if del_s is not None:
self._dequeue(s)
else:
break
def wait_io_event(self, dt):
for s, ev in self.poller.ipoll(dt):
sm = self.map[id(s)]
# print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None:
# POLLIN or error
_task_queue.push_head(sm[0])
sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None:
# POLLOUT or error
_task_queue.push_head(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
elif sm[0] is None:
self.poller.modify(s, select.POLLOUT)
else:
self.poller.modify(s, select.POLLIN)
################################################################################
# Main run loop
# Ensure the awaitable is a task
def _promote_to_task(aw):
return aw if isinstance(aw, Task) else create_task(aw)
# Create and schedule a new task from a coroutine
def create_task(coro):
"""Create a new task from the given coroutine and schedule it to run.
Returns the corresponding `Task` object.
"""
if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
_task_queue.push_head(t)
return t
# Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
"""Run the given *main_task* until it completes."""
global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True:
# Wait until the head of _task_queue is ready to run
dt = 1
while dt > 0:
dt = -1
t = _task_queue.peek()
if t:
# A task waiting on _task_queue; "ph_key" is time to schedule task at
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
# No tasks can be woken so finished running
return
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _task_queue.pop_head()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
exc = t.data
if not exc:
t.coro.send(None)
else:
# If the task is finished and on the run queue and gets here, then it
# had an exception and was not await'ed on. Throwing into it now will
# raise StopIteration and the code below will catch this and run the
# call_exception_handler function.
t.data = None
t.coro.throw(exc)
except excs_all as er:
# Check the task is not on any event queue
assert t.data is None
# This task is done, check if it's the main task and then loop should stop
if t is main_task:
if isinstance(er, StopIteration):
return er.value
raise er
if t.state:
# Task was running but is now finished.
waiting = False
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
_task_queue.push_head(t.state.pop_head())
waiting = True
# "False" indicates that the task is complete and has been await'ed on.
t.state = False
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push_head(t)
# Save return value of coro to pass up to caller.
t.data = er
elif t.state is None:
# Task is already finished and nothing await'ed on the task,
# so call the exception handler.
_exc_context["exception"] = exc
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)
# Create a new task from a coroutine and run it until it finishes
def run(coro):
"""Create a new task from the given coroutine and run it until it completes.
Returns the value returned by *coro*.
"""
return run_until_complete(create_task(coro))
################################################################################
# Event loop wrapper
async def _stopper():
pass
_stop_task = None
class Loop:
"""Class representing the event loop"""
_exc_handler = None
def create_task(coro):
"""Create a task from the given *coro* and return the new `Task` object."""
return create_task(coro)
def run_forever():
"""Run the event loop until `Loop.stop()` is called."""
global _stop_task
_stop_task = Task(_stopper(), globals())
run_until_complete(_stop_task)
# TODO should keep running until .stop() is called, even if there're no tasks left
def run_until_complete(aw):
"""Run the given *awaitable* until it completes. If *awaitable* is not a task then
it will be promoted to one.
"""
return run_until_complete(_promote_to_task(aw))
def stop():
"""Stop the event loop"""
global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
# If stop() is called again, do nothing
_stop_task = None
def close():
"""Close the event loop."""
pass
def set_exception_handler(handler):
"""Set the exception handler to call when a Task raises an exception that is not
caught. The *handler* should accept two arguments: ``(loop, context)``
"""
Loop._exc_handler = handler
def get_exception_handler():
"""Get the current exception handler. Returns the handler, or ``None`` if no
custom handler is set.
"""
return Loop._exc_handler
def default_exception_handler(loop, context):
"""The default exception handler that is called."""
exc = context["exception"]
print_exception(None, exc, exc.__traceback__)
def call_exception_handler(context):
"""Call the current exception handler. The argument *context* is passed through
and is a dictionary containing keys:
``'message'``, ``'exception'``, ``'future'``
"""
(Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
def get_event_loop(runq_len=0, waitq_len=0):
"""Return the event loop used to schedule and run tasks. See `Loop`."""
return Loop
def current_task():
"""Return the `Task` object associated with the currently running task."""
return cur_task
def new_event_loop():
"""Reset the event loop and return it.
**NOTE**: Since MicroPython only has a single event loop, this function just resets
the loop's state, it does not create a new one
"""
global _task_queue, _io_queue, _exc_context, cur_task
# TaskQueue of Task instances
_task_queue = TaskQueue()
# Task queue and poller for stream IO
_io_queue = IOQueue()
cur_task = None
_exc_context['exception'] = None
_exc_context['future'] = None
return Loop
# Initialise default event loop
new_event_loop()

View File

@@ -0,0 +1,92 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Events
======
"""
from . import core
# Event class for primitive events that can be waited on, set, and cleared
class Event:
"""Create a new event which can be used to synchronize tasks. Events
start in the cleared state.
"""
def __init__(self):
self.state = False # False=unset; True=set
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
def is_set(self):
"""Returns ``True`` if the event is set, ``False`` otherwise."""
return self.state
def set(self):
"""Set the event. Any tasks waiting on the event will be scheduled to run.
"""
# Event becomes set, schedule any tasks waiting on it
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
while self.waiting.peek():
core._task_queue.push_head(self.waiting.pop_head())
self.state = True
def clear(self):
"""Clear the event."""
self.state = False
async def wait(self):
"""Wait for the event to be set. If the event is already set then it returns
immediately.
This is a coroutine.
"""
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
await core._never()
return True
# MicroPython-extension: This can be set from outside the asyncio event loop,
# such as other threads, IRQs or scheduler context. Implementation is a stream
# that asyncio will poll until a flag is set.
# Note: Unlike Event, this is self-clearing.
try:
import uio
class ThreadSafeFlag(uio.IOBase):
def __init__(self):
self._flag = 0
def ioctl(self, req, flags):
if req == 3: # MP_STREAM_POLL
return self._flag * flags
return None
def set(self):
self._flag = 1
async def wait(self):
if not self._flag:
yield core._io_queue.queue_read(self)
self._flag = 0
except ImportError:
pass

View File

@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2022 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Functions
=========
"""
from . import core
async def _run(waiter, aw):
try:
result = await aw
status = True
except BaseException as er:
result = None
status = er
if waiter.data is None:
# The waiter is still waiting, cancel it.
if waiter.cancel():
# Waiter was cancelled by us, change its CancelledError to an instance of
# CancelledError that contains the status and result of waiting on aw.
# If the wait_for task subsequently gets cancelled externally then this
# instance will be reset to a CancelledError instance without arguments.
waiter.data = core.CancelledError(status, result)
async def wait_for(aw, timeout, sleep=core.sleep):
"""Wait for the *aw* awaitable to complete, but cancel if it takes longer
than *timeout* seconds. If *aw* is not a task then a task will be created
from it.
If a timeout occurs, it cancels the task and raises ``asyncio.TimeoutError``:
this should be trapped by the caller.
Returns the return value of *aw*.
This is a coroutine.
"""
aw = core._promote_to_task(aw)
if timeout is None:
return await aw
# Run aw in a separate runner task that manages its exceptions.
runner_task = core.create_task(_run(core.cur_task, aw))
try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
status = er.args[0] if er.args else None
if status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
runner_task.cancel()
raise er
elif status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return er.args[1]
else:
# aw raised an exception, propagate it out to the caller.
raise status
# The sleep finished before aw, so cancel aw and raise TimeoutError.
runner_task.cancel()
await runner_task
raise core.TimeoutError
def wait_for_ms(aw, timeout):
"""Similar to `wait_for` but *timeout* is an integer in milliseconds.
This is a coroutine, and a MicroPython extension.
"""
return wait_for(aw, timeout, core.sleep_ms)
class _Remove:
@staticmethod
def remove(t):
pass
async def gather(*aws, return_exceptions=False):
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
are promoted to tasks.
Returns a list of return values of all *aws*
"""
if not aws:
return []
def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push_head(gather_task)
ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done
# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False
# Wait for the a sub-task to need attention.
gather_task.data = _Remove
try:
await core._never()
except core.CancelledError as er:
cancel_all = True
state = er
# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state
# Return the list of return values of each sub-task.
return ts

View File

@@ -0,0 +1,87 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Locks
=====
"""
from . import core
# Lock class for primitive mutex capability
class Lock:
"""Create a new lock which can be used to coordinate tasks. Locks start in
the unlocked state.
In addition to the methods below, locks can be used in an ``async with``
statement.
"""
def __init__(self):
# The state can take the following values:
# - 0: unlocked
# - 1: locked
# - <Task>: unlocked but this task has been scheduled to acquire the lock next
self.state = 0
# Queue of Tasks waiting to acquire this Lock
self.waiting = core.TaskQueue()
def locked(self):
"""Returns ``True`` if the lock is locked, otherwise ``False``."""
return self.state == 1
def release(self):
"""Release the lock. If any tasks are waiting on the lock then the next
one in the queue is scheduled to run and the lock remains locked. Otherwise,
no tasks are waiting and the lock becomes unlocked.
"""
if self.state != 1:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
# Task(s) waiting on lock, schedule next Task
self.state = self.waiting.pop_head()
core._task_queue.push_head(self.state)
else:
# No Task waiting so unlock
self.state = 0
async def acquire(self):
"""Wait for the lock to be in the unlocked state and then lock it in an
atomic way. Only one task can acquire the lock at any one time.
This is a coroutine.
"""
if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:
await core._never()
except core.CancelledError as er:
if self.state == core.cur_task:
# Cancelled while pending on resume, schedule next waiting Task
self.state = 1
self.release()
raise er
# Lock available, set it as locked
self.state = 1
return True
async def __aenter__(self):
return await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
return self.release()

View File

@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
# This list of frozen files doesn't include task.py because that's provided by the C module.
freeze(
"..",
(
"uasyncio/__init__.py",
"uasyncio/core.py",
"uasyncio/event.py",
"uasyncio/funcs.py",
"uasyncio/lock.py",
"uasyncio/stream.py",
),
opt=3,
)

View File

@@ -0,0 +1,263 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Streams
=======
"""
from . import core
class Stream:
"""This represents a TCP stream connection. To minimise code this class
implements both a reader and a writer, and both ``StreamReader`` and
``StreamWriter`` alias to this class.
"""
def __init__(self, s, e={}):
self.s = s
self.e = e
self.out_buf = b""
def get_extra_info(self, v):
"""Get extra information about the stream, given by *v*. The valid
values for *v* are: ``peername``.
"""
return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self):
pass
async def wait_closed(self):
"""Wait for the stream to close.
This is a coroutine.
"""
# TODO yield?
self.s.close()
async def read(self, n):
"""Read up to *n* bytes and return them.
This is a coroutine.
"""
await core._io_queue.queue_read(self.s)
return self.s.read(n)
async def readinto(self, buf):
"""Read up to n bytes into *buf* with n being equal to the length of *buf*
Return the number of bytes read into *buf*
This is a coroutine, and a MicroPython extension.
"""
await core._io_queue.queue_read(self.s)
return self.s.readinto(buf)
async def readexactly(self, n):
"""Read exactly *n* bytes and return them as a bytes object.
Raises an ``EOFError`` exception if the stream ends before reading
*n* bytes.
This is a coroutine.
"""
r = b""
while n:
await core._io_queue.queue_read(self.s)
r2 = self.s.read(n)
if r2 is not None:
if not len(r2):
raise EOFError
r += r2
n -= len(r2)
return r
async def readline(self):
"""Read a line and return it.
This is a coroutine.
"""
l = b""
while True:
await core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
def write(self, buf):
"""Accumulated *buf* to the output buffer. The data is only flushed when
`Stream.drain` is called. It is recommended to call `Stream.drain`
immediately after calling this function.
"""
if not self.out_buf:
# Try to write immediately to the underlying stream.
ret = self.s.write(buf)
if ret == len(buf):
return
if ret is not None:
buf = buf[ret:]
self.out_buf += buf
async def drain(self):
"""Drain (write) all buffered output data out to the stream.
This is a coroutine.
"""
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
await core._io_queue.queue_write(self.s)
ret = self.s.write(mv[off:])
if ret is not None:
off += ret
self.out_buf = b""
# Stream can be used for both reading and writing to save code size
StreamReader = Stream
StreamWriter = Stream
# Create a TCP stream connection to a remote host
async def open_connection(host, port):
"""Open a TCP connection to the given *host* and *port*. The *host* address will
be resolved using `socket.getaddrinfo`, which is currently a blocking call.
Returns a pair of streams: a reader and a writer stream. Will raise a socket-specific
``OSError`` if the host could not be resolved or if the connection could not be made.
This is a coroutine.
"""
from uerrno import EINPROGRESS
import usocket as socket
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
try:
s.connect(ai[-1])
except OSError as er:
if er.errno != EINPROGRESS:
raise er
await core._io_queue.queue_write(s)
return ss, ss
# Class representing a TCP stream server, can be closed and used in "async with"
class Server:
"""This represents the server class returned from `start_server`. It can be used in
an ``async with`` statement to close the server upon exit.
"""
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
await self.wait_closed()
def close(self):
"""Close the server."""
self.task.cancel()
async def wait_closed(self):
"""Wait for the server to close.
This is a coroutine.
"""
await self.task
async def _serve(self, s, cb):
# Accept incoming connections
while True:
try:
await core._io_queue.queue_read(s)
except core.CancelledError:
# Shutdown server
s.close()
return
try:
s2, addr = s.accept()
except:
# Ignore a failed accept
continue
s2.setblocking(False)
s2s = Stream(s2, {"peername": addr})
core.create_task(cb(s2s, s2s))
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
"""Start a TCP server on the given *host* and *port*. The *cb* callback will be
called with incoming, accepted connections, and be passed 2 arguments: reader
writer streams for the connection.
Returns a `Server` object.
This is a coroutine.
"""
import usocket as socket
# Create and bind server socket.
host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(host[-1])
s.listen(backlog)
# Create and return server object and task.
srv = Server()
srv.task = core.create_task(srv._serve(s, cb))
return srv
################################################################################
# Legacy uasyncio compatibility
async def stream_awrite(self, buf, off=0, sz=-1):
if off != 0 or sz != -1:
buf = memoryview(buf)
if sz == -1:
sz = len(buf)
buf = buf[off : off + sz]
self.write(buf)
await self.drain()
Stream.aclose = Stream.wait_closed
Stream.awrite = stream_awrite
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?

View File

@@ -0,0 +1,215 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Tasks
=====
"""
# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
# They can optionally be replaced by C implementations.
from . import core
# pairing-heap meld of 2 heaps; O(1)
def ph_meld(h1, h2):
if h1 is None:
return h2
if h2 is None:
return h1
lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
if lt:
if h1.ph_child is None:
h1.ph_child = h2
else:
h1.ph_child_last.ph_next = h2
h1.ph_child_last = h2
h2.ph_next = None
h2.ph_rightmost_parent = h1
return h1
else:
h1.ph_next = h2.ph_child
h2.ph_child = h1
if h1.ph_next is None:
h2.ph_child_last = h1
h1.ph_rightmost_parent = h2
return h2
# pairing-heap pairing operation; amortised O(log N)
def ph_pairing(child):
heap = None
while child is not None:
n1 = child
child = child.ph_next
n1.ph_next = None
if child is not None:
n2 = child
child = child.ph_next
n2.ph_next = None
n1 = ph_meld(n1, n2)
heap = ph_meld(heap, n1)
return heap
# pairing-heap delete of a node; stable, amortised O(log N)
def ph_delete(heap, node):
if node is heap:
child = heap.ph_child
node.ph_child = None
return ph_pairing(child)
# Find parent of node
parent = node
while parent.ph_next is not None:
parent = parent.ph_next
parent = parent.ph_rightmost_parent
# Replace node with pairing of its children
if node is parent.ph_child and node.ph_child is None:
parent.ph_child = node.ph_next
node.ph_next = None
return heap
elif node is parent.ph_child:
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
parent.ph_child = node
else:
n = parent.ph_child
while node is not n.ph_next:
n = n.ph_next
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
if node is None:
node = n
else:
n.ph_next = node
node.ph_next = next
if next is None:
node.ph_rightmost_parent = parent
parent.ph_child_last = node
return heap
# TaskQueue class based on the above pairing-heap functions.
class TaskQueue:
def __init__(self):
self.heap = None
def peek(self):
return self.heap
def push(self, v, key=None):
assert v.ph_child is None
assert v.ph_next is None
v.data = None
v.ph_key = key if key is not None else core.ticks()
self.heap = ph_meld(v, self.heap)
def pop(self):
v = self.heap
assert v.ph_next is None
self.heap = ph_pairing(v.ph_child)
v.ph_child = None
return v
def remove(self, v):
self.heap = ph_delete(self.heap, v)
# Compatibility aliases, remove after they are no longer used
push_head = push
push_sorted = push
pop_head = pop
# Task class representing a coroutine, can be waited on and cancelled.
class Task:
"""This object wraps a coroutine into a running task. Tasks can be waited on
using ``await task``, which will wait for the task to complete and return the
return value of the task.
Tasks should not be created directly, rather use ``create_task`` to create them.
"""
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True, a callable, or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
self.ph_next = None # Paring heap
self.ph_rightmost_parent = None # Paring heap
def __iter__(self):
if not self.state:
# Task finished, signal that is has been await'ed on.
self.state = False
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
elif type(self.state) is not TaskQueue:
# Task has state used for another purpose, so can't also wait on it.
raise RuntimeError("can't wait")
return self
# CircuitPython needs __await()__.
__await__ = __iter__
def __next__(self):
if not self.state:
if self.data is None:
# Task finished but has already been sent to the loop's exception handler.
raise StopIteration
else:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.state.push(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
"""Whether the task is complete."""
return not self.state
def cancel(self):
"""Cancel the task by injecting a ``CancelledError`` into it. The task
may or may not ignore this exception.
"""
# Check if task is already finished.
if not self.state:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
raise RuntimeError("can't cancel self")
# If Task waits on another task then forward the cancel to the one it's waiting on.
while isinstance(self.data, Task):
self = self.data
# Reschedule Task as a cancelled task.
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push(self)
self.data = core.CancelledError
return True

View File

@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
"""
Fallback traceback module if the system traceback is missing.
"""
try:
from typing import List
except ImportError:
pass
import sys
def _print_traceback(traceback, limit=None, file=sys.stderr) -> List[str]:
if limit is None:
if hasattr(sys, "tracebacklimit"):
limit = sys.tracebacklimit
n = 0
while traceback is not None:
frame = traceback.tb_frame
line_number = traceback.tb_lineno
frame_code = frame.f_code
filename = frame_code.co_filename
name = frame_code.co_name
print(' File "%s", line %d, in %s' % (filename, line_number, name), file=file)
traceback = traceback.tb_next
n = n + 1
if limit is not None and n >= limit:
break
def print_exception(exception, value=None, traceback=None, limit=None, file=sys.stderr):
"""
Print exception information and stack trace to file.
"""
if traceback:
print("Traceback (most recent call last):", file=file)
_print_traceback(traceback, limit=limit, file=file)
if isinstance(exception, BaseException):
exception_type = type(exception).__name__
elif hasattr(exception, "__name__"):
exception_type = exception.__name__
else:
exception_type = type(value).__name__
valuestr = str(value)
if value is None or not valuestr:
print(exception_type, file=file)
else:
print("%s: %s" % (str(exception_type), valuestr), file=file)

View File

@@ -13,12 +13,6 @@ class Stream:
def get_extra_info(self, v):
return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self):
pass
@@ -63,6 +57,8 @@ class Stream:
while True:
yield core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
if l2 is None:
continue
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
@@ -100,19 +96,29 @@ StreamWriter = Stream
# Create a TCP stream connection to a remote host
#
# async
def open_connection(host, port):
def open_connection(host, port, ssl=None, server_hostname=None):
from errno import EINPROGRESS
import socket
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
try:
s.connect(ai[-1])
except OSError as er:
if er.errno != EINPROGRESS:
raise er
# wrap with SSL, if requested
if ssl:
if ssl is True:
import ssl as _ssl
ssl = _ssl.SSLContext(_ssl.PROTOCOL_TLS_CLIENT)
if not server_hostname:
server_hostname = host
s = ssl.wrap_socket(s, server_hostname=server_hostname, do_handshake_on_connect=False)
s.setblocking(False)
ss = Stream(s)
yield core._io_queue.queue_write(s)
return ss, ss
@@ -135,7 +141,7 @@ class Server:
async def wait_closed(self):
await self.task
async def _serve(self, s, cb):
async def _serve(self, s, cb, ssl):
self.state = False
# Accept incoming connections
while True:
@@ -156,6 +162,13 @@ class Server:
except:
# Ignore a failed accept
continue
if ssl:
try:
s2 = ssl.wrap_socket(s2, server_side=True, do_handshake_on_connect=False)
except OSError as e:
core.sys.print_exception(e)
s2.close()
continue
s2.setblocking(False)
s2s = Stream(s2, {"peername": addr})
core.create_task(cb(s2s, s2s))
@@ -163,7 +176,7 @@ class Server:
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
async def start_server(cb, host, port, backlog=5, ssl=None):
import socket
# Create and bind server socket.
@@ -176,7 +189,7 @@ async def start_server(cb, host, port, backlog=5):
# Create and return server object and task.
srv = Server()
srv.task = core.create_task(srv._serve(s, cb))
srv.task = core.create_task(srv._serve(s, cb, ssl))
try:
# Ensure that the _serve task has been scheduled so that it gets to
# handle cancellation.

View File

@@ -1,79 +0,0 @@
from utime import *
from micropython import const
_TS_YEAR = const(0)
_TS_MON = const(1)
_TS_MDAY = const(2)
_TS_HOUR = const(3)
_TS_MIN = const(4)
_TS_SEC = const(5)
_TS_WDAY = const(6)
_TS_YDAY = const(7)
_TS_ISDST = const(8)
_WDAY = const(("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"))
_MDAY = const(
(
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
)
)
def strftime(datefmt, ts):
from io import StringIO
fmtsp = False
ftime = StringIO()
for k in datefmt:
if fmtsp:
if k == "a":
ftime.write(_WDAY[ts[_TS_WDAY]][0:3])
elif k == "A":
ftime.write(_WDAY[ts[_TS_WDAY]])
elif k == "b":
ftime.write(_MDAY[ts[_TS_MON] - 1][0:3])
elif k == "B":
ftime.write(_MDAY[ts[_TS_MON] - 1])
elif k == "d":
ftime.write("%02d" % ts[_TS_MDAY])
elif k == "H":
ftime.write("%02d" % ts[_TS_HOUR])
elif k == "I":
ftime.write("%02d" % (ts[_TS_HOUR] % 12))
elif k == "j":
ftime.write("%03d" % ts[_TS_YDAY])
elif k == "m":
ftime.write("%02d" % ts[_TS_MON])
elif k == "M":
ftime.write("%02d" % ts[_TS_MIN])
elif k == "P":
ftime.write("AM" if ts[_TS_HOUR] < 12 else "PM")
elif k == "S":
ftime.write("%02d" % ts[_TS_SEC])
elif k == "w":
ftime.write(str(ts[_TS_WDAY]))
elif k == "y":
ftime.write("%02d" % (ts[_TS_YEAR] % 100))
elif k == "Y":
ftime.write(str(ts[_TS_YEAR]))
else:
ftime.write(k)
fmtsp = False
elif k == "%":
fmtsp = True
else:
ftime.write(k)
val = ftime.getvalue()
ftime.close()
return val

View File

@@ -1,6 +1,6 @@
[project]
name = "microdot"
version = "1.3.5.dev0"
version = "2.0.6.dev0"
authors = [
{ name = "Miguel Grinberg", email = "miguel.grinberg@gmail.com" },
]
@@ -26,6 +26,7 @@ Homepage = "https://github.com/miguelgrinberg/microdot"
[project.optional-dependencies]
docs = [
"sphinx",
"pyjwt"
]
[tool.setuptools]

View File

@@ -2,7 +2,11 @@ import sys
sys.path.insert(0, 'src')
sys.path.insert(2, 'libs/common')
sys.path.insert(3, 'libs/micropython')
if sys.implementation.name == 'circuitpython':
sys.path.insert(3, 'libs/circuitpython')
sys.path.insert(4, 'libs/micropython')
else:
sys.path.insert(3, 'libs/micropython')
import unittest

View File

@@ -45,6 +45,12 @@ class _BodyStream: # pragma: no cover
class Microdot(BaseMicrodot):
"""A subclass of the core :class:`Microdot <microdot.Microdot>` class that
implements the ASGI protocol.
This class must be used as the application instance when running under an
ASGI web server.
"""
def __init__(self):
super().__init__()
self.embedded_server = False

144
src/microdot/auth.py Normal file
View File

@@ -0,0 +1,144 @@
from microdot import abort
from microdot.microdot import invoke_handler
class BaseAuth:
def __init__(self):
self.auth_callback = None
self.error_callback = None
def __call__(self, f):
"""Decorator to protect a route with authentication.
An instance of this class must be used as a decorator on the routes
that need to be protected. Example::
auth = BasicAuth() # or TokenAuth()
@app.route('/protected')
@auth
def protected(request):
# ...
Routes that are decorated in this way will only be invoked if the
authentication callback returned a valid user object, otherwise the
error callback will be executed.
"""
async def wrapper(request, *args, **kwargs):
auth = self._get_auth(request)
if not auth:
return await invoke_handler(self.error_callback, request)
request.g.current_user = await invoke_handler(
self.auth_callback, request, *auth)
if not request.g.current_user:
return await invoke_handler(self.error_callback, request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
class BasicAuth(BaseAuth):
"""Basic Authentication.
:param realm: The realm that is displayed when the user is prompted to
authenticate in the browser.
:param charset: The charset that is used to encode the realm.
:param scheme: The authentication scheme. Defaults to 'Basic'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic',
error_status=401):
super().__init__()
self.realm = realm
self.charset = charset
self.scheme = scheme
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get('Authorization')
if auth and auth.startswith('Basic '):
import binascii
try:
username, password = binascii.a2b_base64(
auth[6:]).decode().split(':', 1)
except Exception: # pragma: no cover
return None
return username, password
def authentication_error(self, request):
return '', self.error_status, {
'WWW-Authenticate': '{} realm="{}", charset="{}"'.format(
self.scheme, self.realm, self.charset)}
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, username, password):
user = get_user(username)
if user and user.check_password(password):
return get_user(username)
"""
self.auth_callback = f
class TokenAuth(BaseAuth):
"""Token based authentication.
:param header: The name of the header that will contain the token. Defaults
to 'Authorization'.
:param scheme: The authentication scheme. Defaults to 'Bearer'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, header='Authorization', scheme='Bearer',
error_status=401):
super().__init__()
self.header = header
self.scheme = scheme.lower()
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get(self.header)
if auth:
if self.header == 'Authorization':
try:
scheme, token = auth.split(' ', 1)
except Exception:
return None
if scheme.lower() == self.scheme:
return (token.strip(),)
else:
return (auth,)
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, token):
return get_user(token)
"""
self.auth_callback = f
def errorhandler(self, f):
"""Decorator to configure the error callback.
Microdot calls the error callback to allow the application to generate
a custom error response. The default error response is to call
``abort(401)``.
"""
self.error_callback = f
def authentication_error(self, request):
abort(self.error_status)

8
src/microdot/helpers.py Normal file
View File

@@ -0,0 +1,8 @@
try:
from functools import wraps
except ImportError: # pragma: no cover
# MicroPython does not currently implement functools.wraps
def wraps(wrapped):
def _(wrapper):
return wrapper
return _

View File

@@ -3,36 +3,37 @@ from jinja2 import Environment, FileSystemLoader, select_autoescape
_jinja_env = None
def init_templates(template_dir='templates', enable_async=False, **kwargs):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load templates
from a *templates* subdirectory.
:param enable_async: set to ``True`` to enable the async rendering engine
in Jinja, and the ``render_async()`` and
``generate_async()`` methods.
:param kwargs: any additional options to be passed to Jinja's
``Environment`` class.
"""
global _jinja_env
_jinja_env = Environment(
loader=FileSystemLoader(template_dir),
autoescape=select_autoescape(),
enable_async=enable_async,
**kwargs
)
class Template:
"""A template object.
:param template: The filename of the template to render, relative to the
configured template directory.
"""
@classmethod
def initialize(cls, template_dir='templates', enable_async=False,
**kwargs):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load
templates from a *templates* subdirectory.
:param enable_async: set to ``True`` to enable the async rendering
engine in Jinja, and the ``render_async()`` and
``generate_async()`` methods.
:param kwargs: any additional options to be passed to Jinja's
``Environment`` class.
"""
global _jinja_env
_jinja_env = Environment(
loader=FileSystemLoader(template_dir),
autoescape=select_autoescape(),
enable_async=enable_async,
**kwargs
)
def __init__(self, template):
if _jinja_env is None: # pragma: no cover
init_templates()
self.initialize()
#: The name of the template
self.name = template
self.template = _jinja_env.get_template(template)

150
src/microdot/login.py Normal file
View File

@@ -0,0 +1,150 @@
from time import time
from microdot import redirect
from microdot.microdot import urlencode, invoke_handler
class Login:
def __init__(self, login_url='/login'):
self.login_url = login_url
self.user_callback = None
self.user_id_callback = lambda user: user.id
def id_to_user(self, f):
"""Decorator to configure the user callback.
The decorated function receives the user ID as an argument and must
return the corresponding user object, or ``None`` if the user ID is
invalid.
"""
self.user_callback = f
def user_to_id(self, f):
"""Decorator to configure the user ID callback.
The decorated functon receives the user object as an argument and must
return the corresponding user ID. By default, the ``id`` attribute of
the user object is used.
"""
self.user_id_callback = f
def _get_session(self, request):
return request.app._session.get(request)
def _update_remember_cookie(self, request, days, user_id=None):
remember_payload = request.app._session.encode({
'user_id': user_id,
'days': days,
'exp': time() + days * 24 * 60 * 60
})
@request.after_request
async def _set_remember_cookie(request, response):
response.set_cookie('_remember', remember_payload,
max_age=days * 24 * 60 * 60)
return response
def _get_user_id_from_session(self, request):
session = self._get_session(request)
if session and '_user_id' in session:
return session['_user_id']
if '_remember' in request.cookies:
remember_payload = request.app._session.decode(
request.cookies['_remember'])
user_id = remember_payload.get('user_id')
if user_id: # pragma: no branch
self._update_remember_cookie(
request, remember_payload.get('_days', 30), user_id)
session['_user_id'] = user_id
session['_fresh'] = False
session.save()
return user_id
async def _redirect_to_login(self, request):
return '', 302, {'Location': self.login_url + '?next=' + urlencode(
request.url)}
async def login_user(self, request, user, remember=False,
redirect_url='/'):
"""Log a user in.
:param request: the request object
:param user: the user object
:param remember: if the user's logged in state should be remembered
with a cookie after the session ends. Set to the
number of days the remember cookie should last, or to
``True`` to use a default duration of 30 days.
:param redirect_url: the URL to redirect to after login
This call marks the user as logged in by storing their user ID in the
user session. The application must call this method to log a user in
after their credentials have been validated.
The method returns a redirect response, either to the URL the user
originally intended to visit, or if there is no original URL to the URL
specified by the `redirect_url`.
"""
session = self._get_session(request)
session['_user_id'] = await invoke_handler(self.user_id_callback, user)
session['_fresh'] = True
session.save()
if remember:
days = 30 if remember is True else int(remember)
self._update_remember_cookie(request, days, session['_user_id'])
next_url = request.args.get('next', redirect_url)
if not next_url.startswith('/'):
next_url = redirect_url
return redirect(next_url)
async def logout_user(self, request):
"""Log a user out.
:param request: the request object
This call removes information about the user's log in from the user
session. If a remember cookie exists, it is removed as well.
"""
session = self._get_session(request)
session.pop('_user_id', None)
session.pop('_fresh', None)
session.save()
if '_remember' in request.cookies:
self._update_remember_cookie(request, 0)
def __call__(self, f):
"""Decorator to protect a route with authentication.
If the user is not logged in, Microdot will redirect to the login page
first. The decorated route will only run after successful login by the
user. If the user is already logged in, the route will run immediately.
"""
async def wrapper(request, *args, **kwargs):
user_id = self._get_user_id_from_session(request)
if not user_id:
return await self._redirect_to_login(request)
request.g.current_user = await invoke_handler(
self.user_callback, user_id)
if not request.g.current_user:
return await self._redirect_to_login(request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
def fresh(self, f):
"""Decorator to protect a route with "fresh" authentication.
This decorator prevents the route from running when the login session
is not fresh. A fresh session is a session that has been created from
direct user interaction with the login page, as opposite to a session
that was restored from a "remember me" cookie.
"""
base_wrapper = self.__call__(f)
async def wrapper(request, *args, **kwargs):
session = self._get_session(request)
if session.get('_fresh'):
return await base_wrapper(request, *args, **kwargs)
return await self._redirect_to_login(request)
return wrapper

View File

@@ -8,11 +8,11 @@ servers for MicroPython and standard Python.
import asyncio
import io
import json
import re
import time
try:
from inspect import iscoroutinefunction, iscoroutine
from functools import partial
async def invoke_handler(handler, *args, **kwargs):
"""Invoke a handler and return the result.
@@ -23,7 +23,7 @@ try:
ret = await handler(*args, **kwargs)
else:
ret = await asyncio.get_running_loop().run_in_executor(
None, handler, *args, **kwargs)
None, partial(handler, *args, **kwargs))
return ret
except ImportError: # pragma: no cover
def iscoroutine(coro):
@@ -595,10 +595,10 @@ class Response:
if expires:
if isinstance(expires, str):
http_cookie += '; Expires=' + expires
else:
else: # pragma: no cover
http_cookie += '; Expires=' + time.strftime(
'%a, %d %b %Y %H:%M:%S GMT', expires.timetuple())
if max_age:
if max_age is not None:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
@@ -616,10 +616,10 @@ class Response:
:param cookie: The cookie's name.
:param kwargs: Any cookie opens and flags supported by
``set_cookie()`` except ``expires``.
``set_cookie()`` except ``expires`` and ``max_age``.
"""
self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT',
**kwargs)
max_age=0, **kwargs)
def complete(self):
if isinstance(self.body, bytes) and \
@@ -797,8 +797,9 @@ class Response:
class URLPattern():
def __init__(self, url_pattern):
self.url_pattern = url_pattern
self.pattern = ''
self.args = []
self.segments = []
self.regex = None
pattern = ''
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
@@ -810,42 +811,83 @@ class URLPattern():
else:
type_ = 'string'
name = segment
parser = None
if type_ == 'string':
pattern = '[^/]+'
parser = self._string_segment
pattern += '/([^/]+)'
elif type_ == 'int':
pattern = '-?\\d+'
parser = self._int_segment
pattern += '/(-?\\d+)'
elif type_ == 'path':
pattern = '.+'
use_regex = True
pattern += '/(.+)'
elif type_.startswith('re:'):
pattern = type_[3:]
use_regex = True
pattern += '/({pattern})'.format(pattern=type_[3:])
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
self.segments.append({'parser': parser, 'name': name,
'type': type_})
else:
self.pattern += '/{segment}'.format(segment=segment)
pattern += '/' + segment
self.segments.append({'parser': self._static_segment(segment)})
if use_regex:
self.pattern = re.compile('^' + self.pattern + '$')
import re
self.regex = re.compile('^' + pattern + '$')
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
if self.regex:
g = self.regex.match(path)
if not g:
return
i = 1
for segment in self.segments:
if 'name' not in segment:
continue
value = g.group(i)
if segment['type'] == 'int':
value = int(value)
args[segment['name']] = value
i += 1
else:
if len(path) == 0 or path[0] != '/':
return
path = path[1:]
args = {}
for segment in self.segments:
if path is None:
return
arg, path = segment['parser'](path)
if arg is None:
return
if 'name' in segment:
args[segment['name']] = arg
if path is not None:
return
return args
def _static_segment(self, segment):
def _static(value):
s = value.split('/', 1)
if s[0] == segment:
return '', s[1] if len(s) > 1 else None
return None, None
return _static
def _string_segment(self, value):
s = value.split('/', 1)
if len(s[0]) == 0:
return None, None
return s[0], s[1] if len(s) > 1 else None
def _int_segment(self, value):
s = value.split('/', 1)
try:
return int(s[0]), s[1] if len(s) > 1 else None
except ValueError:
return None, None
class HTTPException(Exception):
def __init__(self, status_code, reason=None):
@@ -1149,7 +1191,7 @@ class Microdot:
Example::
import asyncio
from microdot_asyncio import Microdot
from microdot import Microdot
app = Microdot()
@@ -1226,7 +1268,7 @@ class Microdot:
Example::
from microdot_asyncio import Microdot
from microdot import Microdot
app = Microdot()

View File

@@ -1,7 +1,6 @@
import jwt
from microdot.microdot import invoke_handler
secret_key = None
from microdot.helpers import wraps
class SessionDict(dict):
@@ -57,13 +56,7 @@ class Session:
if session is None:
request.g._session = SessionDict(request, {})
return request.g._session
try:
session = jwt.decode(session, self.secret_key,
algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
request.g._session = SessionDict(request, {})
else:
request.g._session = SessionDict(request, session)
request.g._session = SessionDict(request, self.decode(session))
return request.g._session
def update(self, request, session):
@@ -89,8 +82,7 @@ class Session:
if not self.secret_key:
raise ValueError('The session secret key is not configured')
encoded_session = jwt.encode(session, self.secret_key,
algorithm='HS256')
encoded_session = self.encode(session)
@request.after_request
def _update_session(request, response):
@@ -121,6 +113,18 @@ class Session:
expires='Thu, 01 Jan 1970 00:00:01 GMT')
return response
def encode(self, payload, secret_key=None):
return jwt.encode(payload, secret_key or self.secret_key,
algorithm='HS256')
def decode(self, session, secret_key=None):
try:
payload = jwt.decode(session, secret_key or self.secret_key,
algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
return {}
return payload
def with_session(f):
"""Decorator that passes the user session to the route handler.
@@ -134,15 +138,11 @@ def with_session(f):
return 'Hello, World!'
Note that the decorator does not save the session. To update the session,
call the :func:`update_session <microdot.session.update_session>` function.
call the :func:`session.save() <microdot.session.SessionDict.save>` method.
"""
@wraps(f)
async def wrapper(request, *args, **kwargs):
return await invoke_handler(
f, request, request.app._session.get(request), *args, **kwargs)
for attr in ['__name__', '__doc__', '__module__', '__qualname__']:
try:
setattr(wrapper, attr, getattr(f, attr))
except AttributeError: # pragma: no cover
pass
return wrapper

View File

@@ -1,20 +1,40 @@
import asyncio
import json
from microdot.helpers import wraps
class SSE:
"""Server-Sent Events object.
An object of this class is sent to handler functions to manage the SSE
connection.
"""
def __init__(self):
self.event = asyncio.Event()
self.queue = []
async def send(self, data, event=None):
async def send(self, data, event=None, event_id=None):
"""Send an event to the client.
:param data: the data to send. It can be given as a string, bytes, dict
or list. Dictionaries and lists are serialized to JSON.
Any other types are converted to string before sending.
:param event: an optional event name, to send along with the data. If
given, it must be a string.
:param event_id: an optional event id, to send along with the data. If
given, it must be a string.
"""
if isinstance(data, (dict, list)):
data = json.dumps(data)
elif not isinstance(data, str):
data = str(data)
data = f'data: {data}\n\n'
data = json.dumps(data).encode()
elif isinstance(data, str):
data = data.encode()
elif not isinstance(data, bytes):
data = str(data).encode()
data = b'data: ' + data + b'\n\n'
if event_id:
data = b'id: ' + event_id.encode() + b'\n' + data
if event:
data = f'event: {event}\n{data}'
data = b'event: ' + event.encode() + b'\n' + data
self.queue.append(data)
self.event.set()
@@ -30,19 +50,9 @@ def sse_response(request, event_function, *args, **kwargs):
:param args: additional positional arguments to be passed to the response.
:param kwargs: additional keyword arguments to be passed to the response.
Example::
@app.route('/events')
async def events_route(request):
async def events(request, sse):
# send an unnamed event with string data
await sse.send('hello')
# send an unnamed event with JSON data
await sse.send({'foo': 'bar'})
# send a named event
await sse.send('hello', event='greeting')
return sse_response(request, events)
This is a low-level function that can be used to implement a custom SSE
endpoint. In general the :func:`microdot.sse.with_sse` decorator should be
used instead.
"""
sse = SSE()
@@ -85,10 +95,16 @@ def with_sse(f):
@app.route('/events')
@with_sse
async def events(request, sse):
for i in range(10):
await asyncio.sleep(1)
await sse.send(f'{i}')
# send an unnamed event with string data
await sse.send('hello')
# send an unnamed event with JSON data
await sse.send({'foo': 'bar'})
# send a named event
await sse.send('hello', event='greeting')
"""
@wraps(f)
async def sse_handler(request, *args, **kwargs):
return sse_response(request, f, *args, **kwargs)

View File

@@ -77,7 +77,7 @@ class TestClient:
The following example shows how to create a test client for an application
and send a test request::
from microdot_asyncio import Microdot
from microdot import Microdot
app = Microdot()
@@ -141,7 +141,17 @@ class TestClient:
cookie_options = cookie_value.split(';')
delete = False
for option in cookie_options[1:]:
if option.strip().lower().startswith('expires='):
if option.strip().lower().startswith(
'max-age='): # pragma: no cover
_, age = option.strip().split('=', 1)
try:
age = int(age)
except ValueError: # pragma: no cover
age = 0
if age <= 0:
delete = True
break
elif option.strip().lower().startswith('expires='):
_, e = option.strip().split('=', 1)
# this is a very limited parser for cookie expiry
# that only detects a cookie deletion request when
@@ -292,6 +302,8 @@ class TestClient:
async def awrite(self, data):
if self.started:
h = WebSocket._parse_frame_header(data[0:2])
if h[1] not in [WebSocket.TEXT, WebSocket.BINARY]:
return
if h[3] < 0:
data = data[2 - h[3]:]
else:

View File

@@ -3,30 +3,32 @@ from utemplate import recompile
_loader = None
def init_templates(template_dir='templates', loader_class=recompile.Loader):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load templates
from a *templates* subdirectory.
:param loader_class: the ``utemplate.Loader`` class to use when loading
templates. This argument is optional. The default is
the ``recompile.Loader`` class, which automatically
recompiles templates when they change.
"""
global _loader
_loader = loader_class(None, template_dir)
class Template:
"""A template object.
:param template: The filename of the template to render, relative to the
configured template directory.
"""
@classmethod
def initialize(cls, template_dir='templates',
loader_class=recompile.Loader):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load
templates from a *templates* subdirectory.
:param loader_class: the ``utemplate.Loader`` class to use when loading
templates. This argument is optional. The default
is the ``recompile.Loader`` class, which
automatically recompiles templates when they
change.
"""
global _loader
_loader = loader_class(None, template_dir)
def __init__(self, template):
if _loader is None: # pragma: no cover
init_templates()
self.initialize()
#: The name of the template
self.name = template
self.template = _loader.load(template)

View File

@@ -1,10 +1,21 @@
import binascii
import hashlib
from microdot import Response
from microdot.microdot import MUTED_SOCKET_ERRORS
from microdot import Request, Response
from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception
from microdot.helpers import wraps
class WebSocketError(Exception):
"""Exception raised when an error occurs in a WebSocket connection."""
pass
class WebSocket:
"""A WebSocket connection object.
An instance of this class is sent to handler functions to manage the
WebSocket connection.
"""
CONT = 0
TEXT = 1
BINARY = 2
@@ -12,6 +23,18 @@ class WebSocket:
PING = 9
PONG = 10
#: Specify the maximum message size that can be received when calling the
#: ``receive()`` method. Messages with payloads that are larger than this
#: size will be rejected and the connection closed. Set to 0 to disable
#: the size check (be aware of potential security issues if you do this),
#: or to -1 to use the value set in
#: ``Request.max_body_length``. The default is -1.
#:
#: Example::
#:
#: WebSocket.max_message_length = 4 * 1024 # up to 4KB messages
max_message_length = -1
def __init__(self, request):
self.request = request
self.closed = False
@@ -26,6 +49,7 @@ class WebSocket:
b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n')
async def receive(self):
"""Receive a message from the client."""
while True:
opcode, payload = await self._read_frame()
send_opcode, data = self._process_websocket_frame(opcode, payload)
@@ -35,12 +59,20 @@ class WebSocket:
return data
async def send(self, data, opcode=None):
"""Send a message to the client.
:param data: the data to send, given as a string or bytes.
:param opcode: a custom frame opcode to use. If not given, the opcode
is ``TEXT`` or ``BINARY`` depending on the type of the
data.
"""
frame = self._encode_websocket_frame(
opcode or (self.TEXT if isinstance(data, str) else self.BINARY),
data)
await self.request.sock[1].awrite(frame)
async def close(self):
"""Close the websocket connection."""
if not self.closed: # pragma: no cover
self.closed = True
await self.send(b'', self.CLOSE)
@@ -72,7 +104,7 @@ class WebSocket:
fin = header[0] & 0x80
opcode = header[0] & 0x0f
if fin == 0 or opcode == cls.CONT: # pragma: no cover
raise OSError(32, 'Continuation frames not supported')
raise WebSocketError('Continuation frames not supported')
has_mask = header[1] & 0x80
length = header[1] & 0x7f
if length == 126:
@@ -87,7 +119,7 @@ class WebSocket:
elif opcode == self.BINARY:
pass
elif opcode == self.CLOSE:
raise OSError(32, 'Websocket connection closed')
raise WebSocketError('Websocket connection closed')
elif opcode == self.PING:
return self.PONG, payload
elif opcode == self.PONG: # pragma: no branch
@@ -114,7 +146,7 @@ class WebSocket:
async def _read_frame(self):
header = await self.request.sock[0].read(2)
if len(header) != 2: # pragma: no cover
raise OSError(32, 'Websocket connection closed')
raise WebSocketError('Websocket connection closed')
fin, opcode, has_mask, length = self._parse_frame_header(header)
if length == -2:
length = await self.request.sock[0].read(2)
@@ -122,6 +154,10 @@ class WebSocket:
elif length == -8:
length = await self.request.sock[0].read(8)
length = int.from_bytes(length, 'big')
max_allowed_length = Request.max_body_length \
if self.max_message_length == -1 else self.max_message_length
if length > max_allowed_length:
raise WebSocketError('Message too large')
if has_mask: # pragma: no cover
mask = await self.request.sock[0].read(4)
payload = await self.request.sock[0].read(length)
@@ -157,15 +193,24 @@ async def websocket_upgrade(request):
def websocket_wrapper(f, upgrade_function):
@wraps(f)
async def wrapper(request, *args, **kwargs):
ws = await upgrade_function(request)
try:
await f(request, ws, *args, **kwargs)
await ws.close() # pragma: no cover
except OSError as exc:
if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover
raise
return ''
except WebSocketError:
pass
except Exception as exc:
print_exception(exc)
finally: # pragma: no cover
try:
await ws.close()
except Exception:
pass
return Response.already_handled
return wrapper

View File

@@ -9,6 +9,12 @@ from microdot.websocket import WebSocket, websocket_upgrade, \
class Microdot(BaseMicrodot):
"""A subclass of the core :class:`Microdot <microdot.Microdot>` class that
implements the WSGI protocol.
This class must be used as the application instance when running under a
WSGI web server.
"""
def __init__(self):
super().__init__()
self.loop = asyncio.new_event_loop()

View File

@@ -9,3 +9,4 @@ from tests.test_sse import * # noqa: F401, F403
from tests.test_cors import * # noqa: F401, F403
from tests.test_utemplate import * # noqa: F401, F403
from tests.test_session import * # noqa: F401, F403
from tests.test_auth import * # noqa: F401, F403

125
tests/test_auth.py Normal file
View File

@@ -0,0 +1,125 @@
import asyncio
import binascii
import unittest
from microdot import Microdot
from microdot.auth import BasicAuth, TokenAuth
from microdot.test_client import TestClient
class TestAuth(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_basic_auth(self):
app = Microdot()
basic_auth = BasicAuth()
@basic_auth.authenticate
def authenticate(request, username, password):
if username == 'foo' and password == 'bar':
return {'username': username}
@app.route('/')
@basic_auth
def index(request):
return request.g.current_user['username']
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:bar').decode()}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'foo')
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:baz').decode()}))
self.assertEqual(res.status_code, 401)
def test_token_auth(self):
app = Microdot()
token_auth = TokenAuth()
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_token_auth_custom_header(self):
app = Microdot()
token_auth = TokenAuth(header='X-Auth-Token')
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'X-Token-Auth': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = self._run(client.get('/', headers={'x-auth-token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
@token_auth.errorhandler
def error_handler(request):
return {'status_code': 403}, 403
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 403)
self.assertEqual(res.json, {'status_code': 403})

View File

@@ -2,10 +2,10 @@ import asyncio
import sys
import unittest
from microdot import Microdot
from microdot.jinja import Template, init_templates
from microdot.jinja import Template
from microdot.test_client import TestClient
init_templates('tests/templates')
Template.initialize('tests/templates')
@unittest.skipIf(sys.implementation.name == 'micropython',
@@ -49,7 +49,7 @@ class TestJinja(unittest.TestCase):
self.assertEqual(res.body, b'Hello, foo!')
def test_render_async_template_in_app(self):
init_templates('tests/templates', enable_async=True)
Template.initialize('tests/templates', enable_async=True)
app = Microdot()
@@ -62,10 +62,10 @@ class TestJinja(unittest.TestCase):
self.assertEqual(res.status_code, 200)
self.assertEqual(res.body, b'Hello, foo!')
init_templates('tests/templates')
Template.initialize('tests/templates')
def test_generate_async_template_in_app(self):
init_templates('tests/templates', enable_async=True)
Template.initialize('tests/templates', enable_async=True)
app = Microdot()
@@ -78,4 +78,4 @@ class TestJinja(unittest.TestCase):
self.assertEqual(res.status_code, 200)
self.assertEqual(res.body, b'Hello, foo!')
init_templates('tests/templates')
Template.initialize('tests/templates')

185
tests/test_login.py Normal file
View File

@@ -0,0 +1,185 @@
import asyncio
import unittest
from microdot import Microdot
from microdot.login import Login
from microdot.session import Session
from microdot.test_client import TestClient
class TestLogin(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_login(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
class User:
def __init__(self, id, name):
self.id = id
self.name = name
@login.id_to_user
def id_to_user(user_id):
return User(user_id, f'user{user_id}')
@app.get('/')
@login
def index(request):
return request.g.current_user.name
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(123, 'user123'))
@app.post('/logout')
async def logout_route(request):
await login.logout_user(request)
return 'ok'
client = TestClient(app)
res = self._run(client.get('/?foo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 1)
self.assertIn('session', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user123')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)
def test_login_bad_user_id(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.id_to_user
def id_to_user(user_id):
return None
@login.user_to_id
def user_to_id(user):
return user
@app.get('/foo')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, 'user')
client = TestClient(app)
res = self._run(client.post('/login?next=/'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = self._run(client.get('/foo'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/foo')
def test_login_bad_redirect(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.id_to_user
def id_to_user(user_id):
return user_id
@login.user_to_id
def user_to_id(user):
return user
@app.get('/')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, 'user')
client = TestClient(app)
res = self._run(client.post('/login?next=http://example.com'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
def test_login_remember(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.id_to_user
def id_to_user(user_id):
return user_id
@login.user_to_id
def user_to_id(user):
return user
@app.get('/')
@login
def index(request):
return request.g.current_user
@app.post('/login')
async def login_route(request):
return await login.login_user(request, 'user', remember=True)
@app.post('/logout')
async def logout(request):
await login.logout_user(request)
return 'ok'
@app.get('/fresh')
@login.fresh
async def fresh(request):
return f'fresh {request.g.current_user}'
client = TestClient(app)
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 2)
self.assertIn('session', client.cookies)
self.assertIn('_remember', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'fresh user')
del client.cookies['session']
print(client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/fresh')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
self.assertFalse('_remember' in client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)

View File

@@ -25,6 +25,14 @@ class TestMicrodot(unittest.TestCase):
async def index2(req):
return 'foo-async'
@app.route('/arg/<id>')
def index3(req, id):
return id
@app.route('/arg/async/<id>')
async def index4(req, id):
return f'async-{id}'
client = TestClient(app)
res = self._run(client.get('/'))
@@ -45,6 +53,24 @@ class TestMicrodot(unittest.TestCase):
self.assertEqual(res.body, b'foo-async')
self.assertEqual(res.json, None)
res = self._run(client.get('/arg/123'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.headers['Content-Length'], '3')
self.assertEqual(res.text, '123')
self.assertEqual(res.body, b'123')
self.assertEqual(res.json, None)
res = self._run(client.get('/arg/async/123'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.headers['Content-Length'], '9')
self.assertEqual(res.text, 'async-123')
self.assertEqual(res.body, b'async-123')
self.assertEqual(res.json, None)
def test_post_request(self):
app = Microdot()

View File

@@ -1,5 +1,4 @@
import asyncio
from datetime import datetime
import unittest
from microdot import Response
from tests.mock_socket import FakeStreamAsync
@@ -186,12 +185,12 @@ class TestResponse(unittest.TestCase):
res.set_cookie('foo2', 'bar2', path='/', partitioned=True)
res.set_cookie('foo3', 'bar3', domain='example.com:1234')
res.set_cookie('foo4', 'bar4',
expires=datetime(2019, 11, 5, 2, 23, 54))
expires='Tue, 05 Nov 2019 02:23:54 GMT')
res.set_cookie('foo5', 'bar5', max_age=123,
expires='Thu, 01 Jan 1970 00:00:00 GMT')
res.set_cookie('foo6', 'bar6', secure=True, http_only=True)
res.set_cookie('foo7', 'bar7', path='/foo', domain='example.com:1234',
expires=datetime(2019, 11, 5, 2, 23, 54), max_age=123,
expires='Tue, 05 Nov 2019 02:23:54 GMT', max_age=123,
secure=True, http_only=True)
res.delete_cookie('foo8', http_only=True)
self.assertEqual(res.headers, {'Set-Cookie': [
@@ -204,7 +203,8 @@ class TestResponse(unittest.TestCase):
'foo7=bar7; Path=/foo; Domain=example.com:1234; '
'Expires=Tue, 05 Nov 2019 02:23:54 GMT; Max-Age=123; Secure; '
'HttpOnly',
'foo8=; Expires=Thu, 01 Jan 1970 00:00:01 GMT; HttpOnly',
('foo8=; Expires=Thu, 01 Jan 1970 00:00:01 GMT; Max-Age=0; '
'HttpOnly'),
]})
def test_redirect(self):

View File

@@ -37,7 +37,7 @@ class TestSession(unittest.TestCase):
@app.post('/set')
@with_session
async def save_session(req, session):
def save_session(req, session):
session['name'] = 'joe'
session.save()
return 'OK'

View File

@@ -23,9 +23,12 @@ class TestWebSocket(unittest.TestCase):
async def handle_sse(request, sse):
await sse.send('foo')
await sse.send('bar', event='test')
await sse.send('bar', event='test', event_id='id42')
await sse.send('bar', event_id='id42')
await sse.send({'foo': 'bar'})
await sse.send([42, 'foo', 'bar'])
await sse.send(ValueError('foo'))
await sse.send(b'foo')
client = TestClient(app)
response = self._run(client.get('/sse'))
@@ -33,6 +36,9 @@ class TestWebSocket(unittest.TestCase):
self.assertEqual(response.headers['Content-Type'], 'text/event-stream')
self.assertEqual(response.text, ('data: foo\n\n'
'event: test\ndata: bar\n\n'
'event: test\nid: id42\ndata: bar\n\n'
'id: id42\ndata: bar\n\n'
'data: {"foo": "bar"}\n\n'
'data: [42, "foo", "bar"]\n\n'
'data: foo\n\n'
'data: foo\n\n'))

View File

@@ -7,11 +7,14 @@ class TestURLPattern(unittest.TestCase):
p = URLPattern('/')
self.assertEqual(p.match('/'), {})
self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('foo'))
self.assertIsNone(p.match(''))
p = URLPattern('/foo/bar')
self.assertEqual(p.match('/foo/bar'), {})
self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar/'))
self.assertIsNone(p.match('/foo/bar/baz'))
p = URLPattern('/foo//bar/baz/')
self.assertEqual(p.match('/foo//bar/baz/'), {})
@@ -23,32 +26,50 @@ class TestURLPattern(unittest.TestCase):
p = URLPattern('/<arg>')
self.assertEqual(p.match('/foo'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('//'))
self.assertIsNone(p.match(''))
self.assertIsNone(p.match('foo/'))
self.assertIsNone(p.match('/foo/'))
self.assertIsNone(p.match('//foo/'))
self.assertIsNone(p.match('/foo//'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo//bar'))
p = URLPattern('/<arg>/')
self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo/bar/'))
p = URLPattern('/<string:arg>')
self.assertEqual(p.match('/foo'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo/'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo/bar/'))
p = URLPattern('/<string:arg>/')
self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo/bar/'))
p = URLPattern('/foo/<arg1>/bar/<arg2>')
self.assertEqual(p.match('/foo/one/bar/two'),
{'arg1': 'one', 'arg2': 'two'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo/'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo//bar/'))
self.assertIsNone(p.match('/foo//bar//'))
def test_int_argument(self):
p = URLPattern('/users/<int:id>')
self.assertEqual(p.match('/users/123'), {'id': 123})
self.assertEqual(p.match('/users/-123'), {'id': -123})
self.assertEqual(p.match('/users/0'), {'id': 0})
self.assertIsNone(p.match('/users/'))
self.assertIsNone(p.match('/users/abc'))
self.assertIsNone(p.match('/users/123abc'))
@@ -82,7 +103,10 @@ class TestURLPattern(unittest.TestCase):
p = URLPattern('/users/<re:[a-c]+:id>')
self.assertEqual(p.match('/users/ab'), {'id': 'ab'})
self.assertEqual(p.match('/users/bca'), {'id': 'bca'})
self.assertIsNone(p.match('/users'))
self.assertIsNone(p.match('/users/'))
self.assertIsNone(p.match('/users/abcd'))
self.assertIsNone(p.match('/users/abcdx'))
def test_many_arguments(self):
p = URLPattern('/foo/<path:path>/<int:id>/bar/<name>')

View File

@@ -2,9 +2,9 @@ import asyncio
import unittest
from microdot import Microdot
from microdot.test_client import TestClient
from microdot.utemplate import Template, init_templates
from microdot.utemplate import Template
init_templates('tests/templates')
Template.initialize('tests/templates')
class TestUTemplate(unittest.TestCase):

View File

@@ -1,8 +1,8 @@
import asyncio
import sys
import unittest
from microdot import Microdot
from microdot.websocket import with_websocket, WebSocket
from microdot import Microdot, Request
from microdot.websocket import with_websocket, WebSocket, WebSocketError
from microdot.test_client import TestClient
@@ -17,6 +17,7 @@ class TestWebSocket(unittest.TestCase):
return self.loop.run_until_complete(coro)
def test_websocket_echo(self):
WebSocket.max_message_length = 65537
app = Microdot()
@app.route('/echo')
@@ -26,34 +27,10 @@ class TestWebSocket(unittest.TestCase):
data = await ws.receive()
await ws.send(data)
results = []
def ws():
data = yield 'hello'
results.append(data)
data = yield b'bye'
results.append(data)
data = yield b'*' * 300
results.append(data)
data = yield b'+' * 65537
results.append(data)
client = TestClient(app)
res = self._run(client.websocket('/echo', ws))
self.assertIsNone(res)
self.assertEqual(results, ['hello', b'bye', b'*' * 300, b'+' * 65537])
@unittest.skipIf(sys.implementation.name == 'micropython',
'no support for async generators in MicroPython')
def test_websocket_echo_async_client(self):
app = Microdot()
@app.route('/echo')
@app.route('/divzero')
@with_websocket
async def index(req, ws):
while True:
data = await ws.receive()
await ws.send(data)
async def divzero(req, ws):
1 / 0
results = []
@@ -72,6 +49,35 @@ class TestWebSocket(unittest.TestCase):
self.assertIsNone(res)
self.assertEqual(results, ['hello', b'bye', b'*' * 300, b'+' * 65537])
res = self._run(client.websocket('/divzero', ws))
self.assertIsNone(res)
WebSocket.max_message_length = -1
@unittest.skipIf(sys.implementation.name == 'micropython',
'no support for async generators in MicroPython')
def test_websocket_large_message(self):
saved_max_body_length = Request.max_body_length
Request.max_body_length = 10
app = Microdot()
@app.route('/echo')
@with_websocket
async def index(req, ws):
data = await ws.receive()
await ws.send(data)
results = []
async def ws():
data = yield '0123456789abcdef'
results.append(data)
client = TestClient(app)
res = self._run(client.websocket('/echo', ws))
self.assertIsNone(res)
self.assertEqual(results, [])
Request.max_body_length = saved_max_body_length
def test_bad_websocket_request(self):
app = Microdot()
@@ -106,7 +112,7 @@ class TestWebSocket(unittest.TestCase):
(None, 'foo'))
self.assertEqual(ws._process_websocket_frame(WebSocket.BINARY, b'foo'),
(None, b'foo'))
self.assertRaises(OSError, ws._process_websocket_frame,
self.assertRaises(WebSocketError, ws._process_websocket_frame,
WebSocket.CLOSE, b'')
self.assertEqual(ws._process_websocket_frame(WebSocket.PING, b'foo'),
(WebSocket.PONG, b'foo'))

View File

@@ -1,23 +1,24 @@
FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive
ARG VERSION=master
ENV VERSION=$VERSION
RUN apt-get update && \
apt-get install -y build-essential libffi-dev git pkg-config python3 && \
rm -rf /var/lib/apt/lists/* && \
git clone https://github.com/micropython/micropython.git && \
cd micropython && \
git checkout $VERSION && \
git submodule update --init && \
cd mpy-cross && \
make && \
cd .. && \
cd ports/unix && \
make && \
make test && \
make install && \
apt-get purge --auto-remove -y build-essential libffi-dev git pkg-config python3 && \
cd ../../.. && \
rm -rf micropython
CMD ["/usr/local/bin/micropython"]

View File

@@ -0,0 +1,24 @@
FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive
ARG VERSION=main
ENV VERSION=$VERSION
RUN apt-get update && \
apt-get install -y build-essential libffi-dev git pkg-config python3 && \
rm -rf /var/lib/apt/lists/* && \
git clone https://github.com/adafruit/circuitpython.git && \
cd circuitpython && \
git checkout $VERSION && \
git submodule update --init lib tools frozen && \
cd mpy-cross && \
make && \
cd .. && \
cd ports/unix && \
make && \
make install && \
apt-get purge --auto-remove -y build-essential libffi-dev git pkg-config python3 && \
cd ../../.. && \
rm -rf circuitpython
CMD ["/usr/local/bin/micropython"]

11
tools/update-circuitpython.sh Executable file
View File

@@ -0,0 +1,11 @@
#!/bin/bash
# this script updates the micropython binary in the /bin directory that is
# used to run unit tests under GitHub Actions builds
DOCKER=${DOCKER:-docker}
VERSION=${1:-main}
$DOCKER build -f Dockerfile.circuitpython --build-arg VERSION=$VERSION -t circuitpython .
$DOCKER create -t --name dummy-circuitpython circuitpython
$DOCKER cp dummy-circuitpython:/usr/local/bin/micropython ../bin/circuitpython
$DOCKER rm dummy-circuitpython

View File

@@ -3,8 +3,9 @@
# used to run unit tests under GitHub Actions builds
DOCKER=${DOCKER:-docker}
VERSION=${1:-master}
$DOCKER build -t micropython .
$DOCKER build --build-arg VERSION=$VERSION -t micropython .
$DOCKER create -it --name dummy-micropython micropython
$DOCKER cp dummy-micropython:/usr/local/bin/micropython ../bin/micropython
$DOCKER rm dummy-micropython

17
tox.ini
View File

@@ -1,5 +1,5 @@
[tox]
envlist=flake8,py38,py39,py310,py311,py312,upy,benchmark
envlist=flake8,py38,py39,py310,py311,py312,upy,cpy,benchmark,docs
skipsdist=True
skip_missing_interpreters=True
@@ -29,10 +29,13 @@ setenv=
allowlist_externals=sh
commands=sh -c "bin/micropython run_tests.py"
[testenv:cpy]
allowlist_externals=sh
commands=sh -c "bin/circuitpython run_tests.py"
[testenv:upy-mac]
allowlist_externals=micropython
commands=micropython run_tests.py
deps=
[testenv:benchmark]
deps=
@@ -55,3 +58,13 @@ deps=
flake8
commands=
flake8 --ignore=W503 --exclude examples/templates/utemplate/templates src tests examples
[testenv:docs]
changedir=docs
deps=
sphinx
pyjwt
allowlist_externals=
make
commands=
make html