58 Commits

Author SHA1 Message Date
Miguel Grinberg
aac022ba43 Release 0.9.0 2022-06-04 16:48:03 +01:00
Miguel Grinberg
c18ccccb8e Run linter on examples 2022-06-04 16:41:40 +01:00
Miguel Grinberg
bcbad51675 Documentation updates 2022-06-04 16:41:02 +01:00
Miguel Grinberg
d71665fd38 Stream responses (Fixes #44) 2022-06-04 15:56:13 +01:00
Miguel Grinberg
4182ba6380 Uvicorn support for ASGI implementation 2022-06-04 15:08:30 +01:00
Miguel Grinberg
5b5eb907d8 Add Python 3.10 to build 2022-05-26 10:50:55 +01:00
Miguel Grinberg
71009b4978 Return 204 when view function returns None 2022-05-26 10:50:55 +01:00
Miguel Grinberg
35c72125a0 Make body_iter async generator compatible with MicroPython 2022-05-26 10:50:55 +01:00
Miguel Grinberg
7e8ecb1997 ASGI support 2022-05-25 23:47:37 +01:00
Miguel Grinberg
1ae51ccdf7 WSGI support 2022-05-25 00:31:18 +01:00
Miguel Grinberg
0ca1e01e00 Version 0.8.3.dev0 2022-04-20 10:15:24 +01:00
Miguel Grinberg
5f7efcc3f8 Release 0.8.2 2022-04-20 10:15:17 +01:00
Mark Blakeney
0f278321c8 Remove stray/debug remnant print() (#38) 2022-04-20 10:13:38 +01:00
Miguel Grinberg
acf20cc20c Version 0.8.2.dev0 2022-03-18 23:51:38 +00:00
Miguel Grinberg
453e133cc2 Release 0.8.1 2022-03-18 23:51:28 +00:00
Miguel Grinberg
29a9f6f46c Optimizations for request streams and bodies 2022-02-21 18:11:19 +01:00
Miguel Grinberg
9d3222ae4b Version 0.8.1.dev0 2022-02-18 17:41:16 +00:00
Miguel Grinberg
f23a6be2db Release 0.8.0 2022-02-18 17:40:59 +00:00
Miguel Grinberg
992fa722c1 Support streamed request payloads (Fixes #26) 2022-02-18 17:32:14 +00:00
Steve Li
e16fb94b2d Use case insensitive comparisons for HTTP headers (#33) 2022-01-31 12:10:23 +00:00
Miguel Grinberg
c130d8f2d4 simplified hello_async.py example 2022-01-22 23:23:31 +00:00
Miguel Grinberg
bd82c4deab More robust logic to read request body (Fixes #31) 2021-10-23 19:03:31 +01:00
Miguel Grinberg
7bc5d724f0 Version 0.7.3.dev0 2021-09-28 17:23:17 +01:00
Miguel Grinberg
f23c78533e Release 0.7.2 2021-09-28 17:21:05 +01:00
Miguel Grinberg
d29ed6aaa1 Document a security risk in the send_file function 2021-09-28 17:15:07 +01:00
Miguel Grinberg
8e5fb92ff1 Validate redirect URLs 2021-09-28 17:12:15 +01:00
Miguel Grinberg
06015934b8 Return a 400 error when request object could not be created 2021-09-28 17:09:02 +01:00
Miguel Grinberg
568cd51fd2 Version 0.7.2.dev0 2021-09-27 23:01:20 +01:00
Miguel Grinberg
2fe9793389 Release 0.7.1 2021-09-27 22:58:32 +01:00
Miguel Grinberg
de9c991a9a Limit the size of each request line 2021-09-27 20:03:18 +01:00
Miguel Grinberg
d75449eb32 Version 0.7.1.dev0 2021-09-27 17:14:48 +01:00
Miguel Grinberg
e508abc333 Release 0.7.0 2021-09-27 17:12:42 +01:00
Miguel Grinberg
5003a5b3d9 Limit the size of the request body 2021-09-27 17:01:43 +01:00
Miguel Grinberg
4ed101dfc6 Add security policy 2021-09-27 13:57:04 +01:00
Mark Blakeney
833fecb105 Add documentation for request.client_addr (#27) 2021-09-22 12:04:28 +01:00
Miguel Grinberg
d527bdb7c3 Added documentation for reason argument in the Response object 2021-08-11 12:00:46 +01:00
Miguel Grinberg
2516b296a7 Version 0.6.1.dev0 2021-08-11 10:37:04 +01:00
Miguel Grinberg
5061145f5c Release 0.6.0 2021-08-11 10:36:42 +01:00
Miguel Grinberg
122c638bae Fix codecov badge link #nolog 2021-08-11 10:33:10 +01:00
Miguel Grinberg
bd74bcab74 Accept a custom reason phrase for the HTTP response (Fixes #25) 2021-08-11 10:29:08 +01:00
Miguel Grinberg
5cd3ace516 More unit tests 2021-08-02 15:53:13 +01:00
Miguel Grinberg
da32f23e35 Better handling of content types in form and json methods (Fixes #24) 2021-08-02 15:39:32 +01:00
Mark Blakeney
0641466faa Copy client headers to avoid write back (#23) 2021-07-28 10:43:54 +01:00
Miguel Grinberg
dd3fc20507 Make mime type check for form submissions more robust 2021-06-06 20:05:32 +01:00
Miguel Grinberg
46963ba464 Work around a bug in uasyncio's create_server() function 2021-06-06 20:05:12 +01:00
Miguel Grinberg
1a8db51cb3 Installation instructions 2021-06-06 12:24:09 +01:00
Miguel Grinberg
d903c42370 Minor wording update in the documentation #nolog 2021-06-06 12:17:22 +01:00
Miguel Grinberg
8b4ebbd953 Run tests with pytest 2021-06-06 12:09:03 +01:00
Miguel Grinberg
a82ed55f56 Last version of the microdot-asyncio package 2021-06-06 11:54:51 +01:00
Miguel Grinberg
ac87f0542f Version 0.5.1.dev0 2021-06-06 11:49:01 +01:00
Miguel Grinberg
2de57498a8 Release 0.5.0 2021-06-06 11:47:09 +01:00
Miguel Grinberg
b7b881e3c7 merge microdot-asyncio package with microdot 2021-06-06 11:15:32 +01:00
Miguel Grinberg
9955ac99a6 change log 2021-06-06 00:38:46 +01:00
Miguel Grinberg
4b101d1597 Improve project structure 2021-06-06 00:29:52 +01:00
Miguel Grinberg
097cd9cf02 Documentation updates 2021-06-05 15:42:37 +01:00
Miguel Grinberg
b0c25a1a72 Support duplicate arguments in query string and form submissions
Fixes #21
2021-06-05 12:26:37 +01:00
Miguel Grinberg
b7b8e58d6a added documentation link 2021-06-05 00:56:16 +01:00
Miguel Grinberg
12cd60305b Documentation 2021-06-05 00:52:41 +01:00
49 changed files with 2819 additions and 758 deletions

View File

@@ -2,10 +2,10 @@ name: build
on:
push:
branches:
- master
- main
pull_request:
branches:
- master
- main
jobs:
lint:
name: lint
@@ -21,7 +21,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python: ['3.6', '3.7', '3.8', '3.9']
python: ['3.6', '3.7', '3.8', '3.9', '3.10']
fail-fast: false
runs-on: ${{ matrix.os }}
steps:

107
CHANGES.md Normal file
View File

@@ -0,0 +1,107 @@
# Microdot change log
**Release 0.9.0** - 2022-06-04
- Streaming responses [#44](https://github.com/miguelgrinberg/microdot/issues/44) ([commit](https://github.com/miguelgrinberg/microdot/commit/d71665fd388c92a50198faf0d761235f0138797a))
- Return 204 when view function returns None ([commit](https://github.com/miguelgrinberg/microdot/commit/71009b49781ce356155df661a66dc98170f35d63))
- ASGI support ([commit](https://github.com/miguelgrinberg/microdot/commit/7e8ecb199717dd90c6cb374cb0d24b54dd6ea33e))
- WSGI support ([commit](https://github.com/miguelgrinberg/microdot/commit/1ae51ccdf75991a2958b06f7a3439d64f92f1b69))
- Documentation updates ([commit](https://github.com/miguelgrinberg/microdot/commit/bcbad516751f1ea9928f4a6d0e8843a4334b885a))
- Add Python 3.10 to build ([commit](https://github.com/miguelgrinberg/microdot/commit/5b5eb907d83d94dde544b266e6659071e4d47ee1))
- Run linter on examples ([commit](https://github.com/miguelgrinberg/microdot/commit/c18ccccb8e0744d8670433aeeba068c5654f32df))
**Release 0.8.2** - 2022-04-20
- Remove debugging print statement [#38](https://github.com/miguelgrinberg/microdot/issues/38) ([commit](https://github.com/miguelgrinberg/microdot/commit/0f278321c8bd65c5cb67425eb837e6581cbb0054)) (thanks **Mark Blakeney**!)
**Release 0.8.1** - 2022-03-18
- Optimizations for request streams and bodies ([commit](https://github.com/miguelgrinberg/microdot/commit/29a9f6f46c737aa0fd452766c23bd83008594ac4))
**Release 0.8.0** - 2022-02-18
- Support streamed request payloads [#26](https://github.com/miguelgrinberg/microdot/issues/26) ([commit](https://github.com/miguelgrinberg/microdot/commit/992fa722c1312c0ac0ee9fbd5e23ad7b52d3caca))
- Use case insensitive comparisons for HTTP headers [#33](https://github.com/miguelgrinberg/microdot/issues/33) ([commit](https://github.com/miguelgrinberg/microdot/commit/e16fb94b2d1e88ef681d70f7f456c37ee9859df6)) (thanks **Steve Li**!)
- More robust logic to read request body [#31](https://github.com/miguelgrinberg/microdot/issues/31) ([commit](https://github.com/miguelgrinberg/microdot/commit/bd82c4deabf40d37e6b7397b08e8eb40ba2b6a42))
- Simplified `hello_async.py` example ([commit](https://github.com/miguelgrinberg/microdot/commit/c130d8f2d45dcce9606dda25d31d653ce91faf92))
**Release 0.7.2** - 2021-09-28
- Document a security risk in the send_file function ([commit](https://github.com/miguelgrinberg/microdot/commit/d29ed6aaa1f2080fcf471bf6ae0f480f95ff1716)) (thanks **Ky Tran**!)
- Validate redirect URLs ([commit](https://github.com/miguelgrinberg/microdot/commit/8e5fb92ff1ccd50972b0c1cb5a6c3bd5eb54d86b)) (thanks **Ky Tran**!)
- Return a 400 error when request object could not be created ([commit](https://github.com/miguelgrinberg/microdot/commit/06015934b834622d39f52b3e13d16bfee9dc8e5a))
**Release 0.7.1** - 2021-09-27
- Breaking change: Limit the size of each request line to 2KB. A different maximum can be set in `Request.max_readline`. ([commit](https://github.com/miguelgrinberg/microdot/commit/de9c991a9ab836d57d5c08bf4282f99f073b502a)) (thanks **Ky Tran**!)
**Release 0.7.0** - 2021-09-27
- Breaking change: Limit the size of the request body to 16KB. A different maximum can be set in `Request.max_content_length`. ([commit](https://github.com/miguelgrinberg/microdot/commit/5003a5b3d948a7cf365857b419bebf6e388593a1))
- Add documentation for `request.client_addr` [#27](https://github.com/miguelgrinberg/microdot/issues/27) ([commit](https://github.com/miguelgrinberg/microdot/commit/833fecb105ce456b95f1d2a6ea96dceca1075814)) (thanks **Mark Blakeney**!)
- Added documentation for reason argument in the Response object ([commit](https://github.com/miguelgrinberg/microdot/commit/d527bdb7c32ab918a1ecf6956cf3a9f544504354))
**Release 0.6.0** - 2021-08-11
- Better handling of content types in form and json methods [#24](https://github.com/miguelgrinberg/microdot/issues/24) ([commit](https://github.com/miguelgrinberg/microdot/commit/da32f23e35f871470a40638e7000e84b0ff6d17f))
- Accept a custom reason phrase for the HTTP response [#25](https://github.com/miguelgrinberg/microdot/issues/25) ([commit](https://github.com/miguelgrinberg/microdot/commit/bd74bcab74f283c89aadffc8f9c20d6ff0f771ce))
- Make mime type check for form submissions more robust ([commit](https://github.com/miguelgrinberg/microdot/commit/dd3fc20507715a23d0fa6fa3aae3715c8fbc0351))
- Copy client headers to avoid write back [#23](https://github.com/miguelgrinberg/microdot/issues/23) ([commit](https://github.com/miguelgrinberg/microdot/commit/0641466faa9dda0c54f78939ac05993c0812e84a)) (thanks **Mark Blakeney**!)
- Work around a bug in uasyncio's create_server() function ([commit](https://github.com/miguelgrinberg/microdot/commit/46963ba4644d7abc8dc653c99bc76222af526964))
- More unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/5cd3ace5166ec549579b0b1149ae3d7be195974a))
- Installation instructions ([commit](https://github.com/miguelgrinberg/microdot/commit/1a8db51cb3754308da6dcc227512dcdeb4ce4557))
- Run tests with pytest ([commit](https://github.com/miguelgrinberg/microdot/commit/8b4ebbd9535b3c083fb2a955284609acba07f05e))
- Deprecated the microdot-asyncio package ([commit](https://github.com/miguelgrinberg/microdot/commit/a82ed55f56e14fbcea93e8171af86ab42657fa96))
**Release 0.5.0** - 2021-06-06
- [Documentation](https://microdot.readthedocs.io/en/latest/) site ([commit](https://github.com/miguelgrinberg/microdot/commit/12cd60305b7b48ab151da52661fc5988684dbcd8))
- Support duplicate arguments in query string and form submissions [#21](https://github.com/miguelgrinberg/microdot/issues/21) ([commit](https://github.com/miguelgrinberg/microdot/commit/b0c25a1a7298189373be5df1668e0afb5532cdaf))
- Merge `microdot-asyncio` package with `microdot` ([commit](https://github.com/miguelgrinberg/microdot/commit/b7b881e3c7f1c6ede6546e498737e93928425c30))
- Added a change log ([commit](https://github.com/miguelgrinberg/microdot/commit/9955ac99a6ac20308644f02d6e6e32847d28b70c))
- Improve project structure ([commit](https://github.com/miguelgrinberg/microdot/commit/4b101d15971fa2883d187f0bab0be999ae30b583))
**Release v0.4.0** - 2021-06-04
- Add HTTP method-specific route decorators ([commit](https://github.com/miguelgrinberg/microdot/commit/a3288a63ed45f700f79b67d0b57fc4dd20e844c1))
- Server shutdown [#19](https://github.com/miguelgrinberg/microdot/issues/19) ([commit](https://github.com/miguelgrinberg/microdot/commit/0ad538df91f8b6b8a3885aa602c014ee7fe4526b))
- Update microypthon binary for tests to 1.15 ([commit](https://github.com/miguelgrinberg/microdot/commit/3bd7fe8cea4598a7dbd0efcb9c6ce57ec2b79f9c))
**Release v0.3.1** - 2021-02-06
- Support large downloads in send_file [#3](https://github.com/miguelgrinberg/microdot/issues/3) ([commit](https://github.com/miguelgrinberg/microdot/commit/3e29af57753dbb7961ff98719a4fc4f71c0b4e3e))
- Move socket import and add simple hello example [#12](https://github.com/miguelgrinberg/microdot/issues/12) ([commit](https://github.com/miguelgrinberg/microdot/commit/c5e1873523b609680ff67d7abfada72568272250)) (thanks **Damien George**!)
- Update python versions to build ([commit](https://github.com/miguelgrinberg/microdot/commit/dfbe2edd797153fc9be40bc1928d93bdee7e7be5))
- Handle Chrome preconnect [#8](https://github.com/miguelgrinberg/microdot/issues/8) ([commit](https://github.com/miguelgrinberg/microdot/commit/125af4b4a92b1d78acfa9d57ad2f507e759b6938)) (thanks **Ricardo Mendonça Ferreira**!)
- Readme update ([commit](https://github.com/miguelgrinberg/microdot/commit/1aacb3cf46bd0b634ec3bc852ff9439f3c5dd773))
- Switch to GitHub actions for builds ([commit](https://github.com/miguelgrinberg/microdot/commit/4c0afa2beca0c3b0f167fd25c6849d6937c412ba))
**Release v0.3.0** - 2019-05-05
- g, before_request and after_request ([commit](https://github.com/miguelgrinberg/microdot/commit/8aa50f171d2d04bc15c472ab1d9b3288518f7a21))
- Threaded mode ([commit](https://github.com/miguelgrinberg/microdot/commit/494800ff9ff474c38644979086057e3584573969))
- Optional asyncio support ([commit](https://github.com/miguelgrinberg/microdot/commit/3d9b5d7084d52e749553ca79206ed7060f963f9d))
- Debug mode ([commit](https://github.com/miguelgrinberg/microdot/commit/4c83cb75636572066958ef2cc0802909deafe542))
- Print exceptions ([commit](https://github.com/miguelgrinberg/microdot/commit/491202de1fce232b9629b7f1db63594fd13f84a3))
- Flake8 ([commit](https://github.com/miguelgrinberg/microdot/commit/92edc17522d7490544c7186d62a2964caf35c861))
- Unit testing framework ([commit](https://github.com/miguelgrinberg/microdot/commit/f741ed7cf83320d25ce16a1a29796af6fdfb91e9))
- More robust header checking in tests ([commit](https://github.com/miguelgrinberg/microdot/commit/03efe46a26e7074f960dd4c9a062c53d6f72bfa0))
- Response unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/cd71986a5042dcc308617a3db89476f28dd13ecf))
- Request unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/0b95feafc96dc91d7d34528ff2d8931a8aa3d612))
- More unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/76ab1fa6d72dd9deaa24aeaf4895a0c6fc883bcb))
- Async request and response unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/89f7f09b9a2d0dfccefabebbe9b83307133bd97c))
- More asyncio unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/ba986a89ff72ebbd9a65307b81ee769879961594))
- Improve code structure ([commit](https://github.com/miguelgrinberg/microdot/commit/b16466f1a9432a608eb23769907e8952fe304a9a))
- URL pattern matching unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/0a373775d54df571ceddaac090094bb62dbe6c72))
- Rename microdot_async to microdot_asyncio ([commit](https://github.com/miguelgrinberg/microdot/commit/e5525c5c485ae8901c9602da7e4582b58fb2da40))
**Release 0.2.0** - 2019-04-19
- Error handlers ([commit](https://github.com/miguelgrinberg/microdot/commit/0f2c749f6d1b9edbf124523160e10449c932ea45))
- Fleshed out example GPIO application ([commit](https://github.com/miguelgrinberg/microdot/commit/52f2d0c4918d00d1a7e46cc7fd9a909ef6d259c1))
- More robust parsing of cookie header ([commit](https://github.com/miguelgrinberg/microdot/commit/2f58c41cc89946d51646df83d4f9ae0e24e447b9))
**Release 0.1.1** - 2019-04-17
- Minor fixes for micropython ([commit](https://github.com/miguelgrinberg/microdot/commit/e4ff70cf8fe839f5b5297157bf028569188b9031))
- Initial commit ([commit](https://github.com/miguelgrinberg/microdot/commit/311a82a44430d427948866b09cb6136e60a5b1c9))

View File

@@ -1,8 +1,9 @@
# microdot
[![Build status](https://github.com/miguelgrinberg/microdot/workflows/build/badge.svg)](https://github.com/miguelgrinberg/microdot/actions) [![codecov](https://codecov.io/gh/miguelgrinberg/microdot/branch/master/graph/badge.svg)](https://codecov.io/gh/miguelgrinberg/microdot)
[![Build status](https://github.com/miguelgrinberg/microdot/workflows/build/badge.svg)](https://github.com/miguelgrinberg/microdot/actions) [![codecov](https://codecov.io/gh/miguelgrinberg/microdot/branch/main/graph/badge.svg)](https://codecov.io/gh/miguelgrinberg/microdot)
A minimalistic Python web framework for microcontrollers inspired by Flask
## Documentation
## Resources
Coming soon!
- [Documentation](https://microdot.readthedocs.io/en/latest/)
- [Change Log](https://github.com/miguelgrinberg/microdot/blob/main/CHANGES.md)

9
SECURITY.md Normal file
View File

@@ -0,0 +1,9 @@
# Security Policy
## Reporting a Vulnerability
If you think you've found a vulnerability on this project, please send me (Miguel Grinberg) an email at miguel.grinberg@gmail.com with a description of the problem. I will personally review the issue and respond to you with next steps.
If the issue is highly sensitive, you are welcome to encrypt your message. Here is my [PGP key](https://keyserver.ubuntu.com/pks/lookup?search=miguel.grinberg%40gmail.com&fingerprint=on&op=index).
Please do not disclose vulnerabilities publicly before discussing how to proceed with me.

View File

@@ -1,33 +0,0 @@
#!/bin/bash
VERSION=$1
if [[ "$VERSION" == "" ]]; then
echo Usage: $0 "<version>"
exit 1
fi
git diff --cached --exit-code >/dev/null
if [[ "$?" != "0" ]]; then
echo Commit your changes before using this script.
exit 1
fi
set -e
for PKG in microdot*; do
echo Building $PKG...
cd $PKG
sed -i "" "s/version.*$/version=\"$VERSION\",/" setup.py
git add setup.py
rm -rf dist
python setup.py sdist bdist_wheel --universal
cd ..
done
git commit -m "Release v$VERSION"
git tag v$VERSION
git push --tags origin master
for PKG in microdot*; do
echo Releasing $PKG...
cd $PKG
twine upload dist/*
cd ..
done

20
docs/Makefile Normal file
View File

@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

3
docs/_static/css/custom.css vendored Normal file
View File

@@ -0,0 +1,3 @@
.py .class, .py .method, .py .property {
margin-top: 20px;
}

90
docs/api.rst Normal file
View File

@@ -0,0 +1,90 @@
API Reference
=============
``microdot`` module
-------------------
The ``microdot`` module defines a few classes that help implement HTTP-based
servers for MicroPython and standard Python, with multithreading support for
Python interpreters that support it.
``Microdot`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.Microdot
:members:
``Request`` class
~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.Request
:members:
``Response`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.Response
:members:
``MultiDict`` class
~~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.MultiDict
:members:
``microdot_asyncio`` module
---------------------------
The ``microdot_asyncio`` module defines a few classes that help implement
HTTP-based servers for MicroPython and standard Python that use ``asyncio``
and coroutines.
``Microdot`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_asyncio.Microdot
:inherited-members:
:members:
``Request`` class
~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_asyncio.Request
:inherited-members:
:members:
``Response`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_asyncio.Response
:inherited-members:
:members:
``microdot_wsgi`` module
------------------------
The ``microdot_wsgi`` module provides an extended ``Microdot`` class that
implements the WSGI protocol and can be used with a complaint WSGI web server
such as `Gunicorn <https://gunicorn.org/>`_ or
`uWSGI <https://uwsgi-docs.readthedocs.io/en/latest/>`_.
``Microdot`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_wsgi.Microdot
:members:
:exclude-members: shutdown, run
``microdot_asgi`` module
------------------------
The ``microdot_asgi`` module provides an extended ``Microdot`` class that
implements the ASGI protocol and can be used with a complaint ASGI server such
as `Uvicorn <https://www.uvicorn.org/>`_.
``Microdot`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_asgi.Microdot
:members:
:exclude-members: shutdown, run

70
docs/conf.py Normal file
View File

@@ -0,0 +1,70 @@
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../src'))
# -- Project information -----------------------------------------------------
project = 'Microdot'
copyright = '2021, Miguel Grinberg'
author = 'Miguel Grinberg'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
html_css_files = [
'css/custom.css',
]
html_theme_options = {
'github_user': 'miguelgrinberg',
'github_repo': 'microdot',
'github_banner': True,
'github_button': True,
'github_type': 'star',
'fixed_sidebar': True,
}
autodoc_default_options = {
'member-order': 'bysource',
}

21
docs/index.rst Normal file
View File

@@ -0,0 +1,21 @@
.. Microdot documentation master file, created by
sphinx-quickstart on Fri Jun 4 17:40:19 2021.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Microdot
========
Microdot is a minimalistic Python web framework inspired by
`Flask <https://flask.palletsprojects.com/>`_, and designed to run on
systems with limited resources such as microcontrollers. It runs on standard
Python and on `MicroPython <https://micropython.org>`_.
.. toctree::
:maxdepth: 3
intro
api
* :ref:`genindex`
* :ref:`search`

43
docs/intro.rst Normal file
View File

@@ -0,0 +1,43 @@
Installation
------------
Microdot can be installed with ``pip``::
pip install microdot
For MicroPython, you can install with ``upip``.
On platforms where ``pip`` or ``upip`` are not viable options, you can manually
copy and install the ``microdot.py`` and ``microdot_asyncio.py`` source files
from the `GitHub reposutory <https://github.com/miguelgrinberg/microdot>`_
into your project directory.
Examples
--------
The following is an example of a standard single or multi-threaded web
server::
from microdot import Microdot
app = Microdot()
@app.route('/')
def hello(request):
return 'Hello, world!'
app.run()
Microdot also supports the asynchronous model and can be used under
``asyncio``. The example that follows is equivalent to the one above, but uses
coroutines for concurrency::
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def hello(request):
return 'Hello, world!'
app.run()

35
docs/make.bat Normal file
View File

@@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

BIN
examples/1.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 KiB

BIN
examples/2.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

BIN
examples/3.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.4 KiB

View File

@@ -1,4 +1,4 @@
from microdot import Microdot, Response
from microdot import Microdot
app = Microdot()
@@ -20,7 +20,7 @@ htmldoc = '''<!DOCTYPE html>
@app.route('/')
def hello(request):
return Response(body=htmldoc, headers={'Content-Type': 'text/html'})
return htmldoc, 200, {'Content-Type': 'text/html'}
@app.route('/shutdown')

36
examples/hello_asgi.py Normal file
View File

@@ -0,0 +1,36 @@
from microdot_asgi import Microdot
app = Microdot()
htmldoc = '''<!DOCTYPE html>
<html>
<head>
<title>Microdot Example Page</title>
</head>
<body>
<div>
<h1>Microdot Example Page</h1>
<p>Hello from Microdot!</p>
<p><a href="/shutdown">Click to shutdown the server</a></p>
</div>
</body>
</html>
'''
@app.route('/')
async def hello(request):
return htmldoc, 200, {'Content-Type': 'text/html'}
@app.route('/shutdown')
async def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
if __name__ == '__main__':
print('''Use an ASGI web server to run this applicaton.
Example:
uvicorn hello_asgi:app
''')

View File

@@ -1,8 +1,4 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot_asyncio import Microdot, Response
from microdot_asyncio import Microdot
app = Microdot()
@@ -24,7 +20,7 @@ htmldoc = '''<!DOCTYPE html>
@app.route('/')
async def hello(request):
return Response(body=htmldoc, headers={'Content-Type': 'text/html'})
return htmldoc, 200, {'Content-Type': 'text/html'}
@app.route('/shutdown')
@@ -33,8 +29,4 @@ async def shutdown(request):
return 'The server is shutting down...'
async def main():
await app.start_server(debug=True)
asyncio.run(main())
app.run(debug=True)

36
examples/hello_wsgi.py Normal file
View File

@@ -0,0 +1,36 @@
from microdot_wsgi import Microdot
app = Microdot()
htmldoc = '''<!DOCTYPE html>
<html>
<head>
<title>Microdot Example Page</title>
</head>
<body>
<div>
<h1>Microdot Example Page</h1>
<p>Hello from Microdot!</p>
<p><a href="/shutdown">Click to shutdown the server</a></p>
</div>
</body>
</html>
'''
@app.route('/')
def hello(request):
return htmldoc, 200, {'Content-Type': 'text/html'}
@app.route('/shutdown')
def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
if __name__ == '__main__':
print('''Use a WSGI web server to run this applicaton.
Example:
gunicorn hello_wsgi:app
''')

45
examples/video_stream.py Normal file
View File

@@ -0,0 +1,45 @@
try:
import utime as time
except ImportError:
import time
from microdot import Microdot
app = Microdot()
frames = []
for file in ['1.jpg', '2.jpg', '3.jpg']:
with open(file, 'rb') as f:
frames.append(f.read())
@app.route('/')
def index(request):
return '''<!doctype html>
<html>
<head>
<title>Microdot Video Streaming</title>
</head>
<body>
<h1>Microdot Video Streaming</h1>
<img src="/video_feed">
</body>
</html>''', 200, {'Content-Type': 'text/html'}
@app.route('/video_feed')
def video_feed(request):
def stream():
yield b'--frame\r\n'
while True:
for frame in frames:
yield b'Content-Type: image/jpeg\r\n\r\n' + frame + \
b'\r\n--frame\r\n'
time.sleep(1)
return stream(), 200, {'Content-Type':
'multipart/x-mixed-replace; boundary=frame'}
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -0,0 +1,64 @@
import sys
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot_asyncio import Microdot
app = Microdot()
frames = []
for file in ['1.jpg', '2.jpg', '3.jpg']:
with open(file, 'rb') as f:
frames.append(f.read())
@app.route('/')
def index(request):
return '''<!doctype html>
<html>
<head>
<title>Microdot Video Streaming</title>
</head>
<body>
<h1>Microdot Video Streaming</h1>
<img src="/video_feed">
</body>
</html>''', 200, {'Content-Type': 'text/html'}
@app.route('/video_feed')
async def video_feed(request):
if sys.implementation.name != 'micropython':
# CPython supports yielding async generators
async def stream():
yield b'--frame\r\n'
while True:
for frame in frames:
yield b'Content-Type: image/jpeg\r\n\r\n' + frame + \
b'\r\n--frame\r\n'
await asyncio.sleep(1)
else:
# MicroPython can only use class-based async generators
class stream():
def __init__(self):
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
await asyncio.sleep(1)
self.i = (self.i + 1) % len(frames)
return b'Content-Type: image/jpeg\r\n\r\n' + \
frames[self.i] + b'\r\n--frame\r\n'
return stream(), 200, {'Content-Type':
'multipart/x-mixed-replace; boundary=frame'}
if __name__ == '__main__':
app.run(debug=True)

5
legacy/README.md Normal file
View File

@@ -0,0 +1,5 @@
microdot-asyncio
================
This package has been merged with the ``microdot`` package. It currently
installs as an empty package that depends on it.

6
legacy/pyproject.toml Normal file
View File

@@ -0,0 +1,6 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

24
legacy/setup.cfg Normal file
View File

@@ -0,0 +1,24 @@
[metadata]
name = microdot-asyncio
version = 0.5.0
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = AsyncIO support for the Microdot web framework'
long_description = file: README.md
long_description_content_type = text/markdown
url = https://github.com/miguelgrinberg/microdot
project_urls =
Bug Tracker = https://github.com/miguelgrinberg/microdot/issues
classifiers =
Environment :: Web Environment
Intended Audience :: Developers
Programming Language :: Python :: 3
Programming Language :: Python :: Implementation :: MicroPython
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
zip_safe = False
include_package_data = True
py_modules =
install_requires =
microdot

3
legacy/setup.py Normal file
View File

@@ -0,0 +1,3 @@
import setuptools
setuptools.setup()

View File

@@ -1,173 +0,0 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot import Microdot as BaseMicrodot
from microdot import print_exception
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
def _iscoroutine(coro):
return hasattr(coro, 'send') and hasattr(coro, 'throw')
class Request(BaseRequest):
@staticmethod
async def create(app, stream, client_addr):
# request line
line = (await stream.readline()).strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = (await stream.readline()).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = await stream.read(content_length) \
if content_length else b''
return Request(app, client_addr, method, url, http_version, headers,
body)
class Response(BaseResponse):
async def write(self, stream):
self.complete()
# status code
await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
await stream.awrite('{header}: {value}\r\n'.format(
header=header, value=value).encode())
await stream.awrite(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
await stream.awrite(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'):
self.body.close()
else:
await stream.awrite(self.body)
class Microdot(BaseMicrodot):
async def start_server(self, host='0.0.0.0', port=5000, debug=False):
self.debug = debug
async def serve(reader, writer):
if not hasattr(writer, 'awrite'): # pragma: no cover
# CPython provides the awrite and aclose methods in 3.8+
async def awrite(self, data):
self.write(data)
await self.drain()
async def aclose(self):
self.close()
await self.wait_closed()
from types import MethodType
writer.awrite = MethodType(awrite, writer)
writer.aclose = MethodType(aclose, writer)
await self.dispatch_request(reader, writer)
if self.debug: # pragma: no cover
print('Starting async server on {host}:{port}...'.format(
host=host, port=port))
self.server = await asyncio.start_server(serve, host, port)
await self.server.wait_closed()
def run(self, host='0.0.0.0', port=5000, debug=False):
asyncio.run(self.start_server(host=host, port=port, debug=debug))
def shutdown(self):
self.server.close()
async def dispatch_request(self, reader, writer):
req = await Request.create(self, reader,
writer.get_extra_info('peername'))
if req:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = await self._invoke_handler(handler, req)
if res:
break
if res is None:
res = await self._invoke_handler(
f, req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
elif 404 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[404], req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[500], req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
await res.write(writer)
await writer.aclose()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
ret = f_or_coro(*args, **kwargs)
if _iscoroutine(ret):
ret = await ret
return ret
redirect = Response.redirect
send_file = Response.send_file

View File

@@ -1,35 +0,0 @@
"""
Microdot-AsyncIO
----------------
AsyncIO support for the Microdot web framework.
"""
from setuptools import setup
setup(
name='microdot-asyncio',
version="0.4.0",
url='http://github.com/miguelgrinberg/microdot/',
license='MIT',
author='Miguel Grinberg',
author_email='miguel.grinberg@gmail.com',
description='AsyncIO support for the Microdot web framework',
long_description=__doc__,
py_modules=['microdot_asyncio'],
platforms='any',
install_requires=[
'microdot',
'micropython-uasyncio;implementation_name=="micropython"'
],
classifiers=[
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: Implementation :: MicroPython',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View File

@@ -1,452 +0,0 @@
try:
from sys import print_exception
except ImportError: # pragma: no cover
import traceback
def print_exception(exc):
traceback.print_exc()
try:
import uerrno as errno
except ImportError:
import errno
concurrency_mode = 'threaded'
try: # pragma: no cover
import threading
def create_thread(f, *args, **kwargs):
"""Use the threading module."""
threading.Thread(target=f, args=args, kwargs=kwargs).start()
except ImportError: # pragma: no cover
try:
import _thread
def create_thread(f, *args, **kwargs):
"""Use MicroPython's _thread module."""
def run():
f(*args, **kwargs)
_thread.start_new_thread(run, ())
except ImportError:
def create_thread(f, *args, **kwargs):
"""No threads available, call function synchronously."""
f(*args, **kwargs)
concurrency_mode = 'sync'
try:
import ujson as json
except ImportError:
import json
try:
import ure as re
except ImportError:
import re
try:
import usocket as socket
except ImportError:
try:
import socket
except ImportError: # pragma: no cover
socket = None
def urldecode(string):
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) == 1:
return string
result = [parts[0]]
for item in parts[1:]:
if item == '':
result.append('%')
else:
code = item[:2]
result.append(chr(int(code, 16)))
result.append(item[2:])
return ''.join(result)
class Request():
class G:
pass
def __init__(self, app, client_addr, method, url, http_version, headers,
body):
self.app = app
self.client_addr = client_addr
self.method = method
self.path = url
self.http_version = http_version
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
self.headers = headers
self.cookies = {}
self.content_length = 0
self.content_type = None
for header, value in self.headers.items():
if header == 'Content-Length':
self.content_length = int(value)
elif header == 'Content-Type':
self.content_type = value
elif header == 'Cookie':
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
self.body = body
self._json = None
self._form = None
self.g = Request.G()
@staticmethod
def create(app, client_stream, client_addr):
# request line
line = client_stream.readline().strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = client_stream.readline().strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = client_stream.read(content_length) if content_length else b''
return Request(app, client_addr, method, url, http_version, headers,
body)
def _parse_urlencoded(self, urlencoded):
return {
urldecode(key): urldecode(value) for key, value in [
pair.split('=', 1) for pair in
urlencoded.split('&')]}
@property
def json(self):
if self.content_type != 'application/json':
return None
if self._json is None:
self._json = json.loads(self.body.decode())
return self._json
@property
def form(self):
if self.content_type != 'application/x-www-form-urlencoded':
return None
if self._form is None:
self._form = self._parse_urlencoded(self.body.decode())
return self._form
class Response():
types_map = {
'css': 'text/css',
'gif': 'image/gif',
'html': 'text/html',
'jpg': 'image/jpeg',
'js': 'application/javascript',
'json': 'application/json',
'png': 'image/png',
'txt': 'text/plain',
}
send_file_buffer_size = 1024
def __init__(self, body='', status_code=200, headers=None):
self.status_code = status_code
self.headers = headers or {}
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
self.headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
self.body = body.encode()
else:
# this applies to bytes or file-like objects
self.body = body
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
if path:
http_cookie += '; Path=' + path
if domain:
http_cookie += '; Domain=' + domain
if expires:
http_cookie += '; Expires=' + expires.strftime(
"%a, %d %b %Y %H:%M:%S GMT")
if max_age:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
if http_only:
http_cookie += '; HttpOnly'
if 'Set-Cookie' in self.headers:
self.headers['Set-Cookie'].append(http_cookie)
else:
self.headers['Set-Cookie'] = [http_cookie]
def complete(self):
if isinstance(self.body, bytes) and \
'Content-Length' not in self.headers:
self.headers['Content-Length'] = str(len(self.body))
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'text/plain'
def write(self, stream):
self.complete()
# status code
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
stream.write(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
stream.write(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'): # pragma: no close
self.body.close()
else:
stream.write(self.body)
@classmethod
def redirect(cls, location, status_code=302):
return cls(status_code=status_code, headers={'Location': location})
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
f = open(filename, 'rb')
return cls(body=f, status_code=status_code,
headers={'Content-Type': content_type})
class URLPattern():
def __init__(self, url_pattern):
self.pattern = ''
self.args = []
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
if segment[-1] != '>':
raise ValueError('invalid URL pattern')
segment = segment[1:-1]
if ':' in segment:
type_, name = segment.rsplit(':', 1)
else:
type_ = 'string'
name = segment
if type_ == 'string':
pattern = '[^/]+'
elif type_ == 'int':
pattern = '\\d+'
elif type_ == 'path':
pattern = '.+'
elif type_.startswith('re:'):
pattern = type_[3:]
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
else:
self.pattern += '/{segment}'.format(segment=segment)
if use_regex:
self.pattern = re.compile('^' + self.pattern + '$')
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
return args
class Microdot():
def __init__(self):
self.url_map = []
self.before_request_handlers = []
self.after_request_handlers = []
self.error_handlers = {}
self.shutdown_requested = False
self.debug = False
self.server = None
def route(self, url_pattern, methods=None):
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
return f
return decorated
def get(self, url_pattern):
return self.route(url_pattern, methods=['GET'])
def post(self, url_pattern):
return self.route(url_pattern, methods=['POST'])
def put(self, url_pattern):
return self.route(url_pattern, methods=['PUT'])
def patch(self, url_pattern):
return self.route(url_pattern, methods=['PATCH'])
def delete(self, url_pattern):
return self.route(url_pattern, methods=['DELETE'])
def before_request(self, f):
self.before_request_handlers.append(f)
return f
def after_request(self, f):
self.after_request_handlers.append(f)
return f
def errorhandler(self, status_code_or_exception_class):
def decorated(f):
self.error_handlers[status_code_or_exception_class] = f
return f
return decorated
def run(self, host='0.0.0.0', port=5000, debug=False):
self.debug = debug
self.shutdown_requested = False
self.server = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][-1]
if self.debug: # pragma: no cover
print('Starting {mode} server on {host}:{port}...'.format(
mode=concurrency_mode, host=host, port=port))
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(addr)
self.server.listen(5)
while not self.shutdown_requested:
try:
sock, addr = self.server.accept()
except OSError as exc:
if exc.args[0] == errno.ECONNABORTED:
break
else:
raise
create_thread(self.dispatch_request, sock, addr)
def shutdown(self):
self.shutdown_requested = True
def find_route(self, req):
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
return f
def dispatch_request(self, sock, addr):
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
stream = sock
req = Request.create(self, stream, addr)
if req:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
res.write(stream)
stream.close()
if stream != sock: # pragma: no cover
sock.close()
if self.shutdown_requested: # pragma: no cover
self.server.close()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
redirect = Response.redirect
send_file = Response.send_file

View File

@@ -1,31 +0,0 @@
"""
Microdot
--------
The impossibly small web framework for MicroPython.
"""
from setuptools import setup
setup(
name='microdot',
version="0.4.0",
url='http://github.com/miguelgrinberg/microdot/',
license='MIT',
author='Miguel Grinberg',
author_email='miguel.grinberg@gmail.com',
description='The impossibly small web framework for MicroPython',
long_description=__doc__,
py_modules=['microdot'],
platforms='any',
classifiers=[
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: Implementation :: MicroPython',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

6
pyproject.toml Normal file
View File

@@ -0,0 +1,6 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

View File

@@ -1,7 +1,6 @@
import sys
sys.path.insert(0, 'microdot')
sys.path.insert(1, 'microdot-asyncio')
sys.path.insert(0, 'src')
sys.path.insert(2, 'tests/libs')
import unittest

28
setup.cfg Normal file
View File

@@ -0,0 +1,28 @@
[metadata]
name = microdot
version = 0.9.0
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = The impossibly small web framework for MicroPython
long_description = file: README.md
long_description_content_type = text/markdown
url = https://github.com/miguelgrinberg/microdot
project_urls =
Bug Tracker = https://github.com/miguelgrinberg/microdot/issues
classifiers =
Environment :: Web Environment
Intended Audience :: Developers
Programming Language :: Python :: 3
Programming Language :: Python :: Implementation :: MicroPython
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
zip_safe = False
include_package_data = True
package_dir =
= src
py_modules =
microdot
microdot_asyncio
microdot_wsgi

3
setup.py Executable file
View File

@@ -0,0 +1,3 @@
import setuptools
setuptools.setup()

933
src/microdot.py Normal file
View File

@@ -0,0 +1,933 @@
"""
microdot
--------
The ``microdot`` module defines a few classes that help implement HTTP-based
servers for MicroPython and standard Python, with multithreading support for
Python interpreters that support it.
"""
try:
from sys import print_exception
except ImportError: # pragma: no cover
import traceback
def print_exception(exc):
traceback.print_exc()
try:
import uerrno as errno
except ImportError:
import errno
concurrency_mode = 'threaded'
try: # pragma: no cover
import threading
def create_thread(f, *args, **kwargs):
# use the threading module
threading.Thread(target=f, args=args, kwargs=kwargs).start()
except ImportError: # pragma: no cover
try:
import _thread
def create_thread(f, *args, **kwargs):
# use MicroPython's _thread module
def run():
f(*args, **kwargs)
_thread.start_new_thread(run, ())
except ImportError:
def create_thread(f, *args, **kwargs):
# no threads available, call function synchronously
f(*args, **kwargs)
concurrency_mode = 'sync'
try:
import ujson as json
except ImportError:
import json
try:
import ure as re
except ImportError:
import re
try:
import usocket as socket
except ImportError:
try:
import socket
except ImportError: # pragma: no cover
socket = None
def urldecode(string):
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) == 1:
return string
result = [parts[0]]
for item in parts[1:]:
if item == '':
result.append('%')
else:
code = item[:2]
result.append(chr(int(code, 16)))
result.append(item[2:])
return ''.join(result)
class MultiDict(dict):
"""A subclass of dictionary that can hold multiple values for the same
key. It is used to hold key/value pairs decoded from query strings and
form submissions.
:param initial_dict: an initial dictionary of key/value pairs to
initialize this object with.
Example::
>>> d = MultiDict()
>>> d['sort'] = 'name'
>>> d['sort'] = 'email'
>>> print(d['sort'])
'name'
>>> print(d.getlist('sort'))
['name', 'email']
"""
def __init__(self, initial_dict=None):
super().__init__()
if initial_dict:
for key, value in initial_dict.items():
self[key] = value
def __setitem__(self, key, value):
if key not in self:
super().__setitem__(key, [])
super().__getitem__(key).append(value)
def __getitem__(self, key):
return super().__getitem__(key)[0]
def get(self, key, default=None, type=None):
"""Return the value for a given key.
:param key: The key to retrieve.
:param default: A default value to use if the key does not exist.
:param type: A type conversion callable to apply to the value.
If the multidict contains more than one value for the requested key,
this method returns the first value only.
Example::
>>> d = MultiDict()
>>> d['age'] = '42'
>>> d.get('age')
'42'
>>> d.get('age', type=int)
42
>>> d.get('name', default='noname')
'noname'
"""
if key not in self:
return default
value = self[key]
if type is not None:
value = type(value)
return value
def getlist(self, key, type=None):
"""Return all the values for a given key.
:param key: The key to retrieve.
:param type: A type conversion callable to apply to the values.
If the requested key does not exist in the dictionary, this method
returns an empty list.
Example::
>>> d = MultiDict()
>>> d.getlist('items')
[]
>>> d['items'] = '3'
>>> d.getlist('items')
['3']
>>> d['items'] = '56'
>>> d.getlist('items')
['3', '56']
>>> d.getlist('items', type=int)
[3, 56]
"""
if key not in self:
return []
values = super().__getitem__(key)
if type is not None:
values = [type(value) for value in values]
return values
class Request():
"""An HTTP request class.
:var app: The application instance to which this request belongs.
:var client_addr: The address of the client, as a tuple (host, port).
:var method: The HTTP method of the request.
:var path: The path portion of the URL.
:var query_string: The query string portion of the URL.
:var args: The parsed query string, as a :class:`MultiDict` object.
:var headers: A dictionary with the headers included in the request.
:var cookies: A dictionary with the cookies included in the request.
:var content_length: The parsed ``Content-Length`` header.
:var content_type: The parsed ``Content-Type`` header.
:var stream: The input stream, containing the request body.
:var body: The body of the request, as bytes.
:var json: The parsed JSON body, as a dictionary or list, or ``None`` if
the request does not have a JSON body.
:var form: The parsed form submission body, as a :class:`MultiDict` object,
or ``None`` if the request does not have a form submission.
:var g: A general purpose container for applications to store data during
the life of the request.
"""
#: Specify the maximum payload size that is accepted. Requests with larger
#: payloads will be rejected with a 413 status code. Applications can
#: change this maximum as necessary.
#:
#: Example::
#:
#: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed
max_content_length = 16 * 1024
#: Specify the maximum payload size that can be stored in ``body``.
#: Requests with payloads that are larger than this size and up to
#: ``max_content_length`` bytes will be accepted, but the application will
#: only be able to access the body of the request by reading from
#: ``stream``. Set to 0 if you always access the body as a stream.
#:
#: Example::
#:
#: Request.max_body_length = 4 * 1024 # up to 4KB bodies read
max_body_length = 16 * 1024
#: Specify the maximum length allowed for a line in the request. Requests
#: with longer lines will not be correctly interpreted. Applications can
#: change this maximum as necessary.
#:
#: Example::
#:
#: Request.max_readline = 16 * 1024 # 16KB lines allowed
max_readline = 2 * 1024
class G:
pass
def __init__(self, app, client_addr, method, url, http_version, headers,
body=None, stream=None):
self.app = app
self.client_addr = client_addr
self.method = method
self.path = url
self.http_version = http_version
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
self.headers = headers
self.cookies = {}
self.content_length = 0
self.content_type = None
for header, value in self.headers.items():
header = header.lower()
if header == 'content-length':
self.content_length = int(value)
elif header == 'content-type':
self.content_type = value
elif header == 'cookie':
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
self._body = body
self.body_used = False
self._stream = stream
self.stream_used = False
self._json = None
self._form = None
self.g = Request.G()
@staticmethod
def create(app, client_stream, client_addr):
"""Create a request object.
:param app: The Microdot application instance.
:param client_stream: An input stream from where the request data can
be read.
:param client_addr: The address of the client, as a tuple.
This method returns a newly created ``Request`` object.
"""
# request line
line = Request._safe_readline(client_stream).strip().decode()
if not line:
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
while True:
line = Request._safe_readline(client_stream).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
return Request(app, client_addr, method, url, http_version, headers,
stream=client_stream)
def _parse_urlencoded(self, urlencoded):
data = MultiDict()
for k, v in [pair.split('=', 1) for pair in urlencoded.split('&')]:
data[urldecode(k)] = urldecode(v)
return data
@property
def body(self):
if self.stream_used:
raise RuntimeError('Cannot use both stream and body')
if self._body is None:
self._body = b''
if self.content_length and \
self.content_length <= Request.max_body_length:
while len(self._body) < self.content_length:
data = self._stream.read(
self.content_length - len(self._body))
if len(data) == 0: # pragma: no cover
raise EOFError()
self._body += data
self.body_used = True
return self._body
@property
def stream(self):
if self.body_used:
raise RuntimeError('Cannot use both stream and body')
self.stream_used = True
return self._stream
@property
def json(self):
if self._json is None:
if self.content_type is None:
return None
mime_type = self.content_type.split(';')[0]
if mime_type != 'application/json':
return None
self._json = json.loads(self.body.decode())
return self._json
@property
def form(self):
if self._form is None:
if self.content_type is None:
return None
mime_type = self.content_type.split(';')[0]
if mime_type != 'application/x-www-form-urlencoded':
return None
self._form = self._parse_urlencoded(self.body.decode())
return self._form
@staticmethod
def _safe_readline(stream):
line = stream.readline(Request.max_readline + 1)
if len(line) > Request.max_readline:
raise ValueError('line too long')
return line
class Response():
"""An HTTP response class.
:param body: The body of the response. If a dictionary or list is given,
a JSON formatter is used to generate the body. If a file-like
object or a generator is given, a streaming response is used.
If a string is given, it is encoded from UTF-8. Else, the
body should be a byte sequence.
:param status_code: The numeric HTTP status code of the response. The
default is 200.
:param headers: A dictionary of headers to include in the response.
:param reason: A custom reason phrase to add after the status code. The
default is "OK" for responses with a 200 status code and
"N/A" for any other status codes.
"""
types_map = {
'css': 'text/css',
'gif': 'image/gif',
'html': 'text/html',
'jpg': 'image/jpeg',
'js': 'application/javascript',
'json': 'application/json',
'png': 'image/png',
'txt': 'text/plain',
}
send_file_buffer_size = 1024
def __init__(self, body='', status_code=200, headers=None, reason=None):
if body is None and status_code == 200:
body = ''
status_code = 204
self.status_code = status_code
self.headers = headers.copy() if headers else {}
self.reason = reason
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
self.headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
self.body = body.encode()
else:
# this applies to bytes, file-like objects or generators
self.body = body
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
"""Add a cookie to the response.
:param cookie: The cookie's name.
:param value: The cookie's value.
:param path: The cookie's path.
:param domain: The cookie's domain.
:param expires: The cookie expiration time, as a ``datetime`` object.
:param max_age: The cookie's ``Max-Age`` value.
:param secure: The cookie's ``secure`` flag.
:param http_only: The cookie's ``HttpOnly`` flag.
"""
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
if path:
http_cookie += '; Path=' + path
if domain:
http_cookie += '; Domain=' + domain
if expires:
http_cookie += '; Expires=' + expires.strftime(
"%a, %d %b %Y %H:%M:%S GMT")
if max_age:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
if http_only:
http_cookie += '; HttpOnly'
if 'Set-Cookie' in self.headers:
self.headers['Set-Cookie'].append(http_cookie)
else:
self.headers['Set-Cookie'] = [http_cookie]
def complete(self):
if isinstance(self.body, bytes) and \
'Content-Length' not in self.headers:
self.headers['Content-Length'] = str(len(self.body))
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'text/plain'
def write(self, stream):
self.complete()
# status code
reason = self.reason if self.reason is not None else \
('OK' if self.status_code == 200 else 'N/A')
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code, reason=reason).encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
stream.write(b'\r\n')
# body
can_flush = hasattr(stream, 'flush')
try:
for body in self.body_iter():
if isinstance(body, str):
body = body.encode()
stream.write(body)
if can_flush: # pragma: no cover
stream.flush()
except OSError as exc: # pragma: no cover
if exc.errno == 32: # errno.EPIPE
pass
else:
raise
def body_iter(self):
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
yield buf
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'): # pragma: no cover
self.body.close()
elif hasattr(self.body, '__next__'):
yield from self.body
else:
yield self.body
@classmethod
def redirect(cls, location, status_code=302):
"""Return a redirect response.
:param location: The URL to redirect to.
:param status_code: The 3xx status code to use for the redirect. The
default is 302.
"""
if '\x0d' in location or '\x0a' in location:
raise ValueError('invalid redirect URL')
return cls(status_code=status_code, headers={'Location': location})
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
"""Send file contents in a response.
:param filename: The filename of the file.
:param status_code: The 3xx status code to use for the redirect. The
default is 302.
:param content_type: The ``Content-Type`` header to use in the
response. If omitted, it is generated
automatically from the file extension.
Security note: The filename is assumed to be trusted. Never pass
filenames provided by the user before validating and sanitizing them
first.
"""
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
f = open(filename, 'rb')
return cls(body=f, status_code=status_code,
headers={'Content-Type': content_type})
class URLPattern():
def __init__(self, url_pattern):
self.pattern = ''
self.args = []
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
if segment[-1] != '>':
raise ValueError('invalid URL pattern')
segment = segment[1:-1]
if ':' in segment:
type_, name = segment.rsplit(':', 1)
else:
type_ = 'string'
name = segment
if type_ == 'string':
pattern = '[^/]+'
elif type_ == 'int':
pattern = '\\d+'
elif type_ == 'path':
pattern = '.+'
elif type_.startswith('re:'):
pattern = type_[3:]
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
else:
self.pattern += '/{segment}'.format(segment=segment)
if use_regex:
self.pattern = re.compile('^' + self.pattern + '$')
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
return args
class Microdot():
"""An HTTP application class.
This class implements an HTTP application instance and is heavily
influenced by the ``Flask`` class of the Flask framework. It is typically
declared near the start of the main application script.
Example::
from microdot import Microdot
app = Microdot()
"""
def __init__(self):
self.url_map = []
self.before_request_handlers = []
self.after_request_handlers = []
self.error_handlers = {}
self.shutdown_requested = False
self.debug = False
self.server = None
def route(self, url_pattern, methods=None):
"""Decorator that is used to register a function as a request handler
for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
:param methods: The list of HTTP methods to be handled by the
decorated function. If omitted, only ``GET`` requests
are handled.
The URL pattern can be a static path (for example, ``/users`` or
``/api/invoices/search``) or a path with dynamic components enclosed
in ``<`` and ``>`` (for example, ``/users/<id>`` or
``/invoices/<number>/products``). Dynamic path components can also
include a type prefix, separated from the name with a colon (for
example, ``/users/<int:id>``). The type can be ``string`` (the
default), ``int``, ``path`` or ``re:[regular-expression]``.
The first argument of the decorated function must be
the request object. Any path arguments that are specified in the URL
pattern are passed as keyword arguments. The return value of the
function must be a :class:`Response` instance, or the arguments to
be passed to this class.
Example::
@app.route('/')
def index(request):
return 'Hello, world!'
"""
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
return f
return decorated
def get(self, url_pattern):
"""Decorator that is used to register a function as a ``GET`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['GET']``.
Example::
@app.get('/users/<int:id>')
def get_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['GET'])
def post(self, url_pattern):
"""Decorator that is used to register a function as a ``POST`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the``route`` decorator with
``methods=['POST']``.
Example::
@app.post('/users')
def create_user(request):
# ...
"""
return self.route(url_pattern, methods=['POST'])
def put(self, url_pattern):
"""Decorator that is used to register a function as a ``PUT`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['PUT']``.
Example::
@app.put('/users/<int:id>')
def edit_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['PUT'])
def patch(self, url_pattern):
"""Decorator that is used to register a function as a ``PATCH`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['PATCH']``.
Example::
@app.patch('/users/<int:id>')
def edit_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['PATCH'])
def delete(self, url_pattern):
"""Decorator that is used to register a function as a ``DELETE``
request handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['DELETE']``.
Example::
@app.delete('/users/<int:id>')
def delete_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['DELETE'])
def before_request(self, f):
"""Decorator to register a function to run before each request is
handled. The decorated function must take a single argument, the
request object.
Example::
@app.before_request
def func(request):
# ...
"""
self.before_request_handlers.append(f)
return f
def after_request(self, f):
"""Decorator to register a function to run after each request is
handled. The decorated function must take two arguments, the request
and response objects. The return value of the function must be an
updated response object.
Example::
@app.before_request
def func(request, response):
# ...
"""
self.after_request_handlers.append(f)
return f
def errorhandler(self, status_code_or_exception_class):
"""Decorator to register a function as an error handler. Error handler
functions for numeric HTTP status codes must accept a single argument,
the request object. Error handler functions for Python exceptions
must accept two arguments, the request object and the exception
object.
:param status_code_or_exception_class: The numeric HTTP status code or
Python exception class to
handle.
Examples::
@app.errorhandler(404)
def not_found(request):
return 'Not found'
@app.errorhandler(RuntimeError)
def runtime_error(request, exception):
return 'Runtime error'
"""
def decorated(f):
self.error_handlers[status_code_or_exception_class] = f
return f
return decorated
def run(self, host='0.0.0.0', port=5000, debug=False):
"""Start the web server. This function does not normally return, as
the server enters an endless listening loop. The :func:`shutdown`
function provides a method for terminating the server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
Example::
from microdot import Microdot
app = Microdot()
@app.route('/')
def index():
return 'Hello, world!'
app.run(debug=True)
"""
self.debug = debug
self.shutdown_requested = False
self.server = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][-1]
if self.debug: # pragma: no cover
print('Starting {mode} server on {host}:{port}...'.format(
mode=concurrency_mode, host=host, port=port))
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(addr)
self.server.listen(5)
while not self.shutdown_requested:
try:
sock, addr = self.server.accept()
except OSError as exc: # pragma: no cover
if exc.errno == errno.ECONNABORTED:
break
else:
raise
create_thread(self.handle_request, sock, addr)
def shutdown(self):
"""Request a server shutdown. The server will then exit its request
listening loop and the :func:`run` function will return. This function
can be safely called from a route handler, as it only schedules the
server to terminate as soon as the request completes.
Example::
@app.route('/shutdown')
def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
"""
self.shutdown_requested = True
def find_route(self, req):
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
return f
def handle_request(self, sock, addr):
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
stream = sock
req = None
try:
req = Request.create(self, stream, addr)
except Exception as exc: # pragma: no cover
print_exception(exc)
res = self.dispatch_request(req)
res.write(stream)
try:
stream.close()
except OSError as exc: # pragma: no cover
if exc.errno == 32: # errno.EPIPE
pass
else:
raise
if stream != sock: # pragma: no cover
sock.close()
if self.shutdown_requested: # pragma: no cover
self.server.close()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
def dispatch_request(self, req):
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
res = self.error_handlers[413](req)
else:
res = 'Payload too large', 413
else:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
else:
res = 'Bad request', 400
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
return res
redirect = Response.redirect
send_file = Response.send_file

141
src/microdot_asgi.py Normal file
View File

@@ -0,0 +1,141 @@
import asyncio
import os
import signal
from microdot_asyncio import * # noqa: F401, F403
from microdot_asyncio import Microdot as BaseMicrodot
from microdot_asyncio import Request
class _BodyStream: # pragma: no cover
def __init__(self, receive):
self.receive = receive
self.data = b''
self.more = True
async def read_more(self):
if self.more:
packet = await self.receive()
self.data += packet.get('body', b'')
self.more = packet.get('more_body', False)
async def read(self, n=-1):
while self.more and len(self.data) < n:
self.read_more()
if len(self.data) < n:
data = self.data
self.data = b''
return data
data = self.data[:n]
self.data = self.data[n:]
return data
async def readline(self):
return self.readuntil()
async def readexactly(self, n):
return self.read(n)
async def readuntil(self, separator=b'\n'):
if self.more and separator not in self.data:
self.read_more()
data, self.data = self.data.split(separator, 1)
return data
class Microdot(BaseMicrodot):
def __init__(self):
super().__init__()
self.embedded_server = False
async def asgi_app(self, scope, receive, send):
"""An ASGI application."""
if scope['type'] != 'http': # pragma: no cover
return
path = scope['path']
if 'query_string' in scope and scope['query_string']:
path += '?' + scope['query_string'].decode()
headers = {}
content_length = 0
for key, value in scope.get('headers', []):
headers[key] = value
if key.lower() == 'content-length':
content_length = int(value)
body = b''
if content_length and content_length <= Request.max_body_length:
body = b''
more = True
while more:
packet = await receive()
body += packet.get('body', b'')
more = packet.get('more_body', False)
stream = None
else:
body = b''
stream = _BodyStream(receive)
req = Request(
self,
(scope['client'][0], scope['client'][1]),
scope['method'],
path,
'HTTP/' + scope['http_version'],
headers,
body=body,
stream=stream)
req.asgi_scope = scope
res = await self.dispatch_request(req)
res.complete()
await send({'type': 'http.response.start',
'status': res.status_code,
'headers': [(name, value)
for name, value in res.headers.items()]})
cancelled = False
async def cancel_monitor():
nonlocal cancelled
while True:
event = await receive()
if event['type'] == 'http.disconnect': # pragma: no branch
cancelled = True
break
asyncio.ensure_future(cancel_monitor())
body_iter = res.body_iter().__aiter__()
try:
body = await body_iter.__anext__()
while not cancelled: # pragma: no branch
next_body = await body_iter.__anext__()
await send({'type': 'http.response.body',
'body': body,
'more_body': True})
body = next_body
except StopAsyncIteration:
await send({'type': 'http.response.body',
'body': body,
'more_body': False})
async def __call__(self, scope, receive, send):
return await self.asgi_app(scope, receive, send)
def shutdown(self):
if self.embedded_server: # pragma: no cover
super().shutdown()
else:
pid = os.getpgrp() if hasattr(os, 'getpgrp') else os.getpid()
os.kill(pid, signal.SIGTERM)
def run(self, host='0.0.0.0', port=5000, debug=False,
**options): # pragma: no cover
"""Normally you would not start the server by invoking this method.
Instead, start your chosen ASGI web server and pass the ``Microdot``
instance as the ASGI application.
"""
self.embedded_server = True
super().run(host=host, port=port, debug=debug, **options)

376
src/microdot_asyncio.py Normal file
View File

@@ -0,0 +1,376 @@
"""
microdot_asyncio
----------------
The ``microdot_asyncio`` module defines a few classes that help implement
HTTP-based servers for MicroPython and standard Python that use ``asyncio``
and coroutines.
"""
try:
import uasyncio as asyncio
except ImportError:
import asyncio
try:
import uio as io
except ImportError:
import io
from microdot import Microdot as BaseMicrodot
from microdot import print_exception
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
def _iscoroutine(coro):
return hasattr(coro, 'send') and hasattr(coro, 'throw')
class _AsyncBytesIO:
def __init__(self, data):
self.stream = io.BytesIO(data)
async def read(self, n=-1):
return self.stream.read(n)
async def readline(self): # pragma: no cover
return self.stream.readline()
async def readexactly(self, n): # pragma: no cover
return self.stream.read(n)
async def readuntil(self, separator=b'\n'): # pragma: no cover
return self.stream.readuntil(separator=separator)
class Request(BaseRequest):
@staticmethod
async def create(app, client_stream, client_addr):
"""Create a request object.
:param app: The Microdot application instance.
:param client_stream: An input stream from where the request data can
be read.
:param client_addr: The address of the client, as a tuple.
This method is a coroutine. It returns a newly created ``Request``
object.
"""
# request line
line = (await Request._safe_readline(client_stream)).strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = (await Request._safe_readline(
client_stream)).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header.lower() == 'content-length':
content_length = int(value)
# body
body = b''
if content_length and content_length <= Request.max_body_length:
body = await client_stream.readexactly(content_length)
stream = None
else:
body = b''
stream = client_stream
return Request(app, client_addr, method, url, http_version, headers,
body=body, stream=stream)
@property
def stream(self):
if self._stream is None:
self._stream = _AsyncBytesIO(self._body)
return self._stream
@staticmethod
async def _safe_readline(stream):
line = (await stream.readline())
if len(line) > Request.max_readline:
raise ValueError('line too long')
return line
class Response(BaseResponse):
"""An HTTP response class.
:param body: The body of the response. If a dictionary or list is given,
a JSON formatter is used to generate the body. If a file-like
object or an async generator is given, a streaming response is
used. If a string is given, it is encoded from UTF-8. Else,
the body should be a byte sequence.
:param status_code: The numeric HTTP status code of the response. The
default is 200.
:param headers: A dictionary of headers to include in the response.
:param reason: A custom reason phrase to add after the status code. The
default is "OK" for responses with a 200 status code and
"N/A" for any other status codes.
"""
async def write(self, stream):
self.complete()
# status code
reason = self.reason if self.reason is not None else \
('OK' if self.status_code == 200 else 'N/A')
await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code, reason=reason).encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
await stream.awrite('{header}: {value}\r\n'.format(
header=header, value=value).encode())
await stream.awrite(b'\r\n')
# body
try:
async for body in self.body_iter():
if isinstance(body, str):
body = body.encode()
await stream.awrite(body)
except OSError as exc: # pragma: no cover
if exc.errno == 32 or exc.args[0] == 'Connection lost':
pass
else:
raise
def body_iter(self):
if hasattr(self.body, '__anext__'):
return self.body
response = self
class iter:
def __aiter__(self):
if response.body:
self.i = 0
else:
self.i = -1
return self
async def __anext__(self):
if self.i == -1:
raise StopAsyncIteration
if self.i == 0:
if not hasattr(response.body, 'read'):
self.i = -1
return response.body
else:
self.i = 1
buf = response.body.read(response.send_file_buffer_size)
if _iscoroutine(buf): # pragma: no cover
buf = await buf
if len(buf) < response.send_file_buffer_size:
self.i = -1
if hasattr(response.body, 'close'): # pragma: no cover
result = response.body.close()
if _iscoroutine(result):
await result
return buf
return iter()
class Microdot(BaseMicrodot):
async def start_server(self, host='0.0.0.0', port=5000, debug=False):
"""Start the Microdot web server as a coroutine. This coroutine does
not normally return, as the server enters an endless listening loop.
The :func:`shutdown` function provides a method for terminating the
server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
This method is a coroutine.
Example::
import asyncio
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def index():
return 'Hello, world!'
async def main():
await app.start_server(debug=True)
asyncio.run(main())
"""
self.debug = debug
async def serve(reader, writer):
if not hasattr(writer, 'awrite'): # pragma: no cover
# CPython provides the awrite and aclose methods in 3.8+
async def awrite(self, data):
self.write(data)
await self.drain()
async def aclose(self):
self.close()
await self.wait_closed()
from types import MethodType
writer.awrite = MethodType(awrite, writer)
writer.aclose = MethodType(aclose, writer)
await self.handle_request(reader, writer)
if self.debug: # pragma: no cover
print('Starting async server on {host}:{port}...'.format(
host=host, port=port))
self.server = await asyncio.start_server(serve, host, port)
while True:
try:
await self.server.wait_closed()
break
except AttributeError: # pragma: no cover
# the task hasn't been initialized in the server object yet
# wait a bit and try again
await asyncio.sleep(0.1)
def run(self, host='0.0.0.0', port=5000, debug=False):
"""Start the web server. This function does not normally return, as
the server enters an endless listening loop. The :func:`shutdown`
function provides a method for terminating the server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
Example::
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def index():
return 'Hello, world!'
app.run(debug=True)
"""
asyncio.run(self.start_server(host=host, port=port, debug=debug))
def shutdown(self):
self.server.close()
async def handle_request(self, reader, writer):
req = None
try:
req = await Request.create(self, reader,
writer.get_extra_info('peername'))
except Exception as exc: # pragma: no cover
print_exception(exc)
res = await self.dispatch_request(req)
await res.write(writer)
try:
await writer.aclose()
except OSError as exc: # pragma: no cover
if exc.errno == 32: # errno.EPIPE
pass
else:
raise
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
async def dispatch_request(self, req):
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[413], req)
else:
res = 'Payload too large', 413
else:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = await self._invoke_handler(handler, req)
if res:
break
if res is None:
res = await self._invoke_handler(
f, req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
elif 404 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[404], req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[500], req)
else:
res = 'Internal server error', 500
else:
res = 'Bad request', 400
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
return res
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
ret = f_or_coro(*args, **kwargs)
if _iscoroutine(ret):
ret = await ret
return ret
redirect = Response.redirect
send_file = Response.send_file

59
src/microdot_wsgi.py Normal file
View File

@@ -0,0 +1,59 @@
import os
import signal
from microdot import * # noqa: F401, F403
from microdot import Microdot as BaseMicrodot
from microdot import Request
class Microdot(BaseMicrodot):
def __init__(self):
super().__init__()
self.embedded_server = False
def wsgi_app(self, environ, start_response):
"""A WSGI application callable."""
path = environ.get('SCRIPT_NAME', '') + environ.get('PATH_INFO', '')
if 'QUERY_STRING' in environ and environ['QUERY_STRING']:
path += '?' + environ['QUERY_STRING']
headers = {}
for k, v in environ.items():
if k.startswith('HTTP_'):
h = '-'.join([p.title() for p in k[5:].split('_')])
headers[h] = v
req = Request(
self,
(environ['REMOTE_ADDR'], int(environ.get('REMOTE_PORT', '0'))),
environ['REQUEST_METHOD'],
path,
environ['SERVER_PROTOCOL'],
headers,
stream=environ['wsgi.input'])
req.environ = environ
res = self.dispatch_request(req)
res.complete()
reason = res.reason or ('OK' if res.status_code == 200 else 'N/A')
start_response(
str(res.status_code) + ' ' + reason,
[(name, value) for name, value in res.headers.items()])
return res.body_iter()
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
def shutdown(self):
if self.embedded_server: # pragma: no cover
super().shutdown()
else:
pid = os.getpgrp() if hasattr(os, 'getpgrp') else os.getpid()
os.kill(pid, signal.SIGTERM)
def run(self, host='0.0.0.0', port=5000, debug=False,
**options): # pragma: no cover
"""Normally you would not start the server by invoking this method.
Instead, start your chosen WSGI web server and pass the ``Microdot``
instance as the WSGI callable.
"""
self.embedded_server = True
super().run(host=host, port=port, debug=debug, **options)

View File

@@ -1,3 +1,4 @@
from tests.microdot.test_multidict import TestMultiDict
from tests.microdot.test_request import TestRequest
from tests.microdot.test_response import TestResponse
from tests.microdot.test_url_pattern import TestURLPattern

View File

@@ -65,6 +65,52 @@ class TestMicrodot(unittest.TestCase):
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
def test_empty_request(self):
app = Microdot()
mock_socket.clear_requests()
fd = mock_socket.FakeStream(b'\n')
mock_socket._requests.append(fd)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 400 N/A\r\n'))
self.assertIn(b'Content-Length: 11\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nBad request'))
def test_method_decorators(self):
app = Microdot()
@app.get('/get')
def get(req):
return 'GET'
@app.post('/post')
def post(req):
return 'POST'
@app.put('/put')
def put(req):
return 'PUT'
@app.patch('/patch')
def patch(req):
return 'PATCH'
@app.delete('/delete')
def delete(req):
return 'DELETE'
methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
mock_socket.clear_requests()
fds = [mock_socket.add_request(method, '/' + method.lower())
for method in methods]
self._add_shutdown(app)
app.run()
for fd, method in zip(fds, methods):
self.assertTrue(fd.response.endswith(
b'\r\n\r\n' + method.encode()))
def test_before_after_request(self):
app = Microdot()
@@ -149,6 +195,42 @@ class TestMicrodot(unittest.TestCase):
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
def test_413(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 413 N/A\r\n'))
self.assertIn(b'Content-Length: 17\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nPayload too large'))
def test_413_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.errorhandler(413)
def handle_413(req):
return '413', 400
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 400 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n413'))
def test_500(self):
app = Microdot()
@@ -204,3 +286,21 @@ class TestMicrodot(unittest.TestCase):
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
def test_streaming(self):
app = Microdot()
@app.route('/')
def index(req):
def stream():
yield 'foo'
yield b'bar'
return stream()
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoobar'))

View File

@@ -0,0 +1,31 @@
import unittest
from microdot import MultiDict
class TestMultiDict(unittest.TestCase):
def test_multidict(self):
d = MultiDict()
assert dict(d) == {}
assert d.get('zero') is None
assert d.get('zero', default=0) == 0
assert d.getlist('zero') == []
assert d.getlist('zero', type=int) == []
d['one'] = 1
assert d['one'] == 1
assert d.get('one') == 1
assert d.get('one', default=2) == 1
assert d.get('one', type=int) == 1
assert d.get('one', type=str) == '1'
d['two'] = 1
d['two'] = 2
assert d['two'] == 1
assert d.get('two') == 1
assert d.get('two', default=2) == 1
assert d.get('two', type=int) == 1
assert d.get('two', type=str) == '1'
assert d.getlist('two') == [1, 2]
assert d.getlist('two', type=int) == [1, 2]
assert d.getlist('two', type=str) == ['1', '2']

View File

@@ -1,5 +1,5 @@
import unittest
from microdot import Request
from microdot import Request, MultiDict
from tests.mock_socket import get_request_fd
@@ -42,7 +42,8 @@ class TestRequest(unittest.TestCase):
fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(req.args, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
def test_json(self):
fd = get_request_fd('GET', '/foo', headers={
@@ -68,7 +69,8 @@ class TestRequest(unittest.TestCase):
body='foo=bar&abc=def&x=%2f%%')
req = Request.create('app', fd, 'addr')
form = req.form
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(form, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
self.assertTrue(req.form is form)
fd = get_request_fd('GET', '/foo', headers={
@@ -76,3 +78,51 @@ class TestRequest(unittest.TestCase):
body='foo=bar&abc=def&x=%2f%%')
req = Request.create('app', fd, 'addr')
self.assertIsNone(req.form)
def test_large_line(self):
saved_max_readline = Request.max_readline
Request.max_readline = 16
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=y')
with self.assertRaises(ValueError):
Request.create('app', fd, 'addr')
Request.max_readline = saved_max_readline
def test_stream(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': '19'},
body='foo=bar&abc=def&x=y')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.stream.read(), b'foo=bar&abc=def&x=y')
with self.assertRaises(RuntimeError):
req.body
def test_body(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': '19'},
body='foo=bar&abc=def&x=y')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.body, b'foo=bar&abc=def&x=y')
with self.assertRaises(RuntimeError):
req.stream
def test_large_payload(self):
saved_max_content_length = Request.max_content_length
saved_max_body_length = Request.max_body_length
Request.max_content_length = 32
Request.max_body_length = 16
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=y')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.body, b'')
self.assertEqual(req.stream.read(), b'foo=bar&abc=def&x=y')
Request.max_content_length = saved_max_content_length
Request.max_body_length = saved_max_body_length

View File

@@ -88,6 +88,18 @@ class TestResponse(unittest.TestCase):
self.assertIn(b'Content-Type: application/json\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\n[1, "2"]'))
def test_create_from_none(self):
res = Response(None)
self.assertEqual(res.status_code, 204)
self.assertEqual(res.body, b'')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 204 N/A\r\n', response)
self.assertIn(b'Content-Length: 0\r\n', response)
self.assertIn(b'Content-Type: text/plain\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\n'))
def test_create_from_other(self):
res = Response(123)
self.assertEqual(res.status_code, 200)
@@ -112,6 +124,28 @@ class TestResponse(unittest.TestCase):
self.assertEqual(res.headers, {'X-Test': 'Foo'})
self.assertEqual(res.body, b'foo')
def test_create_with_reason(self):
res = Response('foo', reason='ALL GOOD!')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'ALL GOOD!')
self.assertEqual(res.body, b'foo')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 ALL GOOD!\r\n', response)
def test_create_with_status_and_reason(self):
res = Response('not found', 404, reason='NOT FOUND')
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'NOT FOUND')
self.assertEqual(res.body, b'not found')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 404 NOT FOUND\r\n', response)
def test_cookies(self):
res = Response('ok')
res.set_cookie('foo1', 'bar1')
@@ -145,6 +179,9 @@ class TestResponse(unittest.TestCase):
self.assertEqual(res.status_code, 301)
self.assertEqual(res.headers['Location'], '/foo')
with self.assertRaises(ValueError):
Response.redirect('/foo\x0d\x0a\x0d\x0a<p>Foo</p>')
def test_send_file(self):
files = [
('test.txt', 'text/plain'),

View File

@@ -0,0 +1,161 @@
import unittest
import sys
try:
import asyncio
except ImportError:
pass
try:
from unittest import mock
except ImportError:
mock = None
from microdot_asgi import Microdot, Response
from tests import mock_asyncio
@unittest.skipIf(sys.implementation.name == 'micropython',
'not supported under MicroPython')
class TestMicrodotASGI(unittest.TestCase):
def test_asgi_request_with_query_string(self):
app = Microdot()
@app.post('/foo/bar')
async def index(req):
self.assertEqual(req.app, app)
self.assertEqual(req.client_addr, ('1.2.3.4', 1234))
self.assertEqual(req.method, 'POST')
self.assertEqual(req.http_version, 'HTTP/1.1')
self.assertEqual(req.path, '/foo/bar')
self.assertEqual(req.args, {'baz': ['1']})
self.assertEqual(req.cookies, {'session': 'xyz'})
self.assertEqual(req.body, b'body')
class R:
def __init__(self):
self.i = 0
self.body = [b're', b'sp', b'on', b'se', b'']
async def read(self, n):
data = self.body[self.i]
self.i += 1
return data
return Response(body=R(), headers={'Content-Length': '8'})
scope = {
'type': 'http',
'path': '/foo/bar',
'query_string': b'baz=1',
'headers': [('Authorization', 'Bearer 123'),
('Cookie', 'session=xyz'),
('Content-Length', 4)],
'client': ['1.2.3.4', 1234],
'method': 'POST',
'http_version': '1.1',
}
event_index = 0
async def receive():
nonlocal event_index
if event_index == 0:
event_index = 1
return {
'type': 'http.request',
'body': b'body',
'more_body': False,
}
await asyncio.sleep(0.1)
return {
'type': 'http.disconnect',
}
async def send(packet):
if packet['type'] == 'http.response.start':
self.assertEqual(packet['status'], 200)
self.assertEqual(
packet['headers'],
[('Content-Length', '8'), ('Content-Type', 'text/plain')])
elif packet['type'] == 'http.response.body':
self.assertIn(packet['body'],
[b're', b'sp', b'on', b'se', b''])
original_buffer_size = Response.send_file_buffer_size
Response.send_file_buffer_size = 2
mock_asyncio.run(app(scope, receive, send))
Response.send_file_buffer_size = original_buffer_size
def test_wsgi_request_without_query_string(self):
app = Microdot()
@app.route('/foo/bar')
async def index(req):
self.assertEqual(req.path, '/foo/bar')
self.assertEqual(req.args, {})
return 'response'
scope = {
'type': 'http',
'path': '/foo/bar',
'headers': [('Authorization', 'Bearer 123'),
('Cookie', 'session=xyz'),
('Content-Length', 4)],
'client': ['1.2.3.4', 1234],
'method': 'POST',
'http_version': '1.1',
}
event_index = 0
async def receive():
nonlocal event_index
if event_index == 0:
event_index = 1
return {
'type': 'http.request',
'body': b'body',
'more_body': False,
}
await asyncio.sleep(0.1)
return {
'type': 'http.disconnect',
}
async def send(packet):
pass
mock_asyncio.run(app(scope, receive, send))
def test_shutdown(self):
app = Microdot()
@app.route('/shutdown')
async def shutdown(request):
request.app.shutdown()
scope = {
'type': 'http',
'path': '/shutdown',
'client': ['1.2.3.4', 1234],
'method': 'GET',
'http_version': '1.1',
}
async def receive():
pass
async def send(packet):
pass
with mock.patch('microdot_asgi.os.kill') as kill:
mock_asyncio.run(app(scope, receive, send))
kill.assert_called()

View File

@@ -76,6 +76,19 @@ class TestMicrodotAsync(unittest.TestCase):
self.assertIn(b'Content-Type: text/plain\r\n', fd2.response)
self.assertTrue(fd2.response.endswith(b'\r\n\r\nbar-async'))
def test_empty_request(self):
app = Microdot()
mock_socket.clear_requests()
fd = mock_socket.FakeStream(b'\n')
mock_socket._requests.append(fd)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 400 N/A\r\n'))
self.assertIn(b'Content-Length: 11\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nBad request'))
def test_before_after_request(self):
app = Microdot()
@@ -160,6 +173,42 @@ class TestMicrodotAsync(unittest.TestCase):
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
def test_413(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 413 N/A\r\n'))
self.assertIn(b'Content-Length: 17\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nPayload too large'))
def test_413_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.errorhandler(413)
async def handle_413(req):
return '413', 400
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 400 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n413'))
def test_500(self):
app = Microdot()
@@ -215,3 +264,33 @@ class TestMicrodotAsync(unittest.TestCase):
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
def test_streaming(self):
app = Microdot()
@app.route('/')
def index(req):
class stream():
def __init__(self):
self.i = 0
self.data = ['foo', b'bar']
def __aiter__(self):
return self
async def __anext__(self):
if self.i >= len(self.data):
raise StopAsyncIteration
data = self.data[self.i]
self.i += 1
return data
return stream()
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoobar'))

View File

@@ -4,6 +4,7 @@ except ImportError:
import asyncio
import unittest
from microdot import MultiDict
from microdot_asyncio import Request
from tests.mock_socket import get_async_request_fd
@@ -51,7 +52,8 @@ class TestRequestAsync(unittest.TestCase):
fd = get_async_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(req.args, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
def test_json(self):
fd = get_async_request_fd('GET', '/foo', headers={
@@ -77,7 +79,8 @@ class TestRequestAsync(unittest.TestCase):
body='foo=bar&abc=def&x=%2f%%')
req = _run(Request.create('app', fd, 'addr'))
form = req.form
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(form, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
self.assertTrue(req.form is form)
fd = get_async_request_fd('GET', '/foo', headers={
@@ -85,3 +88,43 @@ class TestRequestAsync(unittest.TestCase):
body='foo=bar&abc=def&x=%2f%%')
req = _run(Request.create('app', fd, 'addr'))
self.assertIsNone(req.form)
def test_large_line(self):
saved_max_readline = Request.max_readline
Request.max_readline = 16
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=y')
with self.assertRaises(ValueError):
_run(Request.create('app', fd, 'addr'))
Request.max_readline = saved_max_readline
def test_stream(self):
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': '19'},
body='foo=bar&abc=def&x=y')
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.body, b'foo=bar&abc=def&x=y')
data = _run(req.stream.read())
self.assertEqual(data, b'foo=bar&abc=def&x=y')
def test_large_payload(self):
saved_max_content_length = Request.max_content_length
saved_max_body_length = Request.max_body_length
Request.max_content_length = 32
Request.max_body_length = 16
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': '19'},
body='foo=bar&abc=def&x=y')
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.body, b'')
data = _run(req.stream.read())
self.assertEqual(data, b'foo=bar&abc=def&x=y')
Request.max_content_length = saved_max_content_length
Request.max_body_length = saved_max_body_length

View File

@@ -85,6 +85,26 @@ class TestResponseAsync(unittest.TestCase):
self.assertIn(b'Content-Type: application/json\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n[1, "2"]'))
def test_create_with_reason(self):
res = Response('foo', reason='ALL GOOD!')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'ALL GOOD!')
self.assertEqual(res.body, b'foo')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 ALL GOOD!\r\n', fd.response)
def test_create_with_status_and_reason(self):
res = Response('not found', 404, reason='NOT FOUND')
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'NOT FOUND')
self.assertEqual(res.body, b'not found')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 404 NOT FOUND\r\n', fd.response)
def test_send_file(self):
res = Response.send_file('tests/files/test.txt',
content_type='text/html')

View File

@@ -0,0 +1,104 @@
import unittest
import sys
try:
import uio as io
except ImportError:
import io
try:
from unittest import mock
except ImportError:
mock = None
from microdot_wsgi import Microdot
@unittest.skipIf(sys.implementation.name == 'micropython',
'not supported under MicroPython')
class TestMicrodotWSGI(unittest.TestCase):
def test_wsgi_request_with_query_string(self):
app = Microdot()
@app.post('/foo/bar')
def index(req):
self.assertEqual(req.app, app)
self.assertEqual(req.client_addr, ('1.2.3.4', 1234))
self.assertEqual(req.method, 'POST')
self.assertEqual(req.http_version, 'HTTP/1.1')
self.assertEqual(req.path, '/foo/bar')
self.assertEqual(req.args, {'baz': ['1']})
self.assertEqual(req.cookies, {'session': 'xyz'})
self.assertEqual(req.body, b'body')
return 'response'
environ = {
'SCRIPT_NAME': '/foo',
'PATH_INFO': '/bar',
'QUERY_STRING': 'baz=1',
'HTTP_AUTHORIZATION': 'Bearer 123',
'HTTP_COOKIE': 'session=xyz',
'HTTP_CONTENT_LENGTH': '4',
'REMOTE_ADDR': '1.2.3.4',
'REMOTE_PORT': '1234',
'REQUEST_METHOD': 'POST',
'SERVER_PROTOCOL': 'HTTP/1.1',
'wsgi.input': io.BytesIO(b'body'),
}
def start_response(status, headers):
self.assertEqual(status, '200 OK')
self.assertEqual(
headers,
[('Content-Length', '8'), ('Content-Type', 'text/plain')])
r = app(environ, start_response)
self.assertEqual(next(r), b'response')
def test_wsgi_request_without_query_string(self):
app = Microdot()
@app.route('/foo/bar')
def index(req):
self.assertEqual(req.path, '/foo/bar')
self.assertEqual(req.args, {})
return 'response'
environ = {
'SCRIPT_NAME': '/foo',
'PATH_INFO': '/bar',
'REMOTE_ADDR': '1.2.3.4',
'REMOTE_PORT': '1234',
'REQUEST_METHOD': 'GET',
'SERVER_PROTOCOL': 'HTTP/1.1',
'wsgi.input': io.BytesIO(b''),
}
def start_response(status, headers):
pass
app(environ, start_response)
def test_shutdown(self):
app = Microdot()
@app.route('/shutdown')
def shutdown(request):
request.app.shutdown()
environ = {
'PATH_INFO': '/shutdown',
'REMOTE_ADDR': '1.2.3.4',
'REMOTE_PORT': '1234',
'REQUEST_METHOD': 'GET',
'SERVER_PROTOCOL': 'HTTP/1.1',
'wsgi.input': io.BytesIO(b''),
}
def start_response(status, headers):
pass
with mock.patch('microdot_wsgi.os.kill') as kill:
app(environ, start_response)
kill.assert_called()

View File

@@ -56,7 +56,10 @@ class FakeStreamAsync:
async def readline(self):
return self.stream.readline()
async def read(self, n):
async def read(self, n=-1):
return self.stream.read(n)
async def readexactly(self, n):
return self.stream.read(n)
async def awrite(self, data):

15
tox.ini
View File

@@ -1,5 +1,5 @@
[tox]
envlist=flake8,py36,py37,py38,py39,upy
envlist=flake8,py36,py37,py38,py39,py310,upy
skipsdist=True
skip_missing_interpreters=True
@@ -9,21 +9,22 @@ python =
3.7: py37
3.8: py38
3.9: py39
3.10: py310
pypy3: pypy3
[testenv]
commands=
pip install -e microdot
pip install -e microdot-asyncio
coverage run --branch --include="microdot*.py" -m unittest tests
coverage report --show-missing
deps=coverage
pip install -e .
pytest -p no:logging --cov=src --cov-branch --cov-report=term-missing
deps=
pytest
pytest-cov
[testenv:flake8]
deps=
flake8
commands=
flake8 --ignore=W503 --exclude tests/libs microdot microdot-asyncio tests
flake8 --ignore=W503 --exclude tests/libs src tests examples
[testenv:upy]
whitelist_externals=sh