57 Commits

Author SHA1 Message Date
Miguel Grinberg
32f5e415e7 Release 2.0.7 2024-11-10 22:57:31 +00:00
Miguel Grinberg
c46e429106 Accept responses with just a status code (Fixes #263) 2024-11-10 20:09:05 +00:00
Miguel Grinberg
4eac013087 Accept responses with just a status code (Fixes #263) 2024-11-10 00:35:21 +00:00
dependabot[bot]
496a288064 Bump werkzeug from 3.0.3 to 3.0.6 in /examples/benchmark (#260) #nolog
Bumps [werkzeug](https://github.com/pallets/werkzeug) from 3.0.3 to 3.0.6.
- [Release notes](https://github.com/pallets/werkzeug/releases)
- [Changelog](https://github.com/pallets/werkzeug/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/werkzeug/compare/3.0.3...3.0.6)

---
updated-dependencies:
- dependency-name: werkzeug
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-10-26 11:37:08 +01:00
dependabot[bot]
bcd876fcae Bump quart from 0.19.4 to 0.19.7 in /examples/benchmark (#259) #nolog
Bumps [quart](https://github.com/pallets/quart) from 0.19.4 to 0.19.7.
- [Release notes](https://github.com/pallets/quart/releases)
- [Changelog](https://github.com/pallets/quart/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/quart/compare/0.19.4...0.19.7)

---
updated-dependencies:
- dependency-name: quart
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-10-26 00:57:50 +01:00
Stanislav Garanzha
5e5fc5e93e Fix urls in docs (#253)
* Fix 404 external links in intro.rst

* Fix broken links in extensions.rst

`examples/cors/cors.py` does not exist

`uvicorn.org` - failed to resolve
2024-08-17 18:41:43 +01:00
Miguel Grinberg
8895af3737 add tox to dev dependencies 2024-08-15 20:40:54 +01:00
Miguel Grinberg
0a021462e0 Better documentation for start_server() method (Fixes #252) 2024-08-15 19:10:34 +01:00
Lukas Kremla
482ab6d5ca Fixed gzip automatic content-type assignment and added automatic compression header configuration (#251)
* Fixed gzip automatic content-type assignment and added automatic compression setting

This implements the fix for detecting the proper content-type even when the file has the ".gz" extension. It further makes sure the compression headers are set properly if a "gz." file is detected, but the compression headers weren't explicitly set by the user.

* Added a test for properly auto-determining mime types and setting content encoding header

* Modified the gzip file header assignments and following tests according to the feedback.

---------

Co-authored-by: Lukáš Kremla <lukas.kremla@bonnel.cz>
2024-08-14 23:02:23 +01:00
dependabot[bot]
5fe06f6bd5 Bump certifi from 2023.11.17 to 2024.7.4 in /examples/benchmark (#244)
Bumps [certifi](https://github.com/certifi/python-certifi) from 2023.11.17 to 2024.7.4.
- [Commits](https://github.com/certifi/python-certifi/compare/2023.11.17...2024.07.04)

---
updated-dependencies:
- dependency-name: certifi
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-07-06 11:17:41 +01:00
dependabot[bot]
c170e840ec Bump urllib3 from 2.1.0 to 2.2.2 in /examples/benchmark (#241) #nolog
Bumps [urllib3](https://github.com/urllib3/urllib3) from 2.1.0 to 2.2.2.
- [Release notes](https://github.com/urllib3/urllib3/releases)
- [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst)
- [Commits](https://github.com/urllib3/urllib3/compare/2.1.0...2.2.2)

---
updated-dependencies:
- dependency-name: urllib3
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-06-19 00:12:28 +01:00
Miguel Grinberg
3a39b47ea8 Version 2.0.7.dev0 2024-06-18 23:14:36 +01:00
Miguel Grinberg
53287217ae Release 2.0.6 2024-06-18 23:14:14 +01:00
Miguel Grinberg
6ffb8a8fe9 Cookie path support in session and test client 2024-06-18 20:56:18 +01:00
Miguel Grinberg
0151611fc8 Configurable session cookie options (Fixes #242) 2024-06-18 00:09:44 +01:00
dependabot[bot]
4204db61e5 Bump jinja2 from 3.1.3 to 3.1.4 in /examples/benchmark (#230) #nolog
Bumps [jinja2](https://github.com/pallets/jinja) from 3.1.3 to 3.1.4.
- [Release notes](https://github.com/pallets/jinja/releases)
- [Changelog](https://github.com/pallets/jinja/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/jinja/compare/3.1.3...3.1.4)

---
updated-dependencies:
- dependency-name: jinja2
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-05-21 11:40:13 +01:00
dependabot[bot]
12438743a8 Bump werkzeug from 3.0.1 to 3.0.3 in /examples/benchmark (#229) #nolog
Bumps [werkzeug](https://github.com/pallets/werkzeug) from 3.0.1 to 3.0.3.
- [Release notes](https://github.com/pallets/werkzeug/releases)
- [Changelog](https://github.com/pallets/werkzeug/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/werkzeug/compare/3.0.1...3.0.3)

---
updated-dependencies:
- dependency-name: werkzeug
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-05-21 11:39:51 +01:00
dependabot[bot]
7cbb1edf59 Bump requests from 2.31.0 to 2.32.0 in /examples/benchmark (#232) #nolog
updated-dependencies:
- dependency-name: requests
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-05-21 11:39:16 +01:00
Miguel Grinberg
dac6df7a7a use codecov token for coverage uploads #nolog 2024-04-28 00:31:53 +01:00
dependabot[bot]
5d6e838f3c Bump gunicorn from 21.2.0 to 22.0.0 in /examples/benchmark (#224) #nolog
Bumps [gunicorn](https://github.com/benoitc/gunicorn) from 21.2.0 to 22.0.0.
- [Release notes](https://github.com/benoitc/gunicorn/releases)
- [Commits](https://github.com/benoitc/gunicorn/compare/21.2.0...22.0.0)

---
updated-dependencies:
- dependency-name: gunicorn
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-04-17 23:52:37 +01:00
dependabot[bot]
563bfdc8f5 Bump idna from 3.6 to 3.7 in /examples/benchmark (#223) #nolog
Bumps [idna](https://github.com/kjd/idna) from 3.6 to 3.7.
- [Release notes](https://github.com/kjd/idna/releases)
- [Changelog](https://github.com/kjd/idna/blob/master/HISTORY.rst)
- [Commits](https://github.com/kjd/idna/compare/v3.6...v3.7)

---
updated-dependencies:
- dependency-name: idna
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-04-12 15:16:52 +01:00
Miguel Grinberg
679d8e63b8 Fix docs build #nolog 2024-03-24 19:56:43 +00:00
Miguel Grinberg
4cb155ee41 Improved cookie support in the test client 2024-03-24 19:45:22 +00:00
Miguel Grinberg
dea79c5ce2 Make Session class more reusable 2024-03-23 16:29:36 +00:00
Carlo Colombo
6b1fd61917 removed outdated import from documentation (Fixes #216) 2024-03-15 11:03:10 +00:00
Miguel Grinberg
f6876c0d15 Use @wraps on decorated functions 2024-03-14 00:16:35 +00:00
Hamsanger
904d5fcaa2 Add event ID to the SSE implementation (#213) 2024-03-10 23:52:43 +00:00
Miguel Grinberg
a0ea439def Add roadmap details to readme 2024-03-10 17:15:14 +00:00
Miguel Grinberg
a1801d9a53 Version 2.0.6.dev0 2024-03-09 10:48:18 +00:00
Miguel Grinberg
14f2c9d345 Release 2.0.5 2024-03-09 10:48:09 +00:00
Miguel Grinberg
d0a4cf8fa7 Handle 0 as an integer argument (Fixes #212) 2024-03-06 20:34:00 +00:00
Miguel Grinberg
901f4e55b8 Version 2.0.5.dev0 2024-02-20 23:15:01 +00:00
Miguel Grinberg
53b28f9938 Release 2.0.4 2024-02-20 23:12:31 +00:00
Miguel Grinberg
f6cba2c0f7 More URLPattern unit tests 2024-02-20 23:09:24 +00:00
Miguel Grinberg
38262c56d3 Do not use regexes for parsing simple URLs (Fixes #207) 2024-02-18 15:05:41 +00:00
dependabot[bot]
a3363c7b8c Bump fastapi from 0.104.1 to 0.109.1 in /examples/benchmark (#203) #nolog
Bumps [fastapi](https://github.com/tiangolo/fastapi) from 0.104.1 to 0.109.1.
- [Release notes](https://github.com/tiangolo/fastapi/releases)
- [Commits](https://github.com/tiangolo/fastapi/compare/0.104.1...0.109.1)

---
updated-dependencies:
- dependency-name: fastapi
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-02-05 17:43:31 +00:00
Miguel Grinberg
e44c271bae Circuitpython build 2024-01-31 23:43:17 +00:00
Miguel Grinberg
bf519478cb Added documentation on using alternative utemplate loaders 2024-01-14 12:47:28 +00:00
dependabot[bot]
8d1ca808cb Bump jinja2 from 3.1.2 to 3.1.3 in /examples/benchmark (#199) #nolog
Bumps [jinja2](https://github.com/pallets/jinja) from 3.1.2 to 3.1.3.
- [Release notes](https://github.com/pallets/jinja/releases)
- [Changelog](https://github.com/pallets/jinja/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/jinja/compare/3.1.2...3.1.3)

---
updated-dependencies:
- dependency-name: jinja2
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-01-11 22:56:46 +00:00
Miguel Grinberg
1f804f869c Version 2.0.4.dev0 2024-01-07 10:50:36 +00:00
Miguel Grinberg
7a6026006f Release 2.0.3 2024-01-07 10:50:28 +00:00
Miguel Grinberg
6712c47400 Pass keyword arguments to thread executor in the correct way (Fixes #195) 2024-01-07 10:44:16 +00:00
Miguel Grinberg
c8c91e8345 Update uasyncio to include new TLS support 2024-01-04 20:41:05 +00:00
Miguel Grinberg
5d188e8c0d Add a limit to WebSocket message size (Fixes #193) 2024-01-03 00:04:02 +00:00
Miguel Grinberg
b80b6b64d0 Documentation improvements 2023-12-28 12:59:19 +00:00
Miguel Grinberg
28007ea583 Version 2.0.3.dev0 2023-12-28 12:11:32 +00:00
Miguel Grinberg
300f8563ed Release 2.0.2 2023-12-28 12:10:46 +00:00
Miguel Grinberg
1fc11193da Support binary data in the SSE extension 2023-12-28 12:04:17 +00:00
Miguel Grinberg
79452a4699 Upgrade micropython tests to use v1.22, initial circuitpython work 2023-12-27 20:39:20 +00:00
Miguel Grinberg
84842e39c3 Improvements to migration guide 2023-12-26 20:00:07 +00:00
Miguel Grinberg
2a3c889717 typo in documentation #nolog 2023-12-26 17:07:20 +00:00
Tak Tran
ad368be993 Remove spurious async in documentation example (#187) 2023-12-23 14:08:12 +00:00
Miguel Grinberg
3df56c6ffe Version 2.0.2.dev0 2023-12-23 12:50:03 +00:00
Miguel Grinberg
c2e18004f7 Release 2.0.1 2023-12-23 12:49:52 +00:00
Miguel Grinberg
bd18ceb442 Addressed some inadvertent mistakes in the template extensions 2023-12-23 12:48:27 +00:00
Miguel Grinberg
f38d6d760a Updated readme and change log #nolog 2023-12-23 00:04:22 +00:00
Miguel Grinberg
dee4914bdd Version 2.0.1.dev0 2023-12-22 20:42:53 +00:00
62 changed files with 2345 additions and 365 deletions

View File

@@ -16,6 +16,7 @@ jobs:
- run: python -m pip install --upgrade pip wheel - run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions - run: pip install tox tox-gh-actions
- run: tox -eflake8 - run: tox -eflake8
- run: tox -edocs
tests: tests:
name: tests name: tests
strategy: strategy:
@@ -41,6 +42,15 @@ jobs:
- run: python -m pip install --upgrade pip wheel - run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions - run: pip install tox tox-gh-actions
- run: tox -eupy - run: tox -eupy
tests-circuitpython:
name: tests-circuitpython
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -ecpy
coverage: coverage:
name: coverage name: coverage
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -54,6 +64,7 @@ jobs:
with: with:
files: ./coverage.xml files: ./coverage.xml
fail_ci_if_error: true fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
benchmark: benchmark:
name: benchmark name: benchmark
runs-on: ubuntu-latest runs-on: ubuntu-latest

4
.gitignore vendored
View File

@@ -25,6 +25,8 @@ wheels/
.installed.cfg .installed.cfg
*.egg *.egg
MANIFEST MANIFEST
requirements.txt
requirements-dev.txt
# PyInstaller # PyInstaller
# Usually these files are written by a python script from a template # Usually these files are written by a python script from a template
@@ -90,6 +92,8 @@ venv/
ENV/ ENV/
env.bak/ env.bak/
venv.bak/ venv.bak/
.direnv
.envrc
# Spyder project settings # Spyder project settings
.spyderproject .spyderproject

View File

@@ -1,8 +1,61 @@
# Microdot change log # Microdot change log
**Release 2.0.7** - 2024-11-10
- Accept responses with just a status code [#263](https://github.com/miguelgrinberg/microdot/issues/263) ([commit #1](https://github.com/miguelgrinberg/microdot/commit/4eac013087f807cafa244b8a6b7b0ed4c82ff150) [commit #2](https://github.com/miguelgrinberg/microdot/commit/c46e4291061046f1be13f300dd08645b71c16635))
- Fixed compressed file content-type assignment [#251](https://github.com/miguelgrinberg/microdot/issues/251) ([commit](https://github.com/miguelgrinberg/microdot/commit/482ab6d5ca068d71ea6301f45918946161e9fcc1)) (thanks **Lukas Kremla**!)
- Better documentation for start_server[#252](https://github.com/miguelgrinberg/microdot/issues/252) ([commit](https://github.com/miguelgrinberg/microdot/commit/0a021462e0c42c249d587a2d600f5a21a408adfc))
- Fix URLs in documentation [#253](https://github.com/miguelgrinberg/microdot/issues/253) ([commit](https://github.com/miguelgrinberg/microdot/commit/5e5fc5e93e11cbf6e3dc8036494e8732d1815d3e)) (thanks **Stanislav Garanzha**!)
**Release 2.0.6** - 2024-06-18
- Add event ID to the SSE implementation [#213](https://github.com/miguelgrinberg/microdot/issues/213) ([commit](https://github.com/miguelgrinberg/microdot/commit/904d5fcaa2d19d939a719b8e68c4dee3eb470739)) (thanks **Hamsanger**!)
- Configurable session cookie options [#242](https://github.com/miguelgrinberg/microdot/issues/242) ([commit](https://github.com/miguelgrinberg/microdot/commit/0151611fc84fec450820d673f4c4d70c32c990a7))
- Improved cookie support in the test client ([commit](https://github.com/miguelgrinberg/microdot/commit/4cb155ee411dc2d9c9f15714cb32b25ba79b156a))
- Cookie path support in session extension and test client ([commit](https://github.com/miguelgrinberg/microdot/commit/6ffb8a8fe920111c4d8c16e98715a0d5ee2d1da3))
- Refactor `Session` class to make it more reusable ([commit](https://github.com/miguelgrinberg/microdot/commit/dea79c5ce224dec7858ffef45a42bed442fd3a5a))
- Use `@functools.wraps` on decorated functions ([commit](https://github.com/miguelgrinberg/microdot/commit/f6876c0d154adcae96098405fb6a1fdf1ea4ec28))
- Removed outdated import from documentation [#216](https://github.com/miguelgrinberg/microdot/issues/216) ([commit](https://github.com/miguelgrinberg/microdot/commit/6b1fd6191702e7a9ad934fddfcdd0a3cebea7c94)) (thanks **Carlo Colombo**!)
- Add roadmap details to readme ([commit](https://github.com/miguelgrinberg/microdot/commit/a0ea439def238084c4d68309c0992b66ffd28ad6))
**Release 2.0.5** - 2024-03-09
- Correct handling of 0 as an integer argument (regression from #207) [#212](https://github.com/miguelgrinberg/microdot/issues/212) ([commit](https://github.com/miguelgrinberg/microdot/commit/d0a4cf8fa7dfb1da7466157b18d3329a8cf9a5df))
**Release 2.0.4** - 2024-02-20
- Do not use regexes for parsing simple URLs [#207](https://github.com/miguelgrinberg/microdot/issues/207) ([commit #1](https://github.com/miguelgrinberg/microdot/commit/38262c56d34784401659639b482a4a1224e1e59a) [commit #2](https://github.com/miguelgrinberg/microdot/commit/f6cba2c0f7e18e2f32b5adb779fb037b6c473eab))
- Added documentation on using alternative uTemplate loaders ([commit](https://github.com/miguelgrinberg/microdot/commit/bf519478cbc6e296785241cd7d01edb23c317cd3))
- Added CircuitPython builds ([commit](https://github.com/miguelgrinberg/microdot/commit/e44c271bae88f4327d3eda16d8780ac264d1ebab))
**Release 2.0.3** - 2024-01-07
- Add a limit to WebSocket message size [#193](https://github.com/miguelgrinberg/microdot/issues/193) ([commit](https://github.com/miguelgrinberg/microdot/commit/5d188e8c0ddef6ce633ca702dbdd4a90f2799597))
- Pass keyword arguments to thread executor in the correct way [#195](https://github.com/miguelgrinberg/microdot/issues/195) ([commit](https://github.com/miguelgrinberg/microdot/commit/6712c47400d7c426c88032f65ab74466524eccab))
- Update uasyncio library used in tests to include new TLS support ([commit](https://github.com/miguelgrinberg/microdot/commit/c8c91e83457d24320f22c9a74e80b15e06b072ca))
- Documentation improvements ([commit](https://github.com/miguelgrinberg/microdot/commit/b80b6b64d02d21400ca8a5077f5ed1127cc202ae))
**Release 2.0.2** - 2023-12-28
- Support binary data in the SSE extension ([commit](https://github.com/miguelgrinberg/microdot/commit/1fc11193da0d298f5539e2ad218836910a13efb2))
- Upgrade micropython tests to use v1.22 + initial CircuitPython testing work ([commit](https://github.com/miguelgrinberg/microdot/commit/79452a46992351ccad2c0317c20bf50be0d76641))
- Improvements to migration guide ([commit](https://github.com/miguelgrinberg/microdot/commit/84842e39c360a8b3ddf36feac8af201fb19bbb0b))
- Remove spurious async in documentation example [#187](https://github.com/miguelgrinberg/microdot/issues/187) ([commit](https://github.com/miguelgrinberg/microdot/commit/ad368be993e2e3007579f1d3880e36d60c71da92)) (thanks **Tak Tran**!)
**Release 2.0.1** - 2023-12-23
- Addressed some inadvertent mistakes in the template extensions ([commit](https://github.com/miguelgrinberg/microdot/commit/bd18ceb4424e9dfb52b1e6d498edd260aa24fc53))
**Release 2.0.0** - 2023-12-22 **Release 2.0.0** - 2023-12-22
- Major redesign switching to asyncio as the base implementation (See the [Migration Guide](https://microdot.readthedocs.io/en/stable/migrating.html) in the docs for details) [#186](https://github.com/miguelgrinberg/microdot/issues/186) ([commit](https://github.com/miguelgrinberg/microdot/commit/20ea305fe793eb206b52af9eb5c5f3c1e9f57dbb)) - Major redesign [#186](https://github.com/miguelgrinberg/microdot/issues/186) ([commit](https://github.com/miguelgrinberg/microdot/commit/20ea305fe793eb206b52af9eb5c5f3c1e9f57dbb))
- Code reorganization as a `microdot` package
- Asyncio is now the core implementation
- New support for Server-Sent Events (SSE)
- Several extensions redesigned
- Support for "partitioned" cookies
- [Cross-compiling and freezing](https://microdot.readthedocs.io/en/stable/freezing.html) guidance
- A [Migration Guide](https://microdot.readthedocs.io/en/stable/migrating.html) to help transition to version 2 from older releases
**Release 1.3.4** - 2023-11-08 **Release 1.3.4** - 2023-11-08

View File

@@ -32,5 +32,25 @@ describes the backwards incompatible changes that were made.
## Resources ## Resources
- Documentation: [Latest](https://microdot.readthedocs.io/en/latest/) [Stable](https://microdot.readthedocs.io/en/stable/) [Legacy v1.x](https://microdot.readthedocs.io/en/v1/)
- [Change Log](https://github.com/miguelgrinberg/microdot/blob/main/CHANGES.md) - [Change Log](https://github.com/miguelgrinberg/microdot/blob/main/CHANGES.md)
- Documentation
- [Latest](https://microdot.readthedocs.io/en/latest/)
- [Stable (v2)](https://microdot.readthedocs.io/en/stable/)
- [Legacy (v1)](https://microdot.readthedocs.io/en/v1/) ([Code](https://github.com/miguelgrinberg/microdot/tree/v1))
## Roadmap
The following features are planned for future releases of Microdot, both for
MicroPython and CPython:
- Support for forms encoded in `multipart/form-data` format
- Authentication support, similar to [Flask-Login](https://github.com/maxcountryman/flask-login) for Flask
- OpenAPI integration, similar to [APIFairy](https://github.com/miguelgrinberg/apifairy) for Flask
In addition to the above, the following extensions are also under consideration,
but only for CPython:
- Database integration through [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy)
- Socket.IO support through [python-socketio](https://github.com/miguelgrinberg/python-socketio)
Do you have other ideas to propose? Let's [discuss them](https://github.com/miguelgrinberg/microdot/discussions/new?category=ideas)!

BIN
bin/circuitpython Executable file

Binary file not shown.

Binary file not shown.

View File

@@ -1,8 +1,8 @@
API Reference API Reference
============= =============
``microdot`` module Core API
------------------- --------
.. autoclass:: microdot.Microdot .. autoclass:: microdot.Microdot
:members: :members:
@@ -14,51 +14,57 @@ API Reference
:members: :members:
``websocket`` extension WebSocket
----------------------- ---------
.. automodule:: microdot.websocket .. automodule:: microdot.websocket
:members: :members:
``utemplate`` templating extension Server-Sent Events (SSE)
---------------------------------- ------------------------
.. automodule:: microdot.sse
:members:
Templates (uTemplate)
---------------------
.. automodule:: microdot.utemplate .. automodule:: microdot.utemplate
:members: :members:
``jinja`` templating extension Templates (Jinja)
------------------------------ -----------------
.. automodule:: microdot.jinja .. automodule:: microdot.jinja
:members: :members:
``session`` extension User Sessions
--------------------- -------------
.. automodule:: microdot.session .. automodule:: microdot.session
:members: :members:
``cors`` extension Cross-Origin Resource Sharing (CORS)
------------------ ------------------------------------
.. automodule:: microdot.cors .. automodule:: microdot.cors
:members: :members:
``test_client`` extension Test Client
------------------------- -----------
.. automodule:: microdot.test_client .. automodule:: microdot.test_client
:members: :members:
``asgi`` extension ASGI
------------------ ----
.. autoclass:: microdot.asgi.Microdot .. autoclass:: microdot.asgi.Microdot
:members: :members:
:exclude-members: shutdown, run :exclude-members: shutdown, run
``wsgi`` extension WSGI
------------------- ----
.. autoclass:: microdot.wsgi.Microdot .. autoclass:: microdot.wsgi.Microdot
:members: :members:

View File

@@ -129,11 +129,25 @@ are the asynchronous versions of these two methods.
The default location from where templates are loaded is the *templates* The default location from where templates are loaded is the *templates*
subdirectory. This location can be changed with the subdirectory. This location can be changed with the
:func:`init_templates <microdot.utemplate.init_templates>` function:: :func:`Template.initialize <microdot.utemplate.Template.initialize>` class
method::
from microdot.utemplate import init_templates Template.initialize('my_templates')
init_templates('my_templates') By default templates are automatically compiled the first time they are
rendered, or when their last modified timestamp is more recent than the
compiledo file's timestamp. This loading behavior can be changed by switching
to a different template loader. For example, if the templates are pre-compiled,
the timestamp check and compile steps can be removed by switching to the
"compiled" template loader::
from utemplate import compiled
from microdot.utemplate import Template
Template.initialize(loader_class=compiled.Loader)
Consult the `uTemplate documentation <https://github.com/pfalcon/utemplate>`_
for additional information regarding template loaders.
Using the Jinja Engine Using the Jinja Engine
^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^
@@ -174,17 +188,15 @@ method, which returns a generator instead of a string.
The default location from where templates are loaded is the *templates* The default location from where templates are loaded is the *templates*
subdirectory. This location can be changed with the subdirectory. This location can be changed with the
:func:`init_templates <microdot.utemplate.init_templates>` function:: :func:`Template.initialize <microdot.jinja.Template.initialize>` class method::
from microdot.jinja import init_templates Template.initialize('my_templates')
init_templates('my_templates') The ``initialize()`` method also accepts ``enable_async`` argument, which
The ``init_templates()`` function also accepts ``enable_async`` argument, which
can be set to ``True`` if asynchronous rendering of templates is desired. If can be set to ``True`` if asynchronous rendering of templates is desired. If
this option is enabled, then the this option is enabled, then the
:func:`render_async() <microdot.utemplate.Template.render_async>` and :func:`render_async() <microdot.jinja.Template.render_async>` and
:func:`generate_async() <microdot.utemplate.Template.generate_async>` methods :func:`generate_async() <microdot.jinja.Template.generate_async>` methods
must be used. must be used.
.. note:: .. note::
@@ -274,7 +286,7 @@ Cross-Origin Resource Sharing (CORS)
- | None - | None
* - Examples * - Examples
- | `cors.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/cors/cors.py>`_ - | `app.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/cors/app.py>`_
The CORS extension provides support for `Cross-Origin Resource Sharing The CORS extension provides support for `Cross-Origin Resource Sharing
(CORS) <https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS>`_. CORS is a (CORS) <https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS>`_. CORS is a
@@ -351,7 +363,7 @@ Using an ASGI Web Server
- | `asgi.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/asgi.py>`_ - | `asgi.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/asgi.py>`_
* - Required external dependencies * - Required external dependencies
- | An ASGI web server, such as `Uvicorn <https://uvicorn.org/>`_. - | An ASGI web server, such as `Uvicorn <https://www.uvicorn.org/>`_.
* - Examples * - Examples
- | `hello_asgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello/hello_asgi.py>`_ - | `hello_asgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/hello/hello_asgi.py>`_

View File

@@ -25,14 +25,15 @@ and incorporated into a custom MicroPython firmware.
Use the following guidelines to know what files to copy: Use the following guidelines to know what files to copy:
- For a minimal setup with only the base web server functionality, copy * For a minimal setup with only the base web server functionality, copy
`microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_ `microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_
into your project. into your project.
- For a configuration that includes one or more optional extensions, create a * For a configuration that includes one or more optional extensions, create a
*microdot* directory in your device and copy the following files: *microdot* directory in your device and copy the following files:
- `__init__.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/__init__.py>`_
- `microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_ * `__init__.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/__init__.py>`_
- any needed `extensions <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot>`_. * `microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_
* any needed `extensions <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot>`_.
Getting Started Getting Started
@@ -81,8 +82,34 @@ handler functions can be defined as ``async def`` or ``def`` functions, but
``async def`` functions are recommended for performance. ``async def`` functions are recommended for performance.
The :func:`run() <microdot.Microdot.run>` method starts the application's web The :func:`run() <microdot.Microdot.run>` method starts the application's web
server on port 5000 by default. This method blocks while it waits for server on port 5000 by default, and creates its own asynchronous loop. This
connections from clients. method blocks while it waits for connections from clients.
For some applications it may be necessary to run the web server alongside other
asynchronous tasks, on an already running loop. In that case, instead of
``app.run()`` the web server can be started by invoking the
:func:`start_server() <microdot.Microdot.start_server>` coroutine as shown in
the following example::
import asyncio
from microdot import Microdot
app = Microdot()
@app.route('/')
async def index(request):
return 'Hello, world!'
async def main():
# start the server in a background task
server = asyncio.create_task(app.start_server())
# ... do other asynchronous work here ...
# cleanup before ending the application
await server
asyncio.run(main())
Running with CPython Running with CPython
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^
@@ -91,7 +118,7 @@ Running with CPython
:align: left :align: left
* - Required Microdot source files * - Required Microdot source files
- | `microdot.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot.py>`_ - | `microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_
* - Required external dependencies * - Required external dependencies
- | None - | None
@@ -117,7 +144,7 @@ Running with MicroPython
:align: left :align: left
* - Required Microdot source files * - Required Microdot source files
- | `microdot.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot.py>`_ - | `microdot.py <https://github.com/miguelgrinberg/microdot/blob/main/src/microdot/microdot.py>`_
* - Required external dependencies * - Required external dependencies
- | None - | None
@@ -144,8 +171,9 @@ changed by passing the ``port`` argument to the ``run()`` method.
Web Server Configuration Web Server Configuration
^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^
The :func:`run() <microdot.Microdot.run>` method supports a few arguments to The :func:`run() <microdot.Microdot.run>` and
configure the web server. :func:`start_server() <microdot.Microdot.start_server>` methods support a few
arguments to configure the web server.
- ``port``: The port number to listen on. Pass the desired port number in this - ``port``: The port number to listen on. Pass the desired port number in this
argument to use a port different than the default of 5000. For example:: argument to use a port different than the default of 5000. For example::
@@ -171,10 +199,8 @@ configure the web server.
app.run(port=4443, debug=True, ssl=sslctx) app.run(port=4443, debug=True, ssl=sslctx)
.. note:: .. note::
The ``ssl`` argument can only be used with CPython at this time, because When using CPython, the certificate and key files must be given in PEM
MicroPython's asyncio module does not currently support SSL certificates or format. When using MicroPython, these files must be given in DER format.
TLS encryption. Work on this is
`in progress <https://github.com/micropython/micropython/pull/11897>`_.
Defining Routes Defining Routes
~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~
@@ -297,7 +323,7 @@ match and the route will not be called.
A special type ``path`` can be used to capture the remainder of the path as a A special type ``path`` can be used to capture the remainder of the path as a
single argument. The difference between an argument of type ``path`` and one of single argument. The difference between an argument of type ``path`` and one of
type ``string`` is that the latter stops capturing when a ``/`` appears in the type ``string`` is that the latter stops capturing when a ``/`` appears in the
URL. URL::
@app.get('/tests/<path:path>') @app.get('/tests/<path:path>')
async def get_test(request, path): async def get_test(request, path):
@@ -462,7 +488,7 @@ the sub-applications to build the larger combined application::
from customers import customers_app from customers import customers_app
from orders import orders_app from orders import orders_app
async def create_app(): def create_app():
app = Microdot() app = Microdot()
app.mount(customers_app, url_prefix='/customers') app.mount(customers_app, url_prefix='/customers')
app.mount(orders_app, url_prefix='/orders') app.mount(orders_app, url_prefix='/orders')

View File

@@ -39,7 +39,7 @@ extension.
Any applications built using the asyncio extension will need to update their Any applications built using the asyncio extension will need to update their
imports from this:: imports from this::
from microdot.asyncio import Microdot from microdot_asyncio import Microdot
to this:: to this::
@@ -94,7 +94,7 @@ as a single string::
Streamed templates also have an asynchronous version:: Streamed templates also have an asynchronous version::
return await Template('index.html').generate_async(title='Home') return Template('index.html').generate_async(title='Home')
Class-based user sessions Class-based user sessions
~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -138,5 +138,8 @@ deployed with standard WSGI servers such as Gunicorn.
WebSocket support when using the WSGI extension is enabled when using a WebSocket support when using the WSGI extension is enabled when using a
compatible web server. At this time only Gunicorn is supported for WebSocket. compatible web server. At this time only Gunicorn is supported for WebSocket.
Given that WebSocket support is asynchronous, it would be better to switch to
the ASGI extension, which has full support for WebSocket as defined in the ASGI
specification.
As before, the WSGI extension is not available under MicroPython. As before, the WSGI extension is not available under MicroPython.

View File

@@ -9,16 +9,14 @@ aiofiles==23.2.1
annotated-types==0.6.0 annotated-types==0.6.0
# via pydantic # via pydantic
anyio==3.7.1 anyio==3.7.1
# via # via starlette
# fastapi
# starlette
blinker==1.7.0 blinker==1.7.0
# via # via
# flask # flask
# quart # quart
build==1.0.3 build==1.0.3
# via pip-tools # via pip-tools
certifi==2023.11.17 certifi==2024.7.4
# via requests # via requests
charset-normalizer==3.3.2 charset-normalizer==3.3.2
# via requests # via requests
@@ -28,13 +26,13 @@ click==8.1.7
# pip-tools # pip-tools
# quart # quart
# uvicorn # uvicorn
fastapi==0.104.1 fastapi==0.109.1
# via -r requirements.in # via -r requirements.in
flask==3.0.0 flask==3.0.0
# via # via
# -r requirements.in # -r requirements.in
# quart # quart
gunicorn==21.2.0 gunicorn==22.0.0
# via -r requirements.in # via -r requirements.in
h11==0.14.0 h11==0.14.0
# via # via
@@ -51,7 +49,7 @@ hypercorn==0.15.0
# via quart # via quart
hyperframe==6.0.1 hyperframe==6.0.1
# via h2 # via h2
idna==3.6 idna==3.7
# via # via
# anyio # anyio
# requests # requests
@@ -59,7 +57,7 @@ itsdangerous==2.1.2
# via # via
# flask # flask
# quart # quart
jinja2==3.1.2 jinja2==3.1.4
# via # via
# flask # flask
# quart # quart
@@ -84,24 +82,24 @@ pydantic-core==2.14.5
# via pydantic # via pydantic
pyproject-hooks==1.0.0 pyproject-hooks==1.0.0
# via build # via build
quart==0.19.4 quart==0.19.7
# via -r requirements.in # via -r requirements.in
requests==2.31.0 requests==2.32.0
# via -r requirements.in # via -r requirements.in
sniffio==1.3.0 sniffio==1.3.0
# via anyio # via anyio
starlette==0.27.0 starlette==0.35.1
# via fastapi # via fastapi
typing-extensions==4.9.0 typing-extensions==4.9.0
# via # via
# fastapi # fastapi
# pydantic # pydantic
# pydantic-core # pydantic-core
urllib3==2.1.0 urllib3==2.2.2
# via requests # via requests
uvicorn==0.24.0.post1 uvicorn==0.24.0.post1
# via -r requirements.in # via -r requirements.in
werkzeug==3.0.1 werkzeug==3.0.6
# via # via
# flask # flask
# quart # quart

View File

@@ -1,7 +1,7 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.jinja import template, init_templates from microdot.jinja import Template
init_templates('templates', enable_async=True) Template.initialize('templates', enable_async=True)
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -11,7 +11,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return await template('index.html').render_async(name=name) return await Template('index.html').render_async(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.jinja import template from microdot.jinja import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -7,12 +7,12 @@ Response.default_content_type = 'text/html'
@app.route('/') @app.route('/')
async def index(req): async def index(req):
return template('page1.html').render(page='Page 1') return Template('page1.html').render(page='Page 1')
@app.route('/page2') @app.route('/page2')
async def page2(req): async def page2(req):
return template('page2.html').render(page='Page 2') return Template('page2.html').render(page='Page 2')
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.jinja import template from microdot.jinja import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').render(name=name) return Template('index.html').render(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.asgi import Microdot, Response from microdot.asgi import Microdot, Response
from microdot.jinja import template from microdot.jinja import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').render(name=name) return Template('index.html').render(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.wsgi import Microdot, Response from microdot.wsgi import Microdot, Response
from microdot.jinja import template from microdot.jinja import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').render(name=name) return Template('index.html').render(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.jinja import template from microdot.jinja import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').generate(name=name) return Template('index.html').generate(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.utemplate import template from microdot.utemplate import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return await template('index.html').render_async(name=name) return await Template('index.html').render_async(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.utemplate import template from microdot.utemplate import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -7,12 +7,12 @@ Response.default_content_type = 'text/html'
@app.route('/') @app.route('/')
async def index(req): async def index(req):
return template('page1.html').render(page='Page 1') return Template('page1.html').render(page='Page 1')
@app.route('/page2') @app.route('/page2')
async def page2(req): async def page2(req):
return template('page2.html').render(page='Page 2') return Template('page2.html').render(page='Page 2')
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.utemplate import template from microdot.utemplate import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').render(name=name) return Template('index.html').render(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.asgi import Microdot, Response from microdot.asgi import Microdot, Response
from microdot.utemplate import template from microdot.utemplate import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').render(name=name) return Template('index.html').render(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot.wsgi import Microdot, Response from microdot.wsgi import Microdot, Response
from microdot.utemplate import template from microdot.utemplate import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').render(name=name) return Template('index.html').render(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,5 +1,5 @@
from microdot import Microdot, Response from microdot import Microdot, Response
from microdot.utemplate import template from microdot.utemplate import Template
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@@ -10,7 +10,7 @@ async def index(req):
name = None name = None
if req.method == 'POST': if req.method == 'POST':
name = req.form.get('name') name = req.form.get('name')
return template('index.html').generate(name=name) return Template('index.html').generate(name=name)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -1,4 +1,5 @@
import ssl import ssl
import sys
from microdot import Microdot from microdot import Microdot
app = Microdot() app = Microdot()
@@ -31,6 +32,7 @@ async def shutdown(request):
return 'The server is shutting down...' return 'The server is shutting down...'
ext = 'der' if sys.implementation.name == 'micropython' else 'pem'
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.load_cert_chain('cert.pem', 'key.pem') sslctx.load_cert_chain('cert.' + ext, 'key.' + ext)
app.run(port=4443, debug=True, ssl=sslctx) app.run(port=4443, debug=True, ssl=sslctx)

View File

@@ -0,0 +1,139 @@
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_ticks`
================================================================================
Work with intervals and deadlines in milliseconds
* Author(s): Jeff Epler
Implementation Notes
--------------------
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
"""
# imports
from micropython import const
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ticks.git"
_TICKS_PERIOD = const(1 << 29)
_TICKS_MAX = const(_TICKS_PERIOD - 1)
_TICKS_HALFPERIOD = const(_TICKS_PERIOD // 2)
# Get the correct implementation of ticks_ms. There are three possibilities:
#
# - supervisor.ticks_ms is present. This will be the case starting in CP7.0
#
# - time.ticks_ms is present. This is the case for MicroPython & for the "unix
# port" of CircuitPython, used for some automated testing.
#
# - time.monotonic_ns is present, and works. This is the case on most
# Express boards in CP6.x, and most host computer versions of Python.
#
# - Otherwise, time.monotonic is assumed to be present. This is the case
# on most non-express boards in CP6.x, and some old host computer versions
# of Python.
#
# Note that on microcontrollers, this time source becomes increasingly
# inaccurate when the board has not been reset in a long time, losing the
# ability to measure 1ms intervals after about 1 hour, and losing the
# ability to meausre 128ms intervals after 6 days. The only solution is to
# either upgrade to a version with supervisor.ticks_ms, or to switch to a
# board with time.monotonic_ns.
try:
from supervisor import ticks_ms # pylint: disable=unused-import
except (ImportError, NameError):
import time
if _ticks_ms := getattr(time, "ticks_ms", None):
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return _ticks_ms() & _TICKS_MAX # pylint: disable=not-callable
else:
try:
from time import monotonic_ns as _monotonic_ns
_monotonic_ns() # Check that monotonic_ns is usable
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return (_monotonic_ns() // 1_000_000) & _TICKS_MAX
except (ImportError, NameError, NotImplementedError):
from time import monotonic as _monotonic
def ticks_ms() -> int:
"""Return the time in milliseconds since an unspecified moment,
wrapping after 2**29ms.
The wrap value was chosen so that it is always possible to add or
subtract two `ticks_ms` values without overflow on a board without
long ints (or without allocating any long integer objects, on
boards with long ints).
This ticks value comes from a low-accuracy clock internal to the
microcontroller, just like `time.monotonic`. Due to its low
accuracy and the fact that it "wraps around" every few days, it is
intended for working with short term events like advancing an LED
animation, not for long term events like counting down the time
until a holiday."""
return int(_monotonic() * 1000) & _TICKS_MAX
def ticks_add(ticks: int, delta: int) -> int:
"Add a delta to a base number of ticks, performing wraparound at 2**29ms."
return (ticks + delta) % _TICKS_PERIOD
def ticks_diff(ticks1: int, ticks2: int) -> int:
"""Compute the signed difference between two ticks values,
assuming that they are within 2**28 ticks"""
diff = (ticks1 - ticks2) & _TICKS_MAX
diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD
return diff
def ticks_less(ticks1: int, ticks2: int) -> bool:
"""Return true if ticks1 is before ticks2 and false otherwise,
assuming that they are within 2**28 ticks"""
return ticks_diff(ticks1, ticks2) < 0

View File

@@ -0,0 +1,41 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
from .core import *
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/Adafruit/Adafruit_CircuitPython_asyncio.git"
_attrs = {
"wait_for": "funcs",
"wait_for_ms": "funcs",
"gather": "funcs",
"Event": "event",
"ThreadSafeFlag": "event",
"Lock": "lock",
"open_connection": "stream",
"start_server": "stream",
"StreamReader": "stream",
"StreamWriter": "stream",
}
# Lazy loader, effectively does:
# global attr
# from .mod import attr
def __getattr__(attr):
mod = _attrs.get(attr, None)
if mod is None:
raise AttributeError(attr)
value = getattr(__import__(mod, None, None, True, 1), attr)
globals()[attr] = value
return value

View File

@@ -0,0 +1,430 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Core
====
"""
from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
try:
from traceback import print_exception
except:
from .traceback import print_exception
# Import TaskQueue and Task, preferring built-in C code over Python code
try:
from _asyncio import TaskQueue, Task
except ImportError:
from .task import TaskQueue, Task
################################################################################
# Exceptions
# Depending on the release of CircuitPython these errors may or may not
# exist in the C implementation of `_asyncio`. However, when they
# do exist, they must be preferred over the Python code.
try:
from _asyncio import CancelledError, InvalidStateError
except (ImportError, AttributeError):
class CancelledError(BaseException):
"""Injected into a task when calling `Task.cancel()`"""
pass
class InvalidStateError(Exception):
"""Can be raised in situations like setting a result value for a task object that already has a result value set."""
pass
class TimeoutError(Exception):
"""Raised when waiting for a task longer than the specified timeout."""
pass
# Used when calling Loop.call_exception_handler
_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
################################################################################
# Sleep functions
# "Yield" once, then raise StopIteration
class SingletonGenerator:
def __init__(self):
self.state = None
self.exc = StopIteration()
def __iter__(self):
return self
def __await__(self):
return self
def __next__(self):
if self.state is not None:
_task_queue.push_sorted(cur_task, self.state)
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
"""Sleep for *t* milliseconds.
This is a coroutine, and a MicroPython extension.
"""
assert sgen.state is None, "Check for a missing `await` in your code"
sgen.state = ticks_add(ticks(), max(0, t))
return sgen
# Pause task execution for the given time (in seconds)
def sleep(t):
"""Sleep for *t* seconds
This is a coroutine.
"""
return sleep_ms(int(t * 1000))
################################################################################
# "Never schedule" object"
# Don't re-schedule the object that awaits _never().
# For internal use only. Some constructs, like `await event.wait()`,
# work by NOT re-scheduling the task which calls wait(), but by
# having some other task schedule it later.
class _NeverSingletonGenerator:
def __init__(self):
self.state = None
self.exc = StopIteration()
def __iter__(self):
return self
def __await__(self):
return self
def __next__(self):
if self.state is not None:
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
def _never(sgen=_NeverSingletonGenerator()):
# assert sgen.state is None, "Check for a missing `await` in your code"
sgen.state = False
return sgen
################################################################################
# Queue and poller for stream IO
class IOQueue:
def __init__(self):
self.poller = select.poll()
self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
def _enqueue(self, s, idx):
if id(s) not in self.map:
entry = [None, None, s]
entry[idx] = cur_task
self.map[id(s)] = entry
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
else:
sm = self.map[id(s)]
assert sm[idx] is None
assert sm[1 - idx] is not None
sm[idx] = cur_task
self.poller.modify(s, select.POLLIN | select.POLLOUT)
# Link task to this IOQueue so it can be removed if needed
cur_task.data = self
def _dequeue(self, s):
del self.map[id(s)]
self.poller.unregister(s)
async def queue_read(self, s):
self._enqueue(s, 0)
await _never()
async def queue_write(self, s):
self._enqueue(s, 1)
await _never()
def remove(self, task):
while True:
del_s = None
for k in self.map: # Iterate without allocating on the heap
q0, q1, s = self.map[k]
if q0 is task or q1 is task:
del_s = s
break
if del_s is not None:
self._dequeue(s)
else:
break
def wait_io_event(self, dt):
for s, ev in self.poller.ipoll(dt):
sm = self.map[id(s)]
# print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None:
# POLLIN or error
_task_queue.push_head(sm[0])
sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None:
# POLLOUT or error
_task_queue.push_head(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
elif sm[0] is None:
self.poller.modify(s, select.POLLOUT)
else:
self.poller.modify(s, select.POLLIN)
################################################################################
# Main run loop
# Ensure the awaitable is a task
def _promote_to_task(aw):
return aw if isinstance(aw, Task) else create_task(aw)
# Create and schedule a new task from a coroutine
def create_task(coro):
"""Create a new task from the given coroutine and schedule it to run.
Returns the corresponding `Task` object.
"""
if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
_task_queue.push_head(t)
return t
# Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
"""Run the given *main_task* until it completes."""
global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True:
# Wait until the head of _task_queue is ready to run
dt = 1
while dt > 0:
dt = -1
t = _task_queue.peek()
if t:
# A task waiting on _task_queue; "ph_key" is time to schedule task at
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
# No tasks can be woken so finished running
return
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _task_queue.pop_head()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
exc = t.data
if not exc:
t.coro.send(None)
else:
# If the task is finished and on the run queue and gets here, then it
# had an exception and was not await'ed on. Throwing into it now will
# raise StopIteration and the code below will catch this and run the
# call_exception_handler function.
t.data = None
t.coro.throw(exc)
except excs_all as er:
# Check the task is not on any event queue
assert t.data is None
# This task is done, check if it's the main task and then loop should stop
if t is main_task:
if isinstance(er, StopIteration):
return er.value
raise er
if t.state:
# Task was running but is now finished.
waiting = False
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
_task_queue.push_head(t.state.pop_head())
waiting = True
# "False" indicates that the task is complete and has been await'ed on.
t.state = False
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push_head(t)
# Save return value of coro to pass up to caller.
t.data = er
elif t.state is None:
# Task is already finished and nothing await'ed on the task,
# so call the exception handler.
_exc_context["exception"] = exc
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)
# Create a new task from a coroutine and run it until it finishes
def run(coro):
"""Create a new task from the given coroutine and run it until it completes.
Returns the value returned by *coro*.
"""
return run_until_complete(create_task(coro))
################################################################################
# Event loop wrapper
async def _stopper():
pass
_stop_task = None
class Loop:
"""Class representing the event loop"""
_exc_handler = None
def create_task(coro):
"""Create a task from the given *coro* and return the new `Task` object."""
return create_task(coro)
def run_forever():
"""Run the event loop until `Loop.stop()` is called."""
global _stop_task
_stop_task = Task(_stopper(), globals())
run_until_complete(_stop_task)
# TODO should keep running until .stop() is called, even if there're no tasks left
def run_until_complete(aw):
"""Run the given *awaitable* until it completes. If *awaitable* is not a task then
it will be promoted to one.
"""
return run_until_complete(_promote_to_task(aw))
def stop():
"""Stop the event loop"""
global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
# If stop() is called again, do nothing
_stop_task = None
def close():
"""Close the event loop."""
pass
def set_exception_handler(handler):
"""Set the exception handler to call when a Task raises an exception that is not
caught. The *handler* should accept two arguments: ``(loop, context)``
"""
Loop._exc_handler = handler
def get_exception_handler():
"""Get the current exception handler. Returns the handler, or ``None`` if no
custom handler is set.
"""
return Loop._exc_handler
def default_exception_handler(loop, context):
"""The default exception handler that is called."""
exc = context["exception"]
print_exception(None, exc, exc.__traceback__)
def call_exception_handler(context):
"""Call the current exception handler. The argument *context* is passed through
and is a dictionary containing keys:
``'message'``, ``'exception'``, ``'future'``
"""
(Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
def get_event_loop(runq_len=0, waitq_len=0):
"""Return the event loop used to schedule and run tasks. See `Loop`."""
return Loop
def current_task():
"""Return the `Task` object associated with the currently running task."""
return cur_task
def new_event_loop():
"""Reset the event loop and return it.
**NOTE**: Since MicroPython only has a single event loop, this function just resets
the loop's state, it does not create a new one
"""
global _task_queue, _io_queue, _exc_context, cur_task
# TaskQueue of Task instances
_task_queue = TaskQueue()
# Task queue and poller for stream IO
_io_queue = IOQueue()
cur_task = None
_exc_context['exception'] = None
_exc_context['future'] = None
return Loop
# Initialise default event loop
new_event_loop()

View File

@@ -0,0 +1,92 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Events
======
"""
from . import core
# Event class for primitive events that can be waited on, set, and cleared
class Event:
"""Create a new event which can be used to synchronize tasks. Events
start in the cleared state.
"""
def __init__(self):
self.state = False # False=unset; True=set
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
def is_set(self):
"""Returns ``True`` if the event is set, ``False`` otherwise."""
return self.state
def set(self):
"""Set the event. Any tasks waiting on the event will be scheduled to run.
"""
# Event becomes set, schedule any tasks waiting on it
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
while self.waiting.peek():
core._task_queue.push_head(self.waiting.pop_head())
self.state = True
def clear(self):
"""Clear the event."""
self.state = False
async def wait(self):
"""Wait for the event to be set. If the event is already set then it returns
immediately.
This is a coroutine.
"""
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
await core._never()
return True
# MicroPython-extension: This can be set from outside the asyncio event loop,
# such as other threads, IRQs or scheduler context. Implementation is a stream
# that asyncio will poll until a flag is set.
# Note: Unlike Event, this is self-clearing.
try:
import uio
class ThreadSafeFlag(uio.IOBase):
def __init__(self):
self._flag = 0
def ioctl(self, req, flags):
if req == 3: # MP_STREAM_POLL
return self._flag * flags
return None
def set(self):
self._flag = 1
async def wait(self):
if not self._flag:
yield core._io_queue.queue_read(self)
self._flag = 0
except ImportError:
pass

View File

@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2022 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Functions
=========
"""
from . import core
async def _run(waiter, aw):
try:
result = await aw
status = True
except BaseException as er:
result = None
status = er
if waiter.data is None:
# The waiter is still waiting, cancel it.
if waiter.cancel():
# Waiter was cancelled by us, change its CancelledError to an instance of
# CancelledError that contains the status and result of waiting on aw.
# If the wait_for task subsequently gets cancelled externally then this
# instance will be reset to a CancelledError instance without arguments.
waiter.data = core.CancelledError(status, result)
async def wait_for(aw, timeout, sleep=core.sleep):
"""Wait for the *aw* awaitable to complete, but cancel if it takes longer
than *timeout* seconds. If *aw* is not a task then a task will be created
from it.
If a timeout occurs, it cancels the task and raises ``asyncio.TimeoutError``:
this should be trapped by the caller.
Returns the return value of *aw*.
This is a coroutine.
"""
aw = core._promote_to_task(aw)
if timeout is None:
return await aw
# Run aw in a separate runner task that manages its exceptions.
runner_task = core.create_task(_run(core.cur_task, aw))
try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
status = er.args[0] if er.args else None
if status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
runner_task.cancel()
raise er
elif status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return er.args[1]
else:
# aw raised an exception, propagate it out to the caller.
raise status
# The sleep finished before aw, so cancel aw and raise TimeoutError.
runner_task.cancel()
await runner_task
raise core.TimeoutError
def wait_for_ms(aw, timeout):
"""Similar to `wait_for` but *timeout* is an integer in milliseconds.
This is a coroutine, and a MicroPython extension.
"""
return wait_for(aw, timeout, core.sleep_ms)
class _Remove:
@staticmethod
def remove(t):
pass
async def gather(*aws, return_exceptions=False):
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
are promoted to tasks.
Returns a list of return values of all *aws*
"""
if not aws:
return []
def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push_head(gather_task)
ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done
# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False
# Wait for the a sub-task to need attention.
gather_task.data = _Remove
try:
await core._never()
except core.CancelledError as er:
cancel_all = True
state = er
# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state
# Return the list of return values of each sub-task.
return ts

View File

@@ -0,0 +1,87 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Locks
=====
"""
from . import core
# Lock class for primitive mutex capability
class Lock:
"""Create a new lock which can be used to coordinate tasks. Locks start in
the unlocked state.
In addition to the methods below, locks can be used in an ``async with``
statement.
"""
def __init__(self):
# The state can take the following values:
# - 0: unlocked
# - 1: locked
# - <Task>: unlocked but this task has been scheduled to acquire the lock next
self.state = 0
# Queue of Tasks waiting to acquire this Lock
self.waiting = core.TaskQueue()
def locked(self):
"""Returns ``True`` if the lock is locked, otherwise ``False``."""
return self.state == 1
def release(self):
"""Release the lock. If any tasks are waiting on the lock then the next
one in the queue is scheduled to run and the lock remains locked. Otherwise,
no tasks are waiting and the lock becomes unlocked.
"""
if self.state != 1:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
# Task(s) waiting on lock, schedule next Task
self.state = self.waiting.pop_head()
core._task_queue.push_head(self.state)
else:
# No Task waiting so unlock
self.state = 0
async def acquire(self):
"""Wait for the lock to be in the unlocked state and then lock it in an
atomic way. Only one task can acquire the lock at any one time.
This is a coroutine.
"""
if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:
await core._never()
except core.CancelledError as er:
if self.state == core.cur_task:
# Cancelled while pending on resume, schedule next waiting Task
self.state = 1
self.release()
raise er
# Lock available, set it as locked
self.state = 1
return True
async def __aenter__(self):
return await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
return self.release()

View File

@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: 2019 Damien P. George
#
# SPDX-License-Identifier: MIT
#
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
# This list of frozen files doesn't include task.py because that's provided by the C module.
freeze(
"..",
(
"uasyncio/__init__.py",
"uasyncio/core.py",
"uasyncio/event.py",
"uasyncio/funcs.py",
"uasyncio/lock.py",
"uasyncio/stream.py",
),
opt=3,
)

View File

@@ -0,0 +1,263 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Streams
=======
"""
from . import core
class Stream:
"""This represents a TCP stream connection. To minimise code this class
implements both a reader and a writer, and both ``StreamReader`` and
``StreamWriter`` alias to this class.
"""
def __init__(self, s, e={}):
self.s = s
self.e = e
self.out_buf = b""
def get_extra_info(self, v):
"""Get extra information about the stream, given by *v*. The valid
values for *v* are: ``peername``.
"""
return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self):
pass
async def wait_closed(self):
"""Wait for the stream to close.
This is a coroutine.
"""
# TODO yield?
self.s.close()
async def read(self, n):
"""Read up to *n* bytes and return them.
This is a coroutine.
"""
await core._io_queue.queue_read(self.s)
return self.s.read(n)
async def readinto(self, buf):
"""Read up to n bytes into *buf* with n being equal to the length of *buf*
Return the number of bytes read into *buf*
This is a coroutine, and a MicroPython extension.
"""
await core._io_queue.queue_read(self.s)
return self.s.readinto(buf)
async def readexactly(self, n):
"""Read exactly *n* bytes and return them as a bytes object.
Raises an ``EOFError`` exception if the stream ends before reading
*n* bytes.
This is a coroutine.
"""
r = b""
while n:
await core._io_queue.queue_read(self.s)
r2 = self.s.read(n)
if r2 is not None:
if not len(r2):
raise EOFError
r += r2
n -= len(r2)
return r
async def readline(self):
"""Read a line and return it.
This is a coroutine.
"""
l = b""
while True:
await core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
def write(self, buf):
"""Accumulated *buf* to the output buffer. The data is only flushed when
`Stream.drain` is called. It is recommended to call `Stream.drain`
immediately after calling this function.
"""
if not self.out_buf:
# Try to write immediately to the underlying stream.
ret = self.s.write(buf)
if ret == len(buf):
return
if ret is not None:
buf = buf[ret:]
self.out_buf += buf
async def drain(self):
"""Drain (write) all buffered output data out to the stream.
This is a coroutine.
"""
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
await core._io_queue.queue_write(self.s)
ret = self.s.write(mv[off:])
if ret is not None:
off += ret
self.out_buf = b""
# Stream can be used for both reading and writing to save code size
StreamReader = Stream
StreamWriter = Stream
# Create a TCP stream connection to a remote host
async def open_connection(host, port):
"""Open a TCP connection to the given *host* and *port*. The *host* address will
be resolved using `socket.getaddrinfo`, which is currently a blocking call.
Returns a pair of streams: a reader and a writer stream. Will raise a socket-specific
``OSError`` if the host could not be resolved or if the connection could not be made.
This is a coroutine.
"""
from uerrno import EINPROGRESS
import usocket as socket
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
try:
s.connect(ai[-1])
except OSError as er:
if er.errno != EINPROGRESS:
raise er
await core._io_queue.queue_write(s)
return ss, ss
# Class representing a TCP stream server, can be closed and used in "async with"
class Server:
"""This represents the server class returned from `start_server`. It can be used in
an ``async with`` statement to close the server upon exit.
"""
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
await self.wait_closed()
def close(self):
"""Close the server."""
self.task.cancel()
async def wait_closed(self):
"""Wait for the server to close.
This is a coroutine.
"""
await self.task
async def _serve(self, s, cb):
# Accept incoming connections
while True:
try:
await core._io_queue.queue_read(s)
except core.CancelledError:
# Shutdown server
s.close()
return
try:
s2, addr = s.accept()
except:
# Ignore a failed accept
continue
s2.setblocking(False)
s2s = Stream(s2, {"peername": addr})
core.create_task(cb(s2s, s2s))
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
"""Start a TCP server on the given *host* and *port*. The *cb* callback will be
called with incoming, accepted connections, and be passed 2 arguments: reader
writer streams for the connection.
Returns a `Server` object.
This is a coroutine.
"""
import usocket as socket
# Create and bind server socket.
host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(host[-1])
s.listen(backlog)
# Create and return server object and task.
srv = Server()
srv.task = core.create_task(srv._serve(s, cb))
return srv
################################################################################
# Legacy uasyncio compatibility
async def stream_awrite(self, buf, off=0, sz=-1):
if off != 0 or sz != -1:
buf = memoryview(buf)
if sz == -1:
sz = len(buf)
buf = buf[off : off + sz]
self.write(buf)
await self.drain()
Stream.aclose = Stream.wait_closed
Stream.awrite = stream_awrite
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?

View File

@@ -0,0 +1,215 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Tasks
=====
"""
# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
# They can optionally be replaced by C implementations.
from . import core
# pairing-heap meld of 2 heaps; O(1)
def ph_meld(h1, h2):
if h1 is None:
return h2
if h2 is None:
return h1
lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
if lt:
if h1.ph_child is None:
h1.ph_child = h2
else:
h1.ph_child_last.ph_next = h2
h1.ph_child_last = h2
h2.ph_next = None
h2.ph_rightmost_parent = h1
return h1
else:
h1.ph_next = h2.ph_child
h2.ph_child = h1
if h1.ph_next is None:
h2.ph_child_last = h1
h1.ph_rightmost_parent = h2
return h2
# pairing-heap pairing operation; amortised O(log N)
def ph_pairing(child):
heap = None
while child is not None:
n1 = child
child = child.ph_next
n1.ph_next = None
if child is not None:
n2 = child
child = child.ph_next
n2.ph_next = None
n1 = ph_meld(n1, n2)
heap = ph_meld(heap, n1)
return heap
# pairing-heap delete of a node; stable, amortised O(log N)
def ph_delete(heap, node):
if node is heap:
child = heap.ph_child
node.ph_child = None
return ph_pairing(child)
# Find parent of node
parent = node
while parent.ph_next is not None:
parent = parent.ph_next
parent = parent.ph_rightmost_parent
# Replace node with pairing of its children
if node is parent.ph_child and node.ph_child is None:
parent.ph_child = node.ph_next
node.ph_next = None
return heap
elif node is parent.ph_child:
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
parent.ph_child = node
else:
n = parent.ph_child
while node is not n.ph_next:
n = n.ph_next
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
if node is None:
node = n
else:
n.ph_next = node
node.ph_next = next
if next is None:
node.ph_rightmost_parent = parent
parent.ph_child_last = node
return heap
# TaskQueue class based on the above pairing-heap functions.
class TaskQueue:
def __init__(self):
self.heap = None
def peek(self):
return self.heap
def push(self, v, key=None):
assert v.ph_child is None
assert v.ph_next is None
v.data = None
v.ph_key = key if key is not None else core.ticks()
self.heap = ph_meld(v, self.heap)
def pop(self):
v = self.heap
assert v.ph_next is None
self.heap = ph_pairing(v.ph_child)
v.ph_child = None
return v
def remove(self, v):
self.heap = ph_delete(self.heap, v)
# Compatibility aliases, remove after they are no longer used
push_head = push
push_sorted = push
pop_head = pop
# Task class representing a coroutine, can be waited on and cancelled.
class Task:
"""This object wraps a coroutine into a running task. Tasks can be waited on
using ``await task``, which will wait for the task to complete and return the
return value of the task.
Tasks should not be created directly, rather use ``create_task`` to create them.
"""
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True, a callable, or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
self.ph_next = None # Paring heap
self.ph_rightmost_parent = None # Paring heap
def __iter__(self):
if not self.state:
# Task finished, signal that is has been await'ed on.
self.state = False
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
elif type(self.state) is not TaskQueue:
# Task has state used for another purpose, so can't also wait on it.
raise RuntimeError("can't wait")
return self
# CircuitPython needs __await()__.
__await__ = __iter__
def __next__(self):
if not self.state:
if self.data is None:
# Task finished but has already been sent to the loop's exception handler.
raise StopIteration
else:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.state.push(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
"""Whether the task is complete."""
return not self.state
def cancel(self):
"""Cancel the task by injecting a ``CancelledError`` into it. The task
may or may not ignore this exception.
"""
# Check if task is already finished.
if not self.state:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
raise RuntimeError("can't cancel self")
# If Task waits on another task then forward the cancel to the one it's waiting on.
while isinstance(self.data, Task):
self = self.data
# Reschedule Task as a cancelled task.
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push(self)
self.data = core.CancelledError
return True

View File

@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
"""
Fallback traceback module if the system traceback is missing.
"""
try:
from typing import List
except ImportError:
pass
import sys
def _print_traceback(traceback, limit=None, file=sys.stderr) -> List[str]:
if limit is None:
if hasattr(sys, "tracebacklimit"):
limit = sys.tracebacklimit
n = 0
while traceback is not None:
frame = traceback.tb_frame
line_number = traceback.tb_lineno
frame_code = frame.f_code
filename = frame_code.co_filename
name = frame_code.co_name
print(' File "%s", line %d, in %s' % (filename, line_number, name), file=file)
traceback = traceback.tb_next
n = n + 1
if limit is not None and n >= limit:
break
def print_exception(exception, value=None, traceback=None, limit=None, file=sys.stderr):
"""
Print exception information and stack trace to file.
"""
if traceback:
print("Traceback (most recent call last):", file=file)
_print_traceback(traceback, limit=limit, file=file)
if isinstance(exception, BaseException):
exception_type = type(exception).__name__
elif hasattr(exception, "__name__"):
exception_type = exception.__name__
else:
exception_type = type(value).__name__
valuestr = str(value)
if value is None or not valuestr:
print(exception_type, file=file)
else:
print("%s: %s" % (str(exception_type), valuestr), file=file)

View File

@@ -13,12 +13,6 @@ class Stream:
def get_extra_info(self, v): def get_extra_info(self, v):
return self.e[v] return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self): def close(self):
pass pass
@@ -63,6 +57,8 @@ class Stream:
while True: while True:
yield core._io_queue.queue_read(self.s) yield core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block l2 = self.s.readline() # may do multiple reads but won't block
if l2 is None:
continue
l += l2 l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str) if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l return l
@@ -100,19 +96,29 @@ StreamWriter = Stream
# Create a TCP stream connection to a remote host # Create a TCP stream connection to a remote host
# #
# async # async
def open_connection(host, port): def open_connection(host, port, ssl=None, server_hostname=None):
from errno import EINPROGRESS from errno import EINPROGRESS
import socket import socket
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2]) s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False) s.setblocking(False)
ss = Stream(s)
try: try:
s.connect(ai[-1]) s.connect(ai[-1])
except OSError as er: except OSError as er:
if er.errno != EINPROGRESS: if er.errno != EINPROGRESS:
raise er raise er
# wrap with SSL, if requested
if ssl:
if ssl is True:
import ssl as _ssl
ssl = _ssl.SSLContext(_ssl.PROTOCOL_TLS_CLIENT)
if not server_hostname:
server_hostname = host
s = ssl.wrap_socket(s, server_hostname=server_hostname, do_handshake_on_connect=False)
s.setblocking(False)
ss = Stream(s)
yield core._io_queue.queue_write(s) yield core._io_queue.queue_write(s)
return ss, ss return ss, ss
@@ -135,7 +141,7 @@ class Server:
async def wait_closed(self): async def wait_closed(self):
await self.task await self.task
async def _serve(self, s, cb): async def _serve(self, s, cb, ssl):
self.state = False self.state = False
# Accept incoming connections # Accept incoming connections
while True: while True:
@@ -156,6 +162,13 @@ class Server:
except: except:
# Ignore a failed accept # Ignore a failed accept
continue continue
if ssl:
try:
s2 = ssl.wrap_socket(s2, server_side=True, do_handshake_on_connect=False)
except OSError as e:
core.sys.print_exception(e)
s2.close()
continue
s2.setblocking(False) s2.setblocking(False)
s2s = Stream(s2, {"peername": addr}) s2s = Stream(s2, {"peername": addr})
core.create_task(cb(s2s, s2s)) core.create_task(cb(s2s, s2s))
@@ -163,7 +176,7 @@ class Server:
# Helper function to start a TCP stream server, running as a new task # Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task # TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5): async def start_server(cb, host, port, backlog=5, ssl=None):
import socket import socket
# Create and bind server socket. # Create and bind server socket.
@@ -176,7 +189,7 @@ async def start_server(cb, host, port, backlog=5):
# Create and return server object and task. # Create and return server object and task.
srv = Server() srv = Server()
srv.task = core.create_task(srv._serve(s, cb)) srv.task = core.create_task(srv._serve(s, cb, ssl))
try: try:
# Ensure that the _serve task has been scheduled so that it gets to # Ensure that the _serve task has been scheduled so that it gets to
# handle cancellation. # handle cancellation.

View File

@@ -1,79 +0,0 @@
from utime import *
from micropython import const
_TS_YEAR = const(0)
_TS_MON = const(1)
_TS_MDAY = const(2)
_TS_HOUR = const(3)
_TS_MIN = const(4)
_TS_SEC = const(5)
_TS_WDAY = const(6)
_TS_YDAY = const(7)
_TS_ISDST = const(8)
_WDAY = const(("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"))
_MDAY = const(
(
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
)
)
def strftime(datefmt, ts):
from io import StringIO
fmtsp = False
ftime = StringIO()
for k in datefmt:
if fmtsp:
if k == "a":
ftime.write(_WDAY[ts[_TS_WDAY]][0:3])
elif k == "A":
ftime.write(_WDAY[ts[_TS_WDAY]])
elif k == "b":
ftime.write(_MDAY[ts[_TS_MON] - 1][0:3])
elif k == "B":
ftime.write(_MDAY[ts[_TS_MON] - 1])
elif k == "d":
ftime.write("%02d" % ts[_TS_MDAY])
elif k == "H":
ftime.write("%02d" % ts[_TS_HOUR])
elif k == "I":
ftime.write("%02d" % (ts[_TS_HOUR] % 12))
elif k == "j":
ftime.write("%03d" % ts[_TS_YDAY])
elif k == "m":
ftime.write("%02d" % ts[_TS_MON])
elif k == "M":
ftime.write("%02d" % ts[_TS_MIN])
elif k == "P":
ftime.write("AM" if ts[_TS_HOUR] < 12 else "PM")
elif k == "S":
ftime.write("%02d" % ts[_TS_SEC])
elif k == "w":
ftime.write(str(ts[_TS_WDAY]))
elif k == "y":
ftime.write("%02d" % (ts[_TS_YEAR] % 100))
elif k == "Y":
ftime.write(str(ts[_TS_YEAR]))
else:
ftime.write(k)
fmtsp = False
elif k == "%":
fmtsp = True
else:
ftime.write(k)
val = ftime.getvalue()
ftime.close()
return val

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "microdot" name = "microdot"
version = "2.0.0" version = "2.0.7"
authors = [ authors = [
{ name = "Miguel Grinberg", email = "miguel.grinberg@gmail.com" }, { name = "Miguel Grinberg", email = "miguel.grinberg@gmail.com" },
] ]
@@ -14,6 +14,8 @@ classifiers = [
"Operating System :: OS Independent", "Operating System :: OS Independent",
] ]
requires-python = ">=3.8" requires-python = ">=3.8"
dependencies = [
]
[project.readme] [project.readme]
file = "README.md" file = "README.md"
@@ -24,8 +26,12 @@ Homepage = "https://github.com/miguelgrinberg/microdot"
"Bug Tracker" = "https://github.com/miguelgrinberg/microdot/issues" "Bug Tracker" = "https://github.com/miguelgrinberg/microdot/issues"
[project.optional-dependencies] [project.optional-dependencies]
dev = [
"tox",
]
docs = [ docs = [
"sphinx", "sphinx",
"pyjwt",
] ]
[tool.setuptools] [tool.setuptools]

View File

@@ -2,7 +2,11 @@ import sys
sys.path.insert(0, 'src') sys.path.insert(0, 'src')
sys.path.insert(2, 'libs/common') sys.path.insert(2, 'libs/common')
sys.path.insert(3, 'libs/micropython') if sys.implementation.name == 'circuitpython':
sys.path.insert(3, 'libs/circuitpython')
sys.path.insert(4, 'libs/micropython')
else:
sys.path.insert(3, 'libs/micropython')
import unittest import unittest

View File

@@ -45,6 +45,12 @@ class _BodyStream: # pragma: no cover
class Microdot(BaseMicrodot): class Microdot(BaseMicrodot):
"""A subclass of the core :class:`Microdot <microdot.Microdot>` class that
implements the ASGI protocol.
This class must be used as the application instance when running under an
ASGI web server.
"""
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.embedded_server = False self.embedded_server = False

8
src/microdot/helpers.py Normal file
View File

@@ -0,0 +1,8 @@
try:
from functools import wraps
except ImportError: # pragma: no cover
# MicroPython does not currently implement functools.wraps
def wraps(wrapped):
def _(wrapper):
return wrapper
return _

View File

@@ -3,36 +3,37 @@ from jinja2 import Environment, FileSystemLoader, select_autoescape
_jinja_env = None _jinja_env = None
def init_templates(template_dir='templates', enable_async=False, **kwargs):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load templates
from a *templates* subdirectory.
:param enable_async: set to ``True`` to enable the async rendering engine
in Jinja, and the ``render_async()`` and
``generate_async()`` methods.
:param kwargs: any additional options to be passed to Jinja's
``Environment`` class.
"""
global _jinja_env
_jinja_env = Environment(
loader=FileSystemLoader(template_dir),
autoescape=select_autoescape(),
enable_async=enable_async,
**kwargs
)
class Template: class Template:
"""A template object. """A template object.
:param template: The filename of the template to render, relative to the :param template: The filename of the template to render, relative to the
configured template directory. configured template directory.
""" """
@classmethod
def initialize(cls, template_dir='templates', enable_async=False,
**kwargs):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load
templates from a *templates* subdirectory.
:param enable_async: set to ``True`` to enable the async rendering
engine in Jinja, and the ``render_async()`` and
``generate_async()`` methods.
:param kwargs: any additional options to be passed to Jinja's
``Environment`` class.
"""
global _jinja_env
_jinja_env = Environment(
loader=FileSystemLoader(template_dir),
autoescape=select_autoescape(),
enable_async=enable_async,
**kwargs
)
def __init__(self, template): def __init__(self, template):
if _jinja_env is None: # pragma: no cover if _jinja_env is None: # pragma: no cover
init_templates() self.initialize()
#: The name of the template #: The name of the template
self.name = template self.name = template
self.template = _jinja_env.get_template(template) self.template = _jinja_env.get_template(template)

View File

@@ -8,11 +8,11 @@ servers for MicroPython and standard Python.
import asyncio import asyncio
import io import io
import json import json
import re
import time import time
try: try:
from inspect import iscoroutinefunction, iscoroutine from inspect import iscoroutinefunction, iscoroutine
from functools import partial
async def invoke_handler(handler, *args, **kwargs): async def invoke_handler(handler, *args, **kwargs):
"""Invoke a handler and return the result. """Invoke a handler and return the result.
@@ -23,7 +23,7 @@ try:
ret = await handler(*args, **kwargs) ret = await handler(*args, **kwargs)
else: else:
ret = await asyncio.get_running_loop().run_in_executor( ret = await asyncio.get_running_loop().run_in_executor(
None, handler, *args, **kwargs) None, partial(handler, *args, **kwargs))
return ret return ret
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
def iscoroutine(coro): def iscoroutine(coro):
@@ -595,10 +595,10 @@ class Response:
if expires: if expires:
if isinstance(expires, str): if isinstance(expires, str):
http_cookie += '; Expires=' + expires http_cookie += '; Expires=' + expires
else: else: # pragma: no cover
http_cookie += '; Expires=' + time.strftime( http_cookie += '; Expires=' + time.strftime(
'%a, %d %b %Y %H:%M:%S GMT', expires.timetuple()) '%a, %d %b %Y %H:%M:%S GMT', expires.timetuple())
if max_age: if max_age is not None:
http_cookie += '; Max-Age=' + str(max_age) http_cookie += '; Max-Age=' + str(max_age)
if secure: if secure:
http_cookie += '; Secure' http_cookie += '; Secure'
@@ -616,10 +616,10 @@ class Response:
:param cookie: The cookie's name. :param cookie: The cookie's name.
:param kwargs: Any cookie opens and flags supported by :param kwargs: Any cookie opens and flags supported by
``set_cookie()`` except ``expires``. ``set_cookie()`` except ``expires`` and ``max_age``.
""" """
self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT', self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT',
**kwargs) max_age=0, **kwargs)
def complete(self): def complete(self):
if isinstance(self.body, bytes) and \ if isinstance(self.body, bytes) and \
@@ -774,7 +774,10 @@ class Response:
first. first.
""" """
if content_type is None: if content_type is None:
ext = filename.split('.')[-1] if compressed and filename.endswith('.gz'):
ext = filename[:-3].split('.')[-1]
else:
ext = filename.split('.')[-1]
if ext in Response.types_map: if ext in Response.types_map:
content_type = Response.types_map[ext] content_type = Response.types_map[ext]
else: else:
@@ -797,8 +800,9 @@ class Response:
class URLPattern(): class URLPattern():
def __init__(self, url_pattern): def __init__(self, url_pattern):
self.url_pattern = url_pattern self.url_pattern = url_pattern
self.pattern = '' self.segments = []
self.args = [] self.regex = None
pattern = ''
use_regex = False use_regex = False
for segment in url_pattern.lstrip('/').split('/'): for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<': if segment and segment[0] == '<':
@@ -810,42 +814,83 @@ class URLPattern():
else: else:
type_ = 'string' type_ = 'string'
name = segment name = segment
parser = None
if type_ == 'string': if type_ == 'string':
pattern = '[^/]+' parser = self._string_segment
pattern += '/([^/]+)'
elif type_ == 'int': elif type_ == 'int':
pattern = '-?\\d+' parser = self._int_segment
pattern += '/(-?\\d+)'
elif type_ == 'path': elif type_ == 'path':
pattern = '.+' use_regex = True
pattern += '/(.+)'
elif type_.startswith('re:'): elif type_.startswith('re:'):
pattern = type_[3:] use_regex = True
pattern += '/({pattern})'.format(pattern=type_[3:])
else: else:
raise ValueError('invalid URL segment type') raise ValueError('invalid URL segment type')
use_regex = True self.segments.append({'parser': parser, 'name': name,
self.pattern += '/({pattern})'.format(pattern=pattern) 'type': type_})
self.args.append({'type': type_, 'name': name})
else: else:
self.pattern += '/{segment}'.format(segment=segment) pattern += '/' + segment
self.segments.append({'parser': self._static_segment(segment)})
if use_regex: if use_regex:
self.pattern = re.compile('^' + self.pattern + '$') import re
self.regex = re.compile('^' + pattern + '$')
def match(self, path): def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {} args = {}
i = 1 if self.regex:
for arg in self.args: g = self.regex.match(path)
value = g.group(i) if not g:
if arg['type'] == 'int': return
value = int(value) i = 1
args[arg['name']] = value for segment in self.segments:
i += 1 if 'name' not in segment:
continue
value = g.group(i)
if segment['type'] == 'int':
value = int(value)
args[segment['name']] = value
i += 1
else:
if len(path) == 0 or path[0] != '/':
return
path = path[1:]
args = {}
for segment in self.segments:
if path is None:
return
arg, path = segment['parser'](path)
if arg is None:
return
if 'name' in segment:
args[segment['name']] = arg
if path is not None:
return
return args return args
def _static_segment(self, segment):
def _static(value):
s = value.split('/', 1)
if s[0] == segment:
return '', s[1] if len(s) > 1 else None
return None, None
return _static
def _string_segment(self, value):
s = value.split('/', 1)
if len(s[0]) == 0:
return None, None
return s[0], s[1] if len(s) > 1 else None
def _int_segment(self, value):
s = value.split('/', 1)
try:
return int(s[0]), s[1] if len(s) > 1 else None
except ValueError:
return None, None
class HTTPException(Exception): class HTTPException(Exception):
def __init__(self, status_code, reason=None): def __init__(self, status_code, reason=None):
@@ -1149,7 +1194,7 @@ class Microdot:
Example:: Example::
import asyncio import asyncio
from microdot_asyncio import Microdot from microdot import Microdot
app = Microdot() app = Microdot()
@@ -1226,7 +1271,7 @@ class Microdot:
Example:: Example::
from microdot_asyncio import Microdot from microdot import Microdot
app = Microdot() app = Microdot()
@@ -1324,7 +1369,12 @@ class Microdot:
if res is None: if res is None:
res = await invoke_handler( res = await invoke_handler(
f, req, **req.url_args) f, req, **req.url_args)
if isinstance(res, int):
res = '', res
if isinstance(res, tuple): if isinstance(res, tuple):
if isinstance(res[0], int):
res = ('', res[0],
res[1] if len(res) > 1 else {})
body = res[0] body = res[0]
if isinstance(res[1], int): if isinstance(res[1], int):
status_code = res[1] status_code = res[1]

View File

@@ -1,7 +1,6 @@
import jwt import jwt
from microdot.microdot import invoke_handler from microdot.microdot import invoke_handler
from microdot.helpers import wraps
secret_key = None
class SessionDict(dict): class SessionDict(dict):
@@ -30,14 +29,21 @@ class Session:
""" """
secret_key = None secret_key = None
def __init__(self, app=None, secret_key=None): def __init__(self, app=None, secret_key=None, cookie_options=None):
self.secret_key = secret_key self.secret_key = secret_key
self.cookie_options = cookie_options or {}
if app is not None: if app is not None:
self.initialize(app) self.initialize(app)
def initialize(self, app, secret_key=None): def initialize(self, app, secret_key=None, cookie_options=None):
if secret_key is not None: if secret_key is not None:
self.secret_key = secret_key self.secret_key = secret_key
if cookie_options is not None:
self.cookie_options = cookie_options
if 'path' not in self.cookie_options:
self.cookie_options['path'] = '/'
if 'http_only' not in self.cookie_options:
self.cookie_options['http_only'] = True
app._session = self app._session = self
def get(self, request): def get(self, request):
@@ -57,13 +63,7 @@ class Session:
if session is None: if session is None:
request.g._session = SessionDict(request, {}) request.g._session = SessionDict(request, {})
return request.g._session return request.g._session
try: request.g._session = SessionDict(request, self.decode(session))
session = jwt.decode(session, self.secret_key,
algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
request.g._session = SessionDict(request, {})
else:
request.g._session = SessionDict(request, session)
return request.g._session return request.g._session
def update(self, request, session): def update(self, request, session):
@@ -89,12 +89,12 @@ class Session:
if not self.secret_key: if not self.secret_key:
raise ValueError('The session secret key is not configured') raise ValueError('The session secret key is not configured')
encoded_session = jwt.encode(session, self.secret_key, encoded_session = self.encode(session)
algorithm='HS256')
@request.after_request @request.after_request
def _update_session(request, response): def _update_session(request, response):
response.set_cookie('session', encoded_session, http_only=True) response.set_cookie('session', encoded_session,
**self.cookie_options)
return response return response
def delete(self, request): def delete(self, request):
@@ -117,10 +117,21 @@ class Session:
""" """
@request.after_request @request.after_request
def _delete_session(request, response): def _delete_session(request, response):
response.set_cookie('session', '', http_only=True, response.delete_cookie('session', **self.cookie_options)
expires='Thu, 01 Jan 1970 00:00:01 GMT')
return response return response
def encode(self, payload, secret_key=None):
return jwt.encode(payload, secret_key or self.secret_key,
algorithm='HS256')
def decode(self, session, secret_key=None):
try:
payload = jwt.decode(session, secret_key or self.secret_key,
algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
return {}
return payload
def with_session(f): def with_session(f):
"""Decorator that passes the user session to the route handler. """Decorator that passes the user session to the route handler.
@@ -134,15 +145,11 @@ def with_session(f):
return 'Hello, World!' return 'Hello, World!'
Note that the decorator does not save the session. To update the session, Note that the decorator does not save the session. To update the session,
call the :func:`update_session <microdot.session.update_session>` function. call the :func:`session.save() <microdot.session.SessionDict.save>` method.
""" """
@wraps(f)
async def wrapper(request, *args, **kwargs): async def wrapper(request, *args, **kwargs):
return await invoke_handler( return await invoke_handler(
f, request, request.app._session.get(request), *args, **kwargs) f, request, request.app._session.get(request), *args, **kwargs)
for attr in ['__name__', '__doc__', '__module__', '__qualname__']:
try:
setattr(wrapper, attr, getattr(f, attr))
except AttributeError: # pragma: no cover
pass
return wrapper return wrapper

View File

@@ -1,20 +1,40 @@
import asyncio import asyncio
import json import json
from microdot.helpers import wraps
class SSE: class SSE:
"""Server-Sent Events object.
An object of this class is sent to handler functions to manage the SSE
connection.
"""
def __init__(self): def __init__(self):
self.event = asyncio.Event() self.event = asyncio.Event()
self.queue = [] self.queue = []
async def send(self, data, event=None): async def send(self, data, event=None, event_id=None):
"""Send an event to the client.
:param data: the data to send. It can be given as a string, bytes, dict
or list. Dictionaries and lists are serialized to JSON.
Any other types are converted to string before sending.
:param event: an optional event name, to send along with the data. If
given, it must be a string.
:param event_id: an optional event id, to send along with the data. If
given, it must be a string.
"""
if isinstance(data, (dict, list)): if isinstance(data, (dict, list)):
data = json.dumps(data) data = json.dumps(data).encode()
elif not isinstance(data, str): elif isinstance(data, str):
data = str(data) data = data.encode()
data = f'data: {data}\n\n' elif not isinstance(data, bytes):
data = str(data).encode()
data = b'data: ' + data + b'\n\n'
if event_id:
data = b'id: ' + event_id.encode() + b'\n' + data
if event: if event:
data = f'event: {event}\n{data}' data = b'event: ' + event.encode() + b'\n' + data
self.queue.append(data) self.queue.append(data)
self.event.set() self.event.set()
@@ -30,19 +50,9 @@ def sse_response(request, event_function, *args, **kwargs):
:param args: additional positional arguments to be passed to the response. :param args: additional positional arguments to be passed to the response.
:param kwargs: additional keyword arguments to be passed to the response. :param kwargs: additional keyword arguments to be passed to the response.
Example:: This is a low-level function that can be used to implement a custom SSE
endpoint. In general the :func:`microdot.sse.with_sse` decorator should be
@app.route('/events') used instead.
async def events_route(request):
async def events(request, sse):
# send an unnamed event with string data
await sse.send('hello')
# send an unnamed event with JSON data
await sse.send({'foo': 'bar'})
# send a named event
await sse.send('hello', event='greeting')
return sse_response(request, events)
""" """
sse = SSE() sse = SSE()
@@ -85,10 +95,16 @@ def with_sse(f):
@app.route('/events') @app.route('/events')
@with_sse @with_sse
async def events(request, sse): async def events(request, sse):
for i in range(10): # send an unnamed event with string data
await asyncio.sleep(1) await sse.send('hello')
await sse.send(f'{i}')
# send an unnamed event with JSON data
await sse.send({'foo': 'bar'})
# send a named event
await sse.send('hello', event='greeting')
""" """
@wraps(f)
async def sse_handler(request, *args, **kwargs): async def sse_handler(request, *args, **kwargs):
return sse_response(request, f, *args, **kwargs) return sse_response(request, f, *args, **kwargs)

View File

@@ -77,7 +77,7 @@ class TestClient:
The following example shows how to create a test client for an application The following example shows how to create a test client for an application
and send a test request:: and send a test request::
from microdot_asyncio import Microdot from microdot import Microdot
app = Microdot() app = Microdot()
@@ -112,9 +112,13 @@ class TestClient:
headers['Host'] = 'example.com:1234' headers['Host'] = 'example.com:1234'
return body, headers return body, headers
def _process_cookies(self, headers): def _process_cookies(self, path, headers):
cookies = '' cookies = ''
for name, value in self.cookies.items(): for name, value in self.cookies.items():
if isinstance(value, tuple):
value, cookie_path = value
if not path.startswith(cookie_path):
continue
if cookies: if cookies:
cookies += '; ' cookies += '; '
cookies += name + '=' + value cookies += name + '=' + value
@@ -123,7 +127,7 @@ class TestClient:
headers['Cookie'] += '; ' + cookies headers['Cookie'] += '; ' + cookies
else: else:
headers['Cookie'] = cookies headers['Cookie'] = cookies
return cookies, headers return headers
def _render_request(self, method, path, headers, body): def _render_request(self, method, path, headers, body):
request_bytes = '{method} {path} HTTP/1.0\n'.format( request_bytes = '{method} {path} HTTP/1.0\n'.format(
@@ -139,26 +143,45 @@ class TestClient:
for cookie in cookies: for cookie in cookies:
cookie_name, cookie_value = cookie.split('=', 1) cookie_name, cookie_value = cookie.split('=', 1)
cookie_options = cookie_value.split(';') cookie_options = cookie_value.split(';')
path = '/'
delete = False delete = False
for option in cookie_options[1:]: for option in cookie_options[1:]:
if option.strip().lower().startswith('expires='): option = option.strip().lower()
_, e = option.strip().split('=', 1) if option.startswith(
'max-age='): # pragma: no cover
_, age = option.split('=', 1)
try:
age = int(age)
except ValueError: # pragma: no cover
age = 0
if age <= 0:
delete = True
elif option.startswith('expires='):
_, e = option.split('=', 1)
# this is a very limited parser for cookie expiry # this is a very limited parser for cookie expiry
# that only detects a cookie deletion request when # that only detects a cookie deletion request when
# the date is 1/1/1970 # the date is 1/1/1970
if '1 jan 1970' in e.lower(): # pragma: no branch if '1 jan 1970' in e.lower(): # pragma: no branch
delete = True delete = True
break elif option.startswith('path='):
_, path = option.split('=', 1)
if delete: if delete:
if cookie_name in self.cookies: # pragma: no branch if cookie_name in self.cookies: # pragma: no branch
del self.cookies[cookie_name] cookie_path = self.cookies[cookie_name][1] \
if isinstance(self.cookies[cookie_name], tuple) \
else '/'
if path == cookie_path:
del self.cookies[cookie_name]
else: else:
self.cookies[cookie_name] = cookie_options[0] if path == '/':
self.cookies[cookie_name] = cookie_options[0]
else:
self.cookies[cookie_name] = (cookie_options[0], path)
async def request(self, method, path, headers=None, body=None, sock=None): async def request(self, method, path, headers=None, body=None, sock=None):
headers = headers or {} headers = headers or {}
body, headers = self._process_body(body, headers) body, headers = self._process_body(body, headers)
cookies, headers = self._process_cookies(headers) headers = self._process_cookies(path, headers)
request_bytes = self._render_request(method, path, headers, body) request_bytes = self._render_request(method, path, headers, body)
if sock: if sock:
reader = sock[0] reader = sock[0]
@@ -292,6 +315,8 @@ class TestClient:
async def awrite(self, data): async def awrite(self, data):
if self.started: if self.started:
h = WebSocket._parse_frame_header(data[0:2]) h = WebSocket._parse_frame_header(data[0:2])
if h[1] not in [WebSocket.TEXT, WebSocket.BINARY]:
return
if h[3] < 0: if h[3] < 0:
data = data[2 - h[3]:] data = data[2 - h[3]:]
else: else:

View File

@@ -3,30 +3,32 @@ from utemplate import recompile
_loader = None _loader = None
def init_templates(template_dir='templates', loader_class=recompile.Loader):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load templates
from a *templates* subdirectory.
:param loader_class: the ``utemplate.Loader`` class to use when loading
templates. This argument is optional. The default is
the ``recompile.Loader`` class, which automatically
recompiles templates when they change.
"""
global _loader
_loader = loader_class(None, template_dir)
class Template: class Template:
"""A template object. """A template object.
:param template: The filename of the template to render, relative to the :param template: The filename of the template to render, relative to the
configured template directory. configured template directory.
""" """
@classmethod
def initialize(cls, template_dir='templates',
loader_class=recompile.Loader):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load
templates from a *templates* subdirectory.
:param loader_class: the ``utemplate.Loader`` class to use when loading
templates. This argument is optional. The default
is the ``recompile.Loader`` class, which
automatically recompiles templates when they
change.
"""
global _loader
_loader = loader_class(None, template_dir)
def __init__(self, template): def __init__(self, template):
if _loader is None: # pragma: no cover if _loader is None: # pragma: no cover
init_templates() self.initialize()
#: The name of the template #: The name of the template
self.name = template self.name = template
self.template = _loader.load(template) self.template = _loader.load(template)

View File

@@ -1,10 +1,21 @@
import binascii import binascii
import hashlib import hashlib
from microdot import Response from microdot import Request, Response
from microdot.microdot import MUTED_SOCKET_ERRORS from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception
from microdot.helpers import wraps
class WebSocketError(Exception):
"""Exception raised when an error occurs in a WebSocket connection."""
pass
class WebSocket: class WebSocket:
"""A WebSocket connection object.
An instance of this class is sent to handler functions to manage the
WebSocket connection.
"""
CONT = 0 CONT = 0
TEXT = 1 TEXT = 1
BINARY = 2 BINARY = 2
@@ -12,6 +23,18 @@ class WebSocket:
PING = 9 PING = 9
PONG = 10 PONG = 10
#: Specify the maximum message size that can be received when calling the
#: ``receive()`` method. Messages with payloads that are larger than this
#: size will be rejected and the connection closed. Set to 0 to disable
#: the size check (be aware of potential security issues if you do this),
#: or to -1 to use the value set in
#: ``Request.max_body_length``. The default is -1.
#:
#: Example::
#:
#: WebSocket.max_message_length = 4 * 1024 # up to 4KB messages
max_message_length = -1
def __init__(self, request): def __init__(self, request):
self.request = request self.request = request
self.closed = False self.closed = False
@@ -26,6 +49,7 @@ class WebSocket:
b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n') b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n')
async def receive(self): async def receive(self):
"""Receive a message from the client."""
while True: while True:
opcode, payload = await self._read_frame() opcode, payload = await self._read_frame()
send_opcode, data = self._process_websocket_frame(opcode, payload) send_opcode, data = self._process_websocket_frame(opcode, payload)
@@ -35,12 +59,20 @@ class WebSocket:
return data return data
async def send(self, data, opcode=None): async def send(self, data, opcode=None):
"""Send a message to the client.
:param data: the data to send, given as a string or bytes.
:param opcode: a custom frame opcode to use. If not given, the opcode
is ``TEXT`` or ``BINARY`` depending on the type of the
data.
"""
frame = self._encode_websocket_frame( frame = self._encode_websocket_frame(
opcode or (self.TEXT if isinstance(data, str) else self.BINARY), opcode or (self.TEXT if isinstance(data, str) else self.BINARY),
data) data)
await self.request.sock[1].awrite(frame) await self.request.sock[1].awrite(frame)
async def close(self): async def close(self):
"""Close the websocket connection."""
if not self.closed: # pragma: no cover if not self.closed: # pragma: no cover
self.closed = True self.closed = True
await self.send(b'', self.CLOSE) await self.send(b'', self.CLOSE)
@@ -72,7 +104,7 @@ class WebSocket:
fin = header[0] & 0x80 fin = header[0] & 0x80
opcode = header[0] & 0x0f opcode = header[0] & 0x0f
if fin == 0 or opcode == cls.CONT: # pragma: no cover if fin == 0 or opcode == cls.CONT: # pragma: no cover
raise OSError(32, 'Continuation frames not supported') raise WebSocketError('Continuation frames not supported')
has_mask = header[1] & 0x80 has_mask = header[1] & 0x80
length = header[1] & 0x7f length = header[1] & 0x7f
if length == 126: if length == 126:
@@ -87,7 +119,7 @@ class WebSocket:
elif opcode == self.BINARY: elif opcode == self.BINARY:
pass pass
elif opcode == self.CLOSE: elif opcode == self.CLOSE:
raise OSError(32, 'Websocket connection closed') raise WebSocketError('Websocket connection closed')
elif opcode == self.PING: elif opcode == self.PING:
return self.PONG, payload return self.PONG, payload
elif opcode == self.PONG: # pragma: no branch elif opcode == self.PONG: # pragma: no branch
@@ -114,7 +146,7 @@ class WebSocket:
async def _read_frame(self): async def _read_frame(self):
header = await self.request.sock[0].read(2) header = await self.request.sock[0].read(2)
if len(header) != 2: # pragma: no cover if len(header) != 2: # pragma: no cover
raise OSError(32, 'Websocket connection closed') raise WebSocketError('Websocket connection closed')
fin, opcode, has_mask, length = self._parse_frame_header(header) fin, opcode, has_mask, length = self._parse_frame_header(header)
if length == -2: if length == -2:
length = await self.request.sock[0].read(2) length = await self.request.sock[0].read(2)
@@ -122,6 +154,10 @@ class WebSocket:
elif length == -8: elif length == -8:
length = await self.request.sock[0].read(8) length = await self.request.sock[0].read(8)
length = int.from_bytes(length, 'big') length = int.from_bytes(length, 'big')
max_allowed_length = Request.max_body_length \
if self.max_message_length == -1 else self.max_message_length
if length > max_allowed_length:
raise WebSocketError('Message too large')
if has_mask: # pragma: no cover if has_mask: # pragma: no cover
mask = await self.request.sock[0].read(4) mask = await self.request.sock[0].read(4)
payload = await self.request.sock[0].read(length) payload = await self.request.sock[0].read(length)
@@ -157,15 +193,24 @@ async def websocket_upgrade(request):
def websocket_wrapper(f, upgrade_function): def websocket_wrapper(f, upgrade_function):
@wraps(f)
async def wrapper(request, *args, **kwargs): async def wrapper(request, *args, **kwargs):
ws = await upgrade_function(request) ws = await upgrade_function(request)
try: try:
await f(request, ws, *args, **kwargs) await f(request, ws, *args, **kwargs)
await ws.close() # pragma: no cover
except OSError as exc: except OSError as exc:
if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover
raise raise
return '' except WebSocketError:
pass
except Exception as exc:
print_exception(exc)
finally: # pragma: no cover
try:
await ws.close()
except Exception:
pass
return Response.already_handled
return wrapper return wrapper

View File

@@ -9,6 +9,12 @@ from microdot.websocket import WebSocket, websocket_upgrade, \
class Microdot(BaseMicrodot): class Microdot(BaseMicrodot):
"""A subclass of the core :class:`Microdot <microdot.Microdot>` class that
implements the WSGI protocol.
This class must be used as the application instance when running under a
WSGI web server.
"""
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.loop = asyncio.new_event_loop() self.loop = asyncio.new_event_loop()

1
tests/files/test.txt.gz Normal file
View File

@@ -0,0 +1 @@
foo

View File

@@ -2,10 +2,10 @@ import asyncio
import sys import sys
import unittest import unittest
from microdot import Microdot from microdot import Microdot
from microdot.jinja import Template, init_templates from microdot.jinja import Template
from microdot.test_client import TestClient from microdot.test_client import TestClient
init_templates('tests/templates') Template.initialize('tests/templates')
@unittest.skipIf(sys.implementation.name == 'micropython', @unittest.skipIf(sys.implementation.name == 'micropython',
@@ -49,7 +49,7 @@ class TestJinja(unittest.TestCase):
self.assertEqual(res.body, b'Hello, foo!') self.assertEqual(res.body, b'Hello, foo!')
def test_render_async_template_in_app(self): def test_render_async_template_in_app(self):
init_templates('tests/templates', enable_async=True) Template.initialize('tests/templates', enable_async=True)
app = Microdot() app = Microdot()
@@ -62,10 +62,10 @@ class TestJinja(unittest.TestCase):
self.assertEqual(res.status_code, 200) self.assertEqual(res.status_code, 200)
self.assertEqual(res.body, b'Hello, foo!') self.assertEqual(res.body, b'Hello, foo!')
init_templates('tests/templates') Template.initialize('tests/templates')
def test_generate_async_template_in_app(self): def test_generate_async_template_in_app(self):
init_templates('tests/templates', enable_async=True) Template.initialize('tests/templates', enable_async=True)
app = Microdot() app = Microdot()
@@ -78,4 +78,4 @@ class TestJinja(unittest.TestCase):
self.assertEqual(res.status_code, 200) self.assertEqual(res.status_code, 200)
self.assertEqual(res.body, b'Hello, foo!') self.assertEqual(res.body, b'Hello, foo!')
init_templates('tests/templates') Template.initialize('tests/templates')

View File

@@ -25,6 +25,14 @@ class TestMicrodot(unittest.TestCase):
async def index2(req): async def index2(req):
return 'foo-async' return 'foo-async'
@app.route('/arg/<id>')
def index3(req, id):
return id
@app.route('/arg/async/<id>')
async def index4(req, id):
return f'async-{id}'
client = TestClient(app) client = TestClient(app)
res = self._run(client.get('/')) res = self._run(client.get('/'))
@@ -45,6 +53,24 @@ class TestMicrodot(unittest.TestCase):
self.assertEqual(res.body, b'foo-async') self.assertEqual(res.body, b'foo-async')
self.assertEqual(res.json, None) self.assertEqual(res.json, None)
res = self._run(client.get('/arg/123'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.headers['Content-Length'], '3')
self.assertEqual(res.text, '123')
self.assertEqual(res.body, b'123')
self.assertEqual(res.json, None)
res = self._run(client.get('/arg/async/123'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.headers['Content-Length'], '9')
self.assertEqual(res.text, 'async-123')
self.assertEqual(res.body, b'async-123')
self.assertEqual(res.json, None)
def test_post_request(self): def test_post_request(self):
app = Microdot() app = Microdot()
@@ -177,6 +203,7 @@ class TestMicrodot(unittest.TestCase):
req.cookies['one'] + req.cookies['two'] + req.cookies['three']) req.cookies['one'] + req.cookies['two'] + req.cookies['three'])
res.set_cookie('four', '4') res.set_cookie('four', '4')
res.delete_cookie('two', path='/') res.delete_cookie('two', path='/')
res.delete_cookie('one', path='/bad')
return res return res
client = TestClient(app, cookies={'one': '1', 'two': '2'}) client = TestClient(app, cookies={'one': '1', 'two': '2'})
@@ -247,6 +274,14 @@ class TestMicrodot(unittest.TestCase):
return '<p>four</p>', 202, \ return '<p>four</p>', 202, \
{'Content-Type': 'text/html; charset=UTF-8'} {'Content-Type': 'text/html; charset=UTF-8'}
@app.route('/status')
def five(req):
return 202
@app.route('/status-headers')
def six(req):
return 202, {'Content-Type': 'text/html; charset=UTF-8'}
client = TestClient(app) client = TestClient(app)
res = self._run(client.get('/body')) res = self._run(client.get('/body'))
@@ -272,6 +307,18 @@ class TestMicrodot(unittest.TestCase):
'text/html; charset=UTF-8') 'text/html; charset=UTF-8')
self.assertEqual(res.text, '<p>four</p>') self.assertEqual(res.text, '<p>four</p>')
res = self._run(client.get('/status'))
self.assertEqual(res.text, '')
self.assertEqual(res.status_code, 202)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
res = self._run(client.get('/status-headers'))
self.assertEqual(res.text, '')
self.assertEqual(res.status_code, 202)
self.assertEqual(res.headers['Content-Type'],
'text/html; charset=UTF-8')
def test_before_after_request(self): def test_before_after_request(self):
app = Microdot() app = Microdot()

View File

@@ -1,5 +1,4 @@
import asyncio import asyncio
from datetime import datetime
import unittest import unittest
from microdot import Response from microdot import Response
from tests.mock_socket import FakeStreamAsync from tests.mock_socket import FakeStreamAsync
@@ -137,10 +136,10 @@ class TestResponse(unittest.TestCase):
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoobar')) self.assertTrue(fd.response.endswith(b'\r\n\r\nfoobar'))
def test_create_from_other(self): def test_create_from_other(self):
res = Response(123) res = Response(23.7)
self.assertEqual(res.status_code, 200) self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {}) self.assertEqual(res.headers, {})
self.assertEqual(res.body, 123) self.assertEqual(res.body, 23.7)
def test_create_with_status_code(self): def test_create_with_status_code(self):
res = Response('not found', 404) res = Response('not found', 404)
@@ -186,14 +185,15 @@ class TestResponse(unittest.TestCase):
res.set_cookie('foo2', 'bar2', path='/', partitioned=True) res.set_cookie('foo2', 'bar2', path='/', partitioned=True)
res.set_cookie('foo3', 'bar3', domain='example.com:1234') res.set_cookie('foo3', 'bar3', domain='example.com:1234')
res.set_cookie('foo4', 'bar4', res.set_cookie('foo4', 'bar4',
expires=datetime(2019, 11, 5, 2, 23, 54)) expires='Tue, 05 Nov 2019 02:23:54 GMT')
res.set_cookie('foo5', 'bar5', max_age=123, res.set_cookie('foo5', 'bar5', max_age=123,
expires='Thu, 01 Jan 1970 00:00:00 GMT') expires='Thu, 01 Jan 1970 00:00:00 GMT')
res.set_cookie('foo6', 'bar6', secure=True, http_only=True) res.set_cookie('foo6', 'bar6', secure=True, http_only=True)
res.set_cookie('foo7', 'bar7', path='/foo', domain='example.com:1234', res.set_cookie('foo7', 'bar7', path='/foo', domain='example.com:1234',
expires=datetime(2019, 11, 5, 2, 23, 54), max_age=123, expires='Tue, 05 Nov 2019 02:23:54 GMT', max_age=123,
secure=True, http_only=True) secure=True, http_only=True)
res.delete_cookie('foo8', http_only=True) res.delete_cookie('foo8', http_only=True)
res.delete_cookie('foo9', path='/s')
self.assertEqual(res.headers, {'Set-Cookie': [ self.assertEqual(res.headers, {'Set-Cookie': [
'foo1=bar1', 'foo1=bar1',
'foo2=bar2; Path=/; Partitioned', 'foo2=bar2; Path=/; Partitioned',
@@ -204,7 +204,10 @@ class TestResponse(unittest.TestCase):
'foo7=bar7; Path=/foo; Domain=example.com:1234; ' 'foo7=bar7; Path=/foo; Domain=example.com:1234; '
'Expires=Tue, 05 Nov 2019 02:23:54 GMT; Max-Age=123; Secure; ' 'Expires=Tue, 05 Nov 2019 02:23:54 GMT; Max-Age=123; Secure; '
'HttpOnly', 'HttpOnly',
'foo8=; Expires=Thu, 01 Jan 1970 00:00:01 GMT; HttpOnly', ('foo8=; Expires=Thu, 01 Jan 1970 00:00:01 GMT; Max-Age=0; '
'HttpOnly'),
('foo9=; Path=/s; Expires=Thu, 01 Jan 1970 00:00:01 GMT; '
'Max-Age=0'),
]}) ]})
def test_redirect(self): def test_redirect(self):
@@ -277,6 +280,17 @@ class TestResponse(unittest.TestCase):
'application/octet-stream') 'application/octet-stream')
self.assertEqual(res.headers['Content-Encoding'], 'gzip') self.assertEqual(res.headers['Content-Encoding'], 'gzip')
def test_send_file_gzip_handling(self):
res = Response.send_file('tests/files/test.txt.gz')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'application/octet-stream')
res = Response.send_file('tests/files/test.txt.gz', compressed=True)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/plain')
self.assertEqual(res.headers['Content-Encoding'], 'gzip')
def test_default_content_type(self): def test_default_content_type(self):
original_content_type = Response.default_content_type original_content_type = Response.default_content_type
res = Response('foo') res = Response('foo')

View File

@@ -82,3 +82,77 @@ class TestSession(unittest.TestCase):
res = self._run(client.get('/')) res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200) self.assertEqual(res.status_code, 200)
def test_session_default_path(self):
app = Microdot()
Session(app, secret_key='some-other-secret')
client = TestClient(app)
@app.get('/')
@with_session
def index(req, session):
session['foo'] = 'bar'
session.save()
return ''
@app.get('/child')
@with_session
def child(req, session):
return str(session.get('foo'))
@app.get('/delete')
@with_session
def delete(req, session):
session.delete()
return ''
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/child'))
self.assertEqual(res.text, 'bar')
res = self._run(client.get('/delete'))
res = self._run(client.get('/child'))
self.assertEqual(res.text, 'None')
def test_session_custom_path(self):
app = Microdot()
session_ext = Session()
session_ext.initialize(app, secret_key='some-other-secret',
cookie_options={'path': '/child',
'http_only': False})
client = TestClient(app)
@app.get('/')
@with_session
def index(req, session):
return str(session.get('foo'))
@app.get('/child')
@with_session
def child(req, session):
session['foo'] = 'bar'
session.save()
return ''
@app.get('/child/foo')
@with_session
def foo(req, session):
return str(session.get('foo'))
@app.get('/child/delete')
@with_session
def delete(req, session):
session.delete()
return ''
res = self._run(client.get('/child'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/'))
self.assertEqual(res.text, 'None')
res = self._run(client.get('/child/foo'))
self.assertEqual(res.text, 'bar')
res = self._run(client.get('/child/delete'))
res = self._run(client.get('/'))
self.assertEqual(res.text, 'None')
res = self._run(client.get('/child/foo'))
self.assertEqual(res.text, 'None')

View File

@@ -23,9 +23,12 @@ class TestWebSocket(unittest.TestCase):
async def handle_sse(request, sse): async def handle_sse(request, sse):
await sse.send('foo') await sse.send('foo')
await sse.send('bar', event='test') await sse.send('bar', event='test')
await sse.send('bar', event='test', event_id='id42')
await sse.send('bar', event_id='id42')
await sse.send({'foo': 'bar'}) await sse.send({'foo': 'bar'})
await sse.send([42, 'foo', 'bar']) await sse.send([42, 'foo', 'bar'])
await sse.send(ValueError('foo')) await sse.send(ValueError('foo'))
await sse.send(b'foo')
client = TestClient(app) client = TestClient(app)
response = self._run(client.get('/sse')) response = self._run(client.get('/sse'))
@@ -33,6 +36,9 @@ class TestWebSocket(unittest.TestCase):
self.assertEqual(response.headers['Content-Type'], 'text/event-stream') self.assertEqual(response.headers['Content-Type'], 'text/event-stream')
self.assertEqual(response.text, ('data: foo\n\n' self.assertEqual(response.text, ('data: foo\n\n'
'event: test\ndata: bar\n\n' 'event: test\ndata: bar\n\n'
'event: test\nid: id42\ndata: bar\n\n'
'id: id42\ndata: bar\n\n'
'data: {"foo": "bar"}\n\n' 'data: {"foo": "bar"}\n\n'
'data: [42, "foo", "bar"]\n\n' 'data: [42, "foo", "bar"]\n\n'
'data: foo\n\n'
'data: foo\n\n')) 'data: foo\n\n'))

View File

@@ -7,11 +7,14 @@ class TestURLPattern(unittest.TestCase):
p = URLPattern('/') p = URLPattern('/')
self.assertEqual(p.match('/'), {}) self.assertEqual(p.match('/'), {})
self.assertIsNone(p.match('/foo')) self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('foo'))
self.assertIsNone(p.match(''))
p = URLPattern('/foo/bar') p = URLPattern('/foo/bar')
self.assertEqual(p.match('/foo/bar'), {}) self.assertEqual(p.match('/foo/bar'), {})
self.assertIsNone(p.match('/foo')) self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar/')) self.assertIsNone(p.match('/foo/bar/'))
self.assertIsNone(p.match('/foo/bar/baz'))
p = URLPattern('/foo//bar/baz/') p = URLPattern('/foo//bar/baz/')
self.assertEqual(p.match('/foo//bar/baz/'), {}) self.assertEqual(p.match('/foo//bar/baz/'), {})
@@ -23,32 +26,50 @@ class TestURLPattern(unittest.TestCase):
p = URLPattern('/<arg>') p = URLPattern('/<arg>')
self.assertEqual(p.match('/foo'), {'arg': 'foo'}) self.assertEqual(p.match('/foo'), {'arg': 'foo'})
self.assertIsNone(p.match('/')) self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('//'))
self.assertIsNone(p.match(''))
self.assertIsNone(p.match('foo/'))
self.assertIsNone(p.match('/foo/')) self.assertIsNone(p.match('/foo/'))
self.assertIsNone(p.match('//foo/'))
self.assertIsNone(p.match('/foo//'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo//bar'))
p = URLPattern('/<arg>/') p = URLPattern('/<arg>/')
self.assertEqual(p.match('/foo/'), {'arg': 'foo'}) self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
self.assertIsNone(p.match('/')) self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo')) self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo/bar/'))
p = URLPattern('/<string:arg>') p = URLPattern('/<string:arg>')
self.assertEqual(p.match('/foo'), {'arg': 'foo'}) self.assertEqual(p.match('/foo'), {'arg': 'foo'})
self.assertIsNone(p.match('/')) self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo/')) self.assertIsNone(p.match('/foo/'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo/bar/'))
p = URLPattern('/<string:arg>/') p = URLPattern('/<string:arg>/')
self.assertEqual(p.match('/foo/'), {'arg': 'foo'}) self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
self.assertIsNone(p.match('/')) self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo')) self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo/bar/'))
p = URLPattern('/foo/<arg1>/bar/<arg2>') p = URLPattern('/foo/<arg1>/bar/<arg2>')
self.assertEqual(p.match('/foo/one/bar/two'), self.assertEqual(p.match('/foo/one/bar/two'),
{'arg1': 'one', 'arg2': 'two'}) {'arg1': 'one', 'arg2': 'two'})
self.assertIsNone(p.match('/')) self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo/')) self.assertIsNone(p.match('/foo/'))
self.assertIsNone(p.match('/foo/bar'))
self.assertIsNone(p.match('/foo//bar/'))
self.assertIsNone(p.match('/foo//bar//'))
def test_int_argument(self): def test_int_argument(self):
p = URLPattern('/users/<int:id>') p = URLPattern('/users/<int:id>')
self.assertEqual(p.match('/users/123'), {'id': 123}) self.assertEqual(p.match('/users/123'), {'id': 123})
self.assertEqual(p.match('/users/-123'), {'id': -123})
self.assertEqual(p.match('/users/0'), {'id': 0})
self.assertIsNone(p.match('/users/')) self.assertIsNone(p.match('/users/'))
self.assertIsNone(p.match('/users/abc')) self.assertIsNone(p.match('/users/abc'))
self.assertIsNone(p.match('/users/123abc')) self.assertIsNone(p.match('/users/123abc'))
@@ -82,7 +103,10 @@ class TestURLPattern(unittest.TestCase):
p = URLPattern('/users/<re:[a-c]+:id>') p = URLPattern('/users/<re:[a-c]+:id>')
self.assertEqual(p.match('/users/ab'), {'id': 'ab'}) self.assertEqual(p.match('/users/ab'), {'id': 'ab'})
self.assertEqual(p.match('/users/bca'), {'id': 'bca'}) self.assertEqual(p.match('/users/bca'), {'id': 'bca'})
self.assertIsNone(p.match('/users'))
self.assertIsNone(p.match('/users/'))
self.assertIsNone(p.match('/users/abcd')) self.assertIsNone(p.match('/users/abcd'))
self.assertIsNone(p.match('/users/abcdx'))
def test_many_arguments(self): def test_many_arguments(self):
p = URLPattern('/foo/<path:path>/<int:id>/bar/<name>') p = URLPattern('/foo/<path:path>/<int:id>/bar/<name>')

View File

@@ -2,9 +2,9 @@ import asyncio
import unittest import unittest
from microdot import Microdot from microdot import Microdot
from microdot.test_client import TestClient from microdot.test_client import TestClient
from microdot.utemplate import Template, init_templates from microdot.utemplate import Template
init_templates('tests/templates') Template.initialize('tests/templates')
class TestUTemplate(unittest.TestCase): class TestUTemplate(unittest.TestCase):

View File

@@ -1,8 +1,8 @@
import asyncio import asyncio
import sys import sys
import unittest import unittest
from microdot import Microdot from microdot import Microdot, Request
from microdot.websocket import with_websocket, WebSocket from microdot.websocket import with_websocket, WebSocket, WebSocketError
from microdot.test_client import TestClient from microdot.test_client import TestClient
@@ -17,6 +17,7 @@ class TestWebSocket(unittest.TestCase):
return self.loop.run_until_complete(coro) return self.loop.run_until_complete(coro)
def test_websocket_echo(self): def test_websocket_echo(self):
WebSocket.max_message_length = 65537
app = Microdot() app = Microdot()
@app.route('/echo') @app.route('/echo')
@@ -26,34 +27,10 @@ class TestWebSocket(unittest.TestCase):
data = await ws.receive() data = await ws.receive()
await ws.send(data) await ws.send(data)
results = [] @app.route('/divzero')
def ws():
data = yield 'hello'
results.append(data)
data = yield b'bye'
results.append(data)
data = yield b'*' * 300
results.append(data)
data = yield b'+' * 65537
results.append(data)
client = TestClient(app)
res = self._run(client.websocket('/echo', ws))
self.assertIsNone(res)
self.assertEqual(results, ['hello', b'bye', b'*' * 300, b'+' * 65537])
@unittest.skipIf(sys.implementation.name == 'micropython',
'no support for async generators in MicroPython')
def test_websocket_echo_async_client(self):
app = Microdot()
@app.route('/echo')
@with_websocket @with_websocket
async def index(req, ws): async def divzero(req, ws):
while True: 1 / 0
data = await ws.receive()
await ws.send(data)
results = [] results = []
@@ -72,6 +49,35 @@ class TestWebSocket(unittest.TestCase):
self.assertIsNone(res) self.assertIsNone(res)
self.assertEqual(results, ['hello', b'bye', b'*' * 300, b'+' * 65537]) self.assertEqual(results, ['hello', b'bye', b'*' * 300, b'+' * 65537])
res = self._run(client.websocket('/divzero', ws))
self.assertIsNone(res)
WebSocket.max_message_length = -1
@unittest.skipIf(sys.implementation.name == 'micropython',
'no support for async generators in MicroPython')
def test_websocket_large_message(self):
saved_max_body_length = Request.max_body_length
Request.max_body_length = 10
app = Microdot()
@app.route('/echo')
@with_websocket
async def index(req, ws):
data = await ws.receive()
await ws.send(data)
results = []
async def ws():
data = yield '0123456789abcdef'
results.append(data)
client = TestClient(app)
res = self._run(client.websocket('/echo', ws))
self.assertIsNone(res)
self.assertEqual(results, [])
Request.max_body_length = saved_max_body_length
def test_bad_websocket_request(self): def test_bad_websocket_request(self):
app = Microdot() app = Microdot()
@@ -106,7 +112,7 @@ class TestWebSocket(unittest.TestCase):
(None, 'foo')) (None, 'foo'))
self.assertEqual(ws._process_websocket_frame(WebSocket.BINARY, b'foo'), self.assertEqual(ws._process_websocket_frame(WebSocket.BINARY, b'foo'),
(None, b'foo')) (None, b'foo'))
self.assertRaises(OSError, ws._process_websocket_frame, self.assertRaises(WebSocketError, ws._process_websocket_frame,
WebSocket.CLOSE, b'') WebSocket.CLOSE, b'')
self.assertEqual(ws._process_websocket_frame(WebSocket.PING, b'foo'), self.assertEqual(ws._process_websocket_frame(WebSocket.PING, b'foo'),
(WebSocket.PONG, b'foo')) (WebSocket.PONG, b'foo'))

View File

@@ -1,23 +1,24 @@
FROM ubuntu:22.04 FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive ARG DEBIAN_FRONTEND=noninteractive
ARG VERSION=master
ENV VERSION=$VERSION
RUN apt-get update && \ RUN apt-get update && \
apt-get install -y build-essential libffi-dev git pkg-config python3 && \ apt-get install -y build-essential libffi-dev git pkg-config python3 && \
rm -rf /var/lib/apt/lists/* && \ rm -rf /var/lib/apt/lists/* && \
git clone https://github.com/micropython/micropython.git && \ git clone https://github.com/micropython/micropython.git && \
cd micropython && \ cd micropython && \
git checkout $VERSION && \
git submodule update --init && \ git submodule update --init && \
cd mpy-cross && \ cd mpy-cross && \
make && \ make && \
cd .. && \ cd .. && \
cd ports/unix && \ cd ports/unix && \
make && \ make && \
make test && \
make install && \ make install && \
apt-get purge --auto-remove -y build-essential libffi-dev git pkg-config python3 && \ apt-get purge --auto-remove -y build-essential libffi-dev git pkg-config python3 && \
cd ../../.. && \ cd ../../.. && \
rm -rf micropython rm -rf micropython
CMD ["/usr/local/bin/micropython"] CMD ["/usr/local/bin/micropython"]

View File

@@ -0,0 +1,24 @@
FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive
ARG VERSION=main
ENV VERSION=$VERSION
RUN apt-get update && \
apt-get install -y build-essential libffi-dev git pkg-config python3 && \
rm -rf /var/lib/apt/lists/* && \
git clone https://github.com/adafruit/circuitpython.git && \
cd circuitpython && \
git checkout $VERSION && \
git submodule update --init lib tools frozen && \
cd mpy-cross && \
make && \
cd .. && \
cd ports/unix && \
make && \
make install && \
apt-get purge --auto-remove -y build-essential libffi-dev git pkg-config python3 && \
cd ../../.. && \
rm -rf circuitpython
CMD ["/usr/local/bin/micropython"]

11
tools/update-circuitpython.sh Executable file
View File

@@ -0,0 +1,11 @@
#!/bin/bash
# this script updates the micropython binary in the /bin directory that is
# used to run unit tests under GitHub Actions builds
DOCKER=${DOCKER:-docker}
VERSION=${1:-main}
$DOCKER build -f Dockerfile.circuitpython --build-arg VERSION=$VERSION -t circuitpython .
$DOCKER create -t --name dummy-circuitpython circuitpython
$DOCKER cp dummy-circuitpython:/usr/local/bin/micropython ../bin/circuitpython
$DOCKER rm dummy-circuitpython

View File

@@ -3,8 +3,9 @@
# used to run unit tests under GitHub Actions builds # used to run unit tests under GitHub Actions builds
DOCKER=${DOCKER:-docker} DOCKER=${DOCKER:-docker}
VERSION=${1:-master}
$DOCKER build -t micropython . $DOCKER build --build-arg VERSION=$VERSION -t micropython .
$DOCKER create -it --name dummy-micropython micropython $DOCKER create -it --name dummy-micropython micropython
$DOCKER cp dummy-micropython:/usr/local/bin/micropython ../bin/micropython $DOCKER cp dummy-micropython:/usr/local/bin/micropython ../bin/micropython
$DOCKER rm dummy-micropython $DOCKER rm dummy-micropython

17
tox.ini
View File

@@ -1,5 +1,5 @@
[tox] [tox]
envlist=flake8,py38,py39,py310,py311,py312,upy,benchmark envlist=flake8,py38,py39,py310,py311,py312,upy,cpy,benchmark,docs
skipsdist=True skipsdist=True
skip_missing_interpreters=True skip_missing_interpreters=True
@@ -29,10 +29,13 @@ setenv=
allowlist_externals=sh allowlist_externals=sh
commands=sh -c "bin/micropython run_tests.py" commands=sh -c "bin/micropython run_tests.py"
[testenv:cpy]
allowlist_externals=sh
commands=sh -c "bin/circuitpython run_tests.py"
[testenv:upy-mac] [testenv:upy-mac]
allowlist_externals=micropython allowlist_externals=micropython
commands=micropython run_tests.py commands=micropython run_tests.py
deps=
[testenv:benchmark] [testenv:benchmark]
deps= deps=
@@ -55,3 +58,13 @@ deps=
flake8 flake8
commands= commands=
flake8 --ignore=W503 --exclude examples/templates/utemplate/templates src tests examples flake8 --ignore=W503 --exclude examples/templates/utemplate/templates src tests examples
[testenv:docs]
changedir=docs
deps=
sphinx
pyjwt
allowlist_externals=
make
commands=
make html