31 Commits

Author SHA1 Message Date
Miguel Grinberg
e508abc333 Release 0.7.0 2021-09-27 17:12:42 +01:00
Miguel Grinberg
5003a5b3d9 Limit the size of the request body 2021-09-27 17:01:43 +01:00
Miguel Grinberg
4ed101dfc6 Add security policy 2021-09-27 13:57:04 +01:00
Mark Blakeney
833fecb105 Add documentation for request.client_addr (#27) 2021-09-22 12:04:28 +01:00
Miguel Grinberg
d527bdb7c3 Added documentation for reason argument in the Response object 2021-08-11 12:00:46 +01:00
Miguel Grinberg
2516b296a7 Version 0.6.1.dev0 2021-08-11 10:37:04 +01:00
Miguel Grinberg
5061145f5c Release 0.6.0 2021-08-11 10:36:42 +01:00
Miguel Grinberg
122c638bae Fix codecov badge link #nolog 2021-08-11 10:33:10 +01:00
Miguel Grinberg
bd74bcab74 Accept a custom reason phrase for the HTTP response (Fixes #25) 2021-08-11 10:29:08 +01:00
Miguel Grinberg
5cd3ace516 More unit tests 2021-08-02 15:53:13 +01:00
Miguel Grinberg
da32f23e35 Better handling of content types in form and json methods (Fixes #24) 2021-08-02 15:39:32 +01:00
Mark Blakeney
0641466faa Copy client headers to avoid write back (#23) 2021-07-28 10:43:54 +01:00
Miguel Grinberg
dd3fc20507 Make mime type check for form submissions more robust 2021-06-06 20:05:32 +01:00
Miguel Grinberg
46963ba464 Work around a bug in uasyncio's create_server() function 2021-06-06 20:05:12 +01:00
Miguel Grinberg
1a8db51cb3 Installation instructions 2021-06-06 12:24:09 +01:00
Miguel Grinberg
d903c42370 Minor wording update in the documentation #nolog 2021-06-06 12:17:22 +01:00
Miguel Grinberg
8b4ebbd953 Run tests with pytest 2021-06-06 12:09:03 +01:00
Miguel Grinberg
a82ed55f56 Last version of the microdot-asyncio package 2021-06-06 11:54:51 +01:00
Miguel Grinberg
ac87f0542f Version 0.5.1.dev0 2021-06-06 11:49:01 +01:00
Miguel Grinberg
2de57498a8 Release 0.5.0 2021-06-06 11:47:09 +01:00
Miguel Grinberg
b7b881e3c7 merge microdot-asyncio package with microdot 2021-06-06 11:15:32 +01:00
Miguel Grinberg
9955ac99a6 change log 2021-06-06 00:38:46 +01:00
Miguel Grinberg
4b101d1597 Improve project structure 2021-06-06 00:29:52 +01:00
Miguel Grinberg
097cd9cf02 Documentation updates 2021-06-05 15:42:37 +01:00
Miguel Grinberg
b0c25a1a72 Support duplicate arguments in query string and form submissions
Fixes #21
2021-06-05 12:26:37 +01:00
Miguel Grinberg
b7b8e58d6a added documentation link 2021-06-05 00:56:16 +01:00
Miguel Grinberg
12cd60305b Documentation 2021-06-05 00:52:41 +01:00
Miguel Grinberg
4eea7adb8f Release v0.4.0 2021-06-04 17:16:18 +01:00
Miguel Grinberg
a3288a63ed Add method specific route decorators 2021-06-04 17:14:38 +01:00
Miguel Grinberg
3bd7fe8cea Update microypthon binary to 1.15 2021-06-04 16:57:51 +01:00
Miguel Grinberg
0ad538df91 Server shutdown (Fixes #19) 2021-06-04 16:01:07 +01:00
49 changed files with 2828 additions and 1285 deletions

View File

@@ -2,10 +2,10 @@ name: build
on:
push:
branches:
- master
- main
pull_request:
branches:
- master
- main
jobs:
lint:
name: lint

72
CHANGES.md Normal file
View File

@@ -0,0 +1,72 @@
# Microdot change log
**Release 0.7.0** - 2021-09-27
- Breaking change: Limit the size of the request body to 16KB. A different maximum can be set in `Request.max_content_length`. ([commit](https://github.com/miguelgrinberg/microdot/commit/5003a5b3d948a7cf365857b419bebf6e388593a1)) (thanks **Ky Tran**!)
- Add documentation for `request.client_addr` [#27](https://github.com/miguelgrinberg/microdot/issues/27) ([commit](https://github.com/miguelgrinberg/microdot/commit/833fecb105ce456b95f1d2a6ea96dceca1075814)) (thanks **Mark Blakeney**!)
- Added documentation for reason argument in the Response object ([commit](https://github.com/miguelgrinberg/microdot/commit/d527bdb7c32ab918a1ecf6956cf3a9f544504354))
**Release 0.6.0** - 2021-08-11
- Better handling of content types in form and json methods [#24](https://github.com/miguelgrinberg/microdot/issues/24) ([commit](https://github.com/miguelgrinberg/microdot/commit/da32f23e35f871470a40638e7000e84b0ff6d17f))
- Accept a custom reason phrase for the HTTP response [#25](https://github.com/miguelgrinberg/microdot/issues/25) ([commit](https://github.com/miguelgrinberg/microdot/commit/bd74bcab74f283c89aadffc8f9c20d6ff0f771ce))
- Make mime type check for form submissions more robust ([commit](https://github.com/miguelgrinberg/microdot/commit/dd3fc20507715a23d0fa6fa3aae3715c8fbc0351))
- Copy client headers to avoid write back [#23](https://github.com/miguelgrinberg/microdot/issues/23) ([commit](https://github.com/miguelgrinberg/microdot/commit/0641466faa9dda0c54f78939ac05993c0812e84a)) (thanks **Mark Blakeney**!)
- Work around a bug in uasyncio's create_server() function ([commit](https://github.com/miguelgrinberg/microdot/commit/46963ba4644d7abc8dc653c99bc76222af526964))
- More unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/5cd3ace5166ec549579b0b1149ae3d7be195974a))
- Installation instructions ([commit](https://github.com/miguelgrinberg/microdot/commit/1a8db51cb3754308da6dcc227512dcdeb4ce4557))
- Run tests with pytest ([commit](https://github.com/miguelgrinberg/microdot/commit/8b4ebbd9535b3c083fb2a955284609acba07f05e))
- Deprecated the microdot-asyncio package ([commit](https://github.com/miguelgrinberg/microdot/commit/a82ed55f56e14fbcea93e8171af86ab42657fa96))
**Release 0.5.0** - 2021-06-06
- [Documentation](https://microdot.readthedocs.io/en/latest/) site ([commit](https://github.com/miguelgrinberg/microdot/commit/12cd60305b7b48ab151da52661fc5988684dbcd8))
- Support duplicate arguments in query string and form submissions [#21](https://github.com/miguelgrinberg/microdot/issues/21) ([commit](https://github.com/miguelgrinberg/microdot/commit/b0c25a1a7298189373be5df1668e0afb5532cdaf))
- Merge `microdot-asyncio` package with `microdot` ([commit](https://github.com/miguelgrinberg/microdot/commit/b7b881e3c7f1c6ede6546e498737e93928425c30))
- Added a change log ([commit](https://github.com/miguelgrinberg/microdot/commit/9955ac99a6ac20308644f02d6e6e32847d28b70c))
- Improve project structure ([commit](https://github.com/miguelgrinberg/microdot/commit/4b101d15971fa2883d187f0bab0be999ae30b583))
**Release v0.4.0** - 2021-06-04
- Add HTTP method-specific route decorators ([commit](https://github.com/miguelgrinberg/microdot/commit/a3288a63ed45f700f79b67d0b57fc4dd20e844c1))
- Server shutdown [#19](https://github.com/miguelgrinberg/microdot/issues/19) ([commit](https://github.com/miguelgrinberg/microdot/commit/0ad538df91f8b6b8a3885aa602c014ee7fe4526b))
- Update microypthon binary for tests to 1.15 ([commit](https://github.com/miguelgrinberg/microdot/commit/3bd7fe8cea4598a7dbd0efcb9c6ce57ec2b79f9c))
**Release v0.3.1** - 2021-02-06
- Support large downloads in send_file [#3](https://github.com/miguelgrinberg/microdot/issues/3) ([commit](https://github.com/miguelgrinberg/microdot/commit/3e29af57753dbb7961ff98719a4fc4f71c0b4e3e))
- Move socket import and add simple hello example [#12](https://github.com/miguelgrinberg/microdot/issues/12) ([commit](https://github.com/miguelgrinberg/microdot/commit/c5e1873523b609680ff67d7abfada72568272250)) (thanks **Damien George**!)
- Update python versions to build ([commit](https://github.com/miguelgrinberg/microdot/commit/dfbe2edd797153fc9be40bc1928d93bdee7e7be5))
- Handle Chrome preconnect [#8](https://github.com/miguelgrinberg/microdot/issues/8) ([commit](https://github.com/miguelgrinberg/microdot/commit/125af4b4a92b1d78acfa9d57ad2f507e759b6938)) (thanks **Ricardo Mendonça Ferreira**!)
- Readme update ([commit](https://github.com/miguelgrinberg/microdot/commit/1aacb3cf46bd0b634ec3bc852ff9439f3c5dd773))
- Switch to GitHub actions for builds ([commit](https://github.com/miguelgrinberg/microdot/commit/4c0afa2beca0c3b0f167fd25c6849d6937c412ba))
**Release v0.3.0** - 2019-05-05
- g, before_request and after_request ([commit](https://github.com/miguelgrinberg/microdot/commit/8aa50f171d2d04bc15c472ab1d9b3288518f7a21))
- Threaded mode ([commit](https://github.com/miguelgrinberg/microdot/commit/494800ff9ff474c38644979086057e3584573969))
- Optional asyncio support ([commit](https://github.com/miguelgrinberg/microdot/commit/3d9b5d7084d52e749553ca79206ed7060f963f9d))
- Debug mode ([commit](https://github.com/miguelgrinberg/microdot/commit/4c83cb75636572066958ef2cc0802909deafe542))
- Print exceptions ([commit](https://github.com/miguelgrinberg/microdot/commit/491202de1fce232b9629b7f1db63594fd13f84a3))
- Flake8 ([commit](https://github.com/miguelgrinberg/microdot/commit/92edc17522d7490544c7186d62a2964caf35c861))
- Unit testing framework ([commit](https://github.com/miguelgrinberg/microdot/commit/f741ed7cf83320d25ce16a1a29796af6fdfb91e9))
- More robust header checking in tests ([commit](https://github.com/miguelgrinberg/microdot/commit/03efe46a26e7074f960dd4c9a062c53d6f72bfa0))
- Response unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/cd71986a5042dcc308617a3db89476f28dd13ecf))
- Request unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/0b95feafc96dc91d7d34528ff2d8931a8aa3d612))
- More unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/76ab1fa6d72dd9deaa24aeaf4895a0c6fc883bcb))
- Async request and response unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/89f7f09b9a2d0dfccefabebbe9b83307133bd97c))
- More asyncio unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/ba986a89ff72ebbd9a65307b81ee769879961594))
- Improve code structure ([commit](https://github.com/miguelgrinberg/microdot/commit/b16466f1a9432a608eb23769907e8952fe304a9a))
- URL pattern matching unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/0a373775d54df571ceddaac090094bb62dbe6c72))
- Rename microdot_async to microdot_asyncio ([commit](https://github.com/miguelgrinberg/microdot/commit/e5525c5c485ae8901c9602da7e4582b58fb2da40))
**Release 0.2.0** - 2019-04-19
- Error handlers ([commit](https://github.com/miguelgrinberg/microdot/commit/0f2c749f6d1b9edbf124523160e10449c932ea45))
- Fleshed out example GPIO application ([commit](https://github.com/miguelgrinberg/microdot/commit/52f2d0c4918d00d1a7e46cc7fd9a909ef6d259c1))
- More robust parsing of cookie header ([commit](https://github.com/miguelgrinberg/microdot/commit/2f58c41cc89946d51646df83d4f9ae0e24e447b9))
**Release 0.1.1** - 2019-04-17
- Minor fixes for micropython ([commit](https://github.com/miguelgrinberg/microdot/commit/e4ff70cf8fe839f5b5297157bf028569188b9031))
- Initial commit ([commit](https://github.com/miguelgrinberg/microdot/commit/311a82a44430d427948866b09cb6136e60a5b1c9))

View File

@@ -1,8 +1,9 @@
# microdot
[![Build status](https://github.com/miguelgrinberg/microdot/workflows/build/badge.svg)](https://github.com/miguelgrinberg/microdot/actions) [![codecov](https://codecov.io/gh/miguelgrinberg/microdot/branch/master/graph/badge.svg)](https://codecov.io/gh/miguelgrinberg/microdot)
[![Build status](https://github.com/miguelgrinberg/microdot/workflows/build/badge.svg)](https://github.com/miguelgrinberg/microdot/actions) [![codecov](https://codecov.io/gh/miguelgrinberg/microdot/branch/main/graph/badge.svg)](https://codecov.io/gh/miguelgrinberg/microdot)
A minimalistic Python web framework for microcontrollers inspired by Flask
## Documentation
## Resources
Coming soon!
- [Documentation](https://microdot.readthedocs.io/en/latest/)
- [Change Log](https://github.com/miguelgrinberg/microdot/blob/main/CHANGES.md)

9
SECURITY.md Normal file
View File

@@ -0,0 +1,9 @@
# Security Policy
## Reporting a Vulnerability
If you think you've found a vulnerability on this project, please send me (Miguel Grinberg) an email at miguel.grinberg@gmail.com with a description of the problem. I will personally review the issue and respond to you with next steps.
If the issue is highly sensitive, you are welcome to encrypt your message. Here is my [PGP key](https://keyserver.ubuntu.com/pks/lookup?search=miguel.grinberg%40gmail.com&fingerprint=on&op=index).
Please do not disclose vulnerabilities publicly before discussing how to proceed with me.

Binary file not shown.

20
docs/Makefile Normal file
View File

@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

3
docs/_static/css/custom.css vendored Normal file
View File

@@ -0,0 +1,3 @@
.py .class, .py .method, .py .property {
margin-top: 20px;
}

62
docs/api.rst Normal file
View File

@@ -0,0 +1,62 @@
API Reference
=============
``microdot`` module
-------------------
The ``microdot`` module defines a few classes that help implement HTTP-based
servers for MicroPython and standard Python, with multithreading support for
Python interpreters that support it.
``Microdot`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.Microdot
:members:
``Request`` class
~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.Request
:members:
``Response`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.Response
:members:
``MultiDict`` class
~~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot.MultiDict
:members:
``microdot_asyncio`` module
---------------------------
The ``microdot_asyncio`` module defines a few classes that help implement
HTTP-based servers for MicroPython and standard Python that use ``asyncio``
and coroutines.
``Microdot`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_asyncio.Microdot
:inherited-members:
:members:
``Request`` class
~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_asyncio.Request
:inherited-members:
:members:
``Response`` class
~~~~~~~~~~~~~~~~~~
.. autoclass:: microdot_asyncio.Response
:inherited-members:
:members:

70
docs/conf.py Normal file
View File

@@ -0,0 +1,70 @@
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../src'))
# -- Project information -----------------------------------------------------
project = 'Microdot'
copyright = '2021, Miguel Grinberg'
author = 'Miguel Grinberg'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
html_css_files = [
'css/custom.css',
]
html_theme_options = {
'github_user': 'miguelgrinberg',
'github_repo': 'microdot',
'github_banner': True,
'github_button': True,
'github_type': 'star',
'fixed_sidebar': True,
}
autodoc_default_options = {
'member-order': 'bysource',
}

21
docs/index.rst Normal file
View File

@@ -0,0 +1,21 @@
.. Microdot documentation master file, created by
sphinx-quickstart on Fri Jun 4 17:40:19 2021.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Microdot
========
Microdot is a minimalistic Python web framework inspired by
`Flask <https://flask.palletsprojects.com/>`_, and designed to run on
systems with limited resources such as microcontrollers. It runs on standard
Python and on `MicroPython <https://micropython.org>`_.
.. toctree::
:maxdepth: 3
intro
api
* :ref:`genindex`
* :ref:`search`

39
docs/intro.rst Normal file
View File

@@ -0,0 +1,39 @@
Installation
------------
Microdot can be installed with ``pip``::
pip install microdot
For platforms that do not support or cannot run ``pip``, you can also manually
copy and install the ``microdot.py`` and ``microdot_asyncio.py`` source files.
Examples
--------
The following is an example of a standard single or multi-threaded web
server::
from microdot import Microdot
app = Microdot()
@app.route('/')
def hello(request):
return 'Hello, world!'
app.run()
Microdot also supports the asynchronous model and can be used under
``asyncio``. The example that follows is equivalent to the one above, but uses
coroutines for concurrency::
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def hello(request):
return 'Hello, world!'
app.run()

35
docs/make.bat Normal file
View File

@@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

View File

@@ -2,7 +2,7 @@ from microdot import Microdot, Response
app = Microdot()
htmldoc = """<!DOCTYPE html>
htmldoc = '''<!DOCTYPE html>
<html>
<head>
<title>Microdot Example Page</title>
@@ -11,16 +11,22 @@ htmldoc = """<!DOCTYPE html>
<div>
<h1>Microdot Example Page</h1>
<p>Hello from Microdot!</p>
<p><a href="/shutdown">Click to shutdown the server</a></p>
</div>
</body>
</html>
"""
'''
@app.route("", methods=["GET", "POST"])
def serial_number(request):
print(request.headers)
return Response(body=htmldoc, headers={"Content-Type": "text/html"})
@app.route('/')
def hello(request):
return Response(body=htmldoc, headers={'Content-Type': 'text/html'})
@app.route('/shutdown')
def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
app.run(debug=True)

40
examples/hello_async.py Normal file
View File

@@ -0,0 +1,40 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot_asyncio import Microdot, Response
app = Microdot()
htmldoc = '''<!DOCTYPE html>
<html>
<head>
<title>Microdot Example Page</title>
</head>
<body>
<div>
<h1>Microdot Example Page</h1>
<p>Hello from Microdot!</p>
<p><a href="/shutdown">Click to shutdown the server</a></p>
</div>
</body>
</html>
'''
@app.route('/')
async def hello(request):
return Response(body=htmldoc, headers={'Content-Type': 'text/html'})
@app.route('/shutdown')
async def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
async def main():
await app.start_server(debug=True)
asyncio.run(main())

5
legacy/README.md Normal file
View File

@@ -0,0 +1,5 @@
microdot-asyncio
================
This package has been merged with the ``microdot`` package. It currently
installs as an empty package that depends on it.

6
legacy/pyproject.toml Normal file
View File

@@ -0,0 +1,6 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

24
legacy/setup.cfg Normal file
View File

@@ -0,0 +1,24 @@
[metadata]
name = microdot-asyncio
version = 0.5.0
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = AsyncIO support for the Microdot web framework'
long_description = file: README.md
long_description_content_type = text/markdown
url = https://github.com/miguelgrinberg/microdot
project_urls =
Bug Tracker = https://github.com/miguelgrinberg/microdot/issues
classifiers =
Environment :: Web Environment
Intended Audience :: Developers
Programming Language :: Python :: 3
Programming Language :: Python :: Implementation :: MicroPython
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
zip_safe = False
include_package_data = True
py_modules =
install_requires =
microdot

3
legacy/setup.py Normal file
View File

@@ -0,0 +1,3 @@
import setuptools
setuptools.setup()

View File

@@ -1,166 +0,0 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot import Microdot as BaseMicrodot
from microdot import print_exception
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
def _iscoroutine(coro):
return hasattr(coro, 'send') and hasattr(coro, 'throw')
class Request(BaseRequest):
@staticmethod
async def create(stream, client_addr):
# request line
line = (await stream.readline()).strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = (await stream.readline()).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = await stream.read(content_length) \
if content_length else b''
return Request(client_addr, method, url, http_version, headers, body)
class Response(BaseResponse):
async def write(self, stream):
self.complete()
# status code
await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
await stream.awrite('{header}: {value}\r\n'.format(
header=header, value=value).encode())
await stream.awrite(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
await stream.awrite(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'):
self.body.close()
else:
await stream.awrite(self.body)
class Microdot(BaseMicrodot):
def run(self, host='0.0.0.0', port=5000, debug=False):
self.debug = debug
async def serve(reader, writer):
if not hasattr(writer, 'awrite'): # pragma: no cover
# CPython provides the awrite and aclose methods in 3.8+
async def awrite(self, data):
self.write(data)
await self.drain()
async def aclose(self):
self.close()
await self.wait_closed()
from types import MethodType
writer.awrite = MethodType(awrite, writer)
writer.aclose = MethodType(aclose, writer)
await self.dispatch_request(reader, writer)
if self.debug: # pragma: no cover
print('Starting async server on {host}:{port}...'.format(
host=host, port=port))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.start_server(serve, host, port))
loop.run_forever()
loop.close() # pragma: no cover
async def dispatch_request(self, reader, writer):
req = await Request.create(reader, writer.get_extra_info('peername'))
if req:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = await self._invoke_handler(handler, req)
if res:
break
if res is None:
res = await self._invoke_handler(
f, req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
elif 404 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[404], req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[500], req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
await res.write(writer)
await writer.aclose()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
ret = f_or_coro(*args, **kwargs)
if _iscoroutine(ret):
ret = await ret
return ret
redirect = Response.redirect
send_file = Response.send_file

View File

@@ -1,35 +0,0 @@
"""
Microdot-AsyncIO
----------------
AsyncIO support for the Microdot web framework.
"""
from setuptools import setup
setup(
name='microdot-asyncio',
version="0.3.1",
url='http://github.com/miguelgrinberg/microdot/',
license='MIT',
author='Miguel Grinberg',
author_email='miguel.grinberg@gmail.com',
description='AsyncIO support for the Microdot web framework',
long_description=__doc__,
py_modules=['microdot_asyncio'],
platforms='any',
install_requires=[
'microdot',
'micropython-uasyncio;implementation_name=="micropython"'
],
classifiers=[
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: Implementation :: MicroPython',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View File

@@ -1,416 +0,0 @@
try:
from sys import print_exception
except ImportError: # pragma: no cover
import traceback
def print_exception(exc):
traceback.print_exc()
concurrency_mode = 'threaded'
try: # pragma: no cover
import threading
def create_thread(f, *args, **kwargs):
"""Use the threading module."""
threading.Thread(target=f, args=args, kwargs=kwargs).start()
except ImportError: # pragma: no cover
try:
import _thread
def create_thread(f, *args, **kwargs):
"""Use MicroPython's _thread module."""
def run():
f(*args, **kwargs)
_thread.start_new_thread(run, ())
except ImportError:
def create_thread(f, *args, **kwargs):
"""No threads available, call function synchronously."""
f(*args, **kwargs)
concurrency_mode = 'sync'
try:
import ujson as json
except ImportError:
import json
try:
import ure as re
except ImportError:
import re
try:
import usocket as socket
except ImportError:
try:
import socket
except ImportError: # pragma: no cover
socket = None
def urldecode(string):
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) == 1:
return string
result = [parts[0]]
for item in parts[1:]:
if item == '':
result.append('%')
else:
code = item[:2]
result.append(chr(int(code, 16)))
result.append(item[2:])
return ''.join(result)
class Request():
class G:
pass
def __init__(self, client_addr, method, url, http_version, headers, body):
self.client_addr = client_addr
self.method = method
self.path = url
self.http_version = http_version
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
self.headers = headers
self.cookies = {}
self.content_length = 0
self.content_type = None
for header, value in self.headers.items():
if header == 'Content-Length':
self.content_length = int(value)
elif header == 'Content-Type':
self.content_type = value
elif header == 'Cookie':
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
self.body = body
self._json = None
self._form = None
self.g = Request.G()
@staticmethod
def create(client_stream, client_addr):
# request line
line = client_stream.readline().strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = client_stream.readline().strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = client_stream.read(content_length) if content_length else b''
return Request(client_addr, method, url, http_version, headers, body)
def _parse_urlencoded(self, urlencoded):
return {
urldecode(key): urldecode(value) for key, value in [
pair.split('=', 1) for pair in
urlencoded.split('&')]}
@property
def json(self):
if self.content_type != 'application/json':
return None
if self._json is None:
self._json = json.loads(self.body.decode())
return self._json
@property
def form(self):
if self.content_type != 'application/x-www-form-urlencoded':
return None
if self._form is None:
self._form = self._parse_urlencoded(self.body.decode())
return self._form
class Response():
types_map = {
'css': 'text/css',
'gif': 'image/gif',
'html': 'text/html',
'jpg': 'image/jpeg',
'js': 'application/javascript',
'json': 'application/json',
'png': 'image/png',
'txt': 'text/plain',
}
send_file_buffer_size = 1024
def __init__(self, body='', status_code=200, headers=None):
self.status_code = status_code
self.headers = headers or {}
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
self.headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
self.body = body.encode()
else:
# this applies to bytes or file-like objects
self.body = body
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
if path:
http_cookie += '; Path=' + path
if domain:
http_cookie += '; Domain=' + domain
if expires:
http_cookie += '; Expires=' + expires.strftime(
"%a, %d %b %Y %H:%M:%S GMT")
if max_age:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
if http_only:
http_cookie += '; HttpOnly'
if 'Set-Cookie' in self.headers:
self.headers['Set-Cookie'].append(http_cookie)
else:
self.headers['Set-Cookie'] = [http_cookie]
def complete(self):
if isinstance(self.body, bytes) and \
'Content-Length' not in self.headers:
self.headers['Content-Length'] = str(len(self.body))
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'text/plain'
def write(self, stream):
self.complete()
# status code
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
stream.write(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
stream.write(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'):
self.body.close()
else:
stream.write(self.body)
@classmethod
def redirect(cls, location, status_code=302):
return cls(status_code=status_code, headers={'Location': location})
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
f = open(filename, 'rb')
return cls(body=f, status_code=status_code,
headers={'Content-Type': content_type})
class URLPattern():
def __init__(self, url_pattern):
self.pattern = ''
self.args = []
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
if segment[-1] != '>':
raise ValueError('invalid URL pattern')
segment = segment[1:-1]
if ':' in segment:
type_, name = segment.rsplit(':', 1)
else:
type_ = 'string'
name = segment
if type_ == 'string':
pattern = '[^/]+'
elif type_ == 'int':
pattern = '\\d+'
elif type_ == 'path':
pattern = '.+'
elif type_.startswith('re:'):
pattern = type_[3:]
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
else:
self.pattern += '/{segment}'.format(segment=segment)
if use_regex:
self.pattern = re.compile('^' + self.pattern + '$')
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
return args
class Microdot():
def __init__(self):
self.url_map = []
self.before_request_handlers = []
self.after_request_handlers = []
self.error_handlers = {}
self.debug = False
def route(self, url_pattern, methods=None):
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
return f
return decorated
def before_request(self, f):
self.before_request_handlers.append(f)
return f
def after_request(self, f):
self.after_request_handlers.append(f)
return f
def errorhandler(self, status_code_or_exception_class):
def decorated(f):
self.error_handlers[status_code_or_exception_class] = f
return f
return decorated
def run(self, host='0.0.0.0', port=5000, debug=False):
self.debug = debug
s = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][-1]
if self.debug: # pragma: no cover
print('Starting {mode} server on {host}:{port}...'.format(
mode=concurrency_mode, host=host, port=port))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
while True:
sock, addr = s.accept()
create_thread(self.dispatch_request, sock, addr)
def find_route(self, req):
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
return f
def dispatch_request(self, sock, addr):
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
stream = sock
req = Request.create(stream, addr)
if req:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
res.write(stream)
stream.close()
if stream != sock: # pragma: no cover
sock.close()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
redirect = Response.redirect
send_file = Response.send_file

View File

@@ -1,31 +0,0 @@
"""
Microdot
--------
The impossibly small web framework for MicroPython.
"""
from setuptools import setup
setup(
name='microdot',
version="0.3.1",
url='http://github.com/miguelgrinberg/microdot/',
license='MIT',
author='Miguel Grinberg',
author_email='miguel.grinberg@gmail.com',
description='The impossibly small web framework for MicroPython',
long_description=__doc__,
py_modules=['microdot'],
platforms='any',
classifiers=[
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: Implementation :: MicroPython',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

6
pyproject.toml Normal file
View File

@@ -0,0 +1,6 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

View File

@@ -1,7 +1,6 @@
import sys
sys.path.insert(0, 'microdot')
sys.path.insert(1, 'microdot-asyncio')
sys.path.insert(0, 'src')
sys.path.insert(2, 'tests/libs')
import unittest

27
setup.cfg Normal file
View File

@@ -0,0 +1,27 @@
[metadata]
name = microdot
version = 0.7.0
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = The impossibly small web framework for MicroPython
long_description = file: README.md
long_description_content_type = text/markdown
url = https://github.com/miguelgrinberg/microdot
project_urls =
Bug Tracker = https://github.com/miguelgrinberg/microdot/issues
classifiers =
Environment :: Web Environment
Intended Audience :: Developers
Programming Language :: Python :: 3
Programming Language :: Python :: Implementation :: MicroPython
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
zip_safe = False
include_package_data = True
package_dir =
= src
py_modules =
microdot
microdot_asyncio

3
setup.py Executable file
View File

@@ -0,0 +1,3 @@
import setuptools
setuptools.setup()

839
src/microdot.py Normal file
View File

@@ -0,0 +1,839 @@
"""
microdot
--------
The ``microdot`` module defines a few classes that help implement HTTP-based
servers for MicroPython and standard Python, with multithreading support for
Python interpreters that support it.
"""
try:
from sys import print_exception
except ImportError: # pragma: no cover
import traceback
def print_exception(exc):
traceback.print_exc()
try:
import uerrno as errno
except ImportError:
import errno
concurrency_mode = 'threaded'
try: # pragma: no cover
import threading
def create_thread(f, *args, **kwargs):
# use the threading module
threading.Thread(target=f, args=args, kwargs=kwargs).start()
except ImportError: # pragma: no cover
try:
import _thread
def create_thread(f, *args, **kwargs):
# use MicroPython's _thread module
def run():
f(*args, **kwargs)
_thread.start_new_thread(run, ())
except ImportError:
def create_thread(f, *args, **kwargs):
# no threads available, call function synchronously
f(*args, **kwargs)
concurrency_mode = 'sync'
try:
import ujson as json
except ImportError:
import json
try:
import ure as re
except ImportError:
import re
try:
import usocket as socket
except ImportError:
try:
import socket
except ImportError: # pragma: no cover
socket = None
def urldecode(string):
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) == 1:
return string
result = [parts[0]]
for item in parts[1:]:
if item == '':
result.append('%')
else:
code = item[:2]
result.append(chr(int(code, 16)))
result.append(item[2:])
return ''.join(result)
class MultiDict(dict):
"""A subclass of dictionary that can hold multiple values for the same
key. It is used to hold key/value pairs decoded from query strings and
form submissions.
:param initial_dict: an initial dictionary of key/value pairs to
initialize this object with.
Example::
>>> d = MultiDict()
>>> d['sort'] = 'name'
>>> d['sort'] = 'email'
>>> print(d['sort'])
'name'
>>> print(d.getlist('sort'))
['name', 'email']
"""
def __init__(self, initial_dict=None):
super().__init__()
if initial_dict:
for key, value in initial_dict.items():
self[key] = value
def __setitem__(self, key, value):
if key not in self:
super().__setitem__(key, [])
super().__getitem__(key).append(value)
def __getitem__(self, key):
return super().__getitem__(key)[0]
def get(self, key, default=None, type=None):
"""Return the value for a given key.
:param key: The key to retrieve.
:param default: A default value to use if the key does not exist.
:param type: A type conversion callable to apply to the value.
If the multidict contains more than one value for the requested key,
this method returns the first value only.
Example::
>>> d = MultiDict()
>>> d['age'] = '42'
>>> d.get('age')
'42'
>>> d.get('age', type=int)
42
>>> d.get('name', default='noname')
'noname'
"""
if key not in self:
return default
value = self[key]
if type is not None:
value = type(value)
return value
def getlist(self, key, type=None):
"""Return all the values for a given key.
:param key: The key to retrieve.
:param type: A type conversion callable to apply to the values.
If the requested key does not exist in the dictionary, this method
returns an empty list.
Example::
>>> d = MultiDict()
>>> d.getlist('items')
[]
>>> d['items'] = '3'
>>> d.getlist('items')
['3']
>>> d['items'] = '56'
>>> d.getlist('items')
['3', '56']
>>> d.getlist('items', type=int)
[3, 56]
"""
if key not in self:
return []
values = super().__getitem__(key)
if type is not None:
values = [type(value) for value in values]
return values
class Request():
"""An HTTP request class.
:var app: The application instance to which this request belongs.
:var client_addr: The address of the client, as a tuple (host, port).
:var method: The HTTP method of the request.
:var path: The path portion of the URL.
:var query_string: The query string portion of the URL.
:var args: The parsed query string, as a :class:`MultiDict` object.
:var headers: A dictionary with the headers included in the request.
:var cookies: A dictionary with the cookies included in the request.
:var content_length: The parsed ``Content-Length`` header.
:var content_type: The parsed ``Content-Type`` header.
:var body: A stream from where the body can be read.
:var json: The parsed JSON body, as a dictionary or list, or ``None`` if
the request does not have a JSON body.
:var form: The parsed form submission body, as a :class:`MultiDict` object,
or ``None`` if the request does not have a form submission.
:var g: A general purpose container for applications to store data during
the life of the request.
"""
#: Specify the maximum payload size that is accepted. Requests with larger
#: payloads will be rejected with a 413 status code. Applications can
#: change this maximum as necessary.
#:
#: Example::
#:
#: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed
max_content_length = 16 * 1024
class G:
pass
def __init__(self, app, client_addr, method, url, http_version, headers,
body):
self.app = app
self.client_addr = client_addr
self.method = method
self.path = url
self.http_version = http_version
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
self.headers = headers
self.cookies = {}
self.content_length = 0
self.content_type = None
for header, value in self.headers.items():
if header == 'Content-Length':
self.content_length = int(value)
elif header == 'Content-Type':
self.content_type = value
elif header == 'Cookie':
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
self.body = body
self._json = None
self._form = None
self.g = Request.G()
@staticmethod
def create(app, client_stream, client_addr):
"""Create a request object.
:param app: The Microdot application instance.
:param client_stream: An input stream from where the request data can
be read.
:param client_addr: The address of the client, as a tuple.
This method returns a newly created ``Request`` object.
"""
# request line
line = client_stream.readline().strip().decode()
if not line:
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = client_stream.readline().strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = client_stream.read(content_length) if content_length and \
content_length <= Request.max_content_length else b''
return Request(app, client_addr, method, url, http_version, headers,
body)
def _parse_urlencoded(self, urlencoded):
data = MultiDict()
for k, v in [pair.split('=', 1) for pair in urlencoded.split('&')]:
data[urldecode(k)] = urldecode(v)
return data
@property
def json(self):
if self._json is None:
if self.content_type is None:
return None
mime_type = self.content_type.split(';')[0]
if mime_type != 'application/json':
return None
self._json = json.loads(self.body.decode())
return self._json
@property
def form(self):
if self._form is None:
if self.content_type is None:
return None
mime_type = self.content_type.split(';')[0]
if mime_type != 'application/x-www-form-urlencoded':
return None
self._form = self._parse_urlencoded(self.body.decode())
return self._form
class Response():
"""An HTTP response class.
:param body: The body of the response. If a dictionary or list is given,
a JSON formatter is used to generate the body.
:param status_code: The numeric HTTP status code of the response. The
default is 200.
:param headers: A dictionary of headers to include in the response.
:param reason: A custom reason phrase to add after the status code. The
default is "OK" for responses with a 200 status code and
"N/A" for any other status codes.
"""
types_map = {
'css': 'text/css',
'gif': 'image/gif',
'html': 'text/html',
'jpg': 'image/jpeg',
'js': 'application/javascript',
'json': 'application/json',
'png': 'image/png',
'txt': 'text/plain',
}
send_file_buffer_size = 1024
def __init__(self, body='', status_code=200, headers=None, reason=None):
self.status_code = status_code
self.headers = headers.copy() if headers else {}
self.reason = reason
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
self.headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
self.body = body.encode()
else:
# this applies to bytes or file-like objects
self.body = body
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
"""Add a cookie to the response.
:param cookie: The cookie's name.
:param value: The cookie's value.
:param path: The cookie's path.
:param domain: The cookie's domain.
:param expires: The cookie expiration time, as a ``datetime`` object.
:param max_age: The cookie's ``Max-Age`` value.
:param secure: The cookie's ``secure`` flag.
:param http_only: The cookie's ``HttpOnly`` flag.
"""
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
if path:
http_cookie += '; Path=' + path
if domain:
http_cookie += '; Domain=' + domain
if expires:
http_cookie += '; Expires=' + expires.strftime(
"%a, %d %b %Y %H:%M:%S GMT")
if max_age:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
if http_only:
http_cookie += '; HttpOnly'
if 'Set-Cookie' in self.headers:
self.headers['Set-Cookie'].append(http_cookie)
else:
self.headers['Set-Cookie'] = [http_cookie]
def complete(self):
if isinstance(self.body, bytes) and \
'Content-Length' not in self.headers:
self.headers['Content-Length'] = str(len(self.body))
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'text/plain'
def write(self, stream):
self.complete()
# status code
reason = self.reason if self.reason is not None else \
('OK' if self.status_code == 200 else 'N/A')
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code, reason=reason).encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
stream.write(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
stream.write(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'): # pragma: no cover
self.body.close()
else:
stream.write(self.body)
@classmethod
def redirect(cls, location, status_code=302):
"""Return a redirect response.
:param location: The URL to redirect to.
:param status_code: The 3xx status code to use for the redirect. The
default is 302.
"""
return cls(status_code=status_code, headers={'Location': location})
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
"""Send file contents in a response.
:param filename: The filename of the file.
:param status_code: The 3xx status code to use for the redirect. The
default is 302.
:param content_type: The ``Content-Type`` header to use in the
response. If omitted, it is generated
automatically from the file extension.
"""
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
f = open(filename, 'rb')
return cls(body=f, status_code=status_code,
headers={'Content-Type': content_type})
class URLPattern():
def __init__(self, url_pattern):
self.pattern = ''
self.args = []
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
if segment[-1] != '>':
raise ValueError('invalid URL pattern')
segment = segment[1:-1]
if ':' in segment:
type_, name = segment.rsplit(':', 1)
else:
type_ = 'string'
name = segment
if type_ == 'string':
pattern = '[^/]+'
elif type_ == 'int':
pattern = '\\d+'
elif type_ == 'path':
pattern = '.+'
elif type_.startswith('re:'):
pattern = type_[3:]
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
else:
self.pattern += '/{segment}'.format(segment=segment)
if use_regex:
self.pattern = re.compile('^' + self.pattern + '$')
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
return args
class Microdot():
"""An HTTP application class.
This class implements an HTTP application instance and is heavily
influenced by the ``Flask`` class of the Flask framework. It is typically
declared near the start of the main application script.
Example::
from microdot import Microdot
app = Microdot()
"""
def __init__(self):
self.url_map = []
self.before_request_handlers = []
self.after_request_handlers = []
self.error_handlers = {}
self.shutdown_requested = False
self.debug = False
self.server = None
def route(self, url_pattern, methods=None):
"""Decorator that is used to register a function as a request handler
for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
:param methods: The list of HTTP methods to be handled by the
decorated function. If omitted, only ``GET`` requests
are handled.
The URL pattern can be a static path (for example, ``/users`` or
``/api/invoices/search``) or a path with dynamic components enclosed
in ``<`` and ``>`` (for example, ``/users/<id>`` or
``/invoices/<number>/products``). Dynamic path components can also
include a type prefix, separated from the name with a colon (for
example, ``/users/<int:id>``). The type can be ``string`` (the
default), ``int``, ``path`` or ``re:[regular-expression]``.
The first argument of the decorated function must be
the request object. Any path arguments that are specified in the URL
pattern are passed as keyword arguments. The return value of the
function must be a :class:`Response` instance, or the arguments to
be passed to this class.
Example::
@app.route('/')
def index(request):
return 'Hello, world!'
"""
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
return f
return decorated
def get(self, url_pattern):
"""Decorator that is used to register a function as a ``GET`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['GET']``.
Example::
@app.get('/users/<int:id>')
def get_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['GET'])
def post(self, url_pattern):
"""Decorator that is used to register a function as a ``POST`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the``route`` decorator with
``methods=['POST']``.
Example::
@app.post('/users')
def create_user(request):
# ...
"""
return self.route(url_pattern, methods=['POST'])
def put(self, url_pattern):
"""Decorator that is used to register a function as a ``PUT`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['PUT']``.
Example::
@app.put('/users/<int:id>')
def edit_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['PUT'])
def patch(self, url_pattern):
"""Decorator that is used to register a function as a ``PATCH`` request
handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['PATCH']``.
Example::
@app.patch('/users/<int:id>')
def edit_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['PATCH'])
def delete(self, url_pattern):
"""Decorator that is used to register a function as a ``DELETE``
request handler for a given URL.
:param url_pattern: The URL pattern that will be compared against
incoming requests.
This decorator can be used as an alias to the ``route`` decorator with
``methods=['DELETE']``.
Example::
@app.delete('/users/<int:id>')
def delete_user(request, id):
# ...
"""
return self.route(url_pattern, methods=['DELETE'])
def before_request(self, f):
"""Decorator to register a function to run before each request is
handled. The decorated function must take a single argument, the
request object.
Example::
@app.before_request
def func(request):
# ...
"""
self.before_request_handlers.append(f)
return f
def after_request(self, f):
"""Decorator to register a function to run after each request is
handled. The decorated function must take two arguments, the request
and response objects. The return value of the function must be an
updated response object.
Example::
@app.before_request
def func(request, response):
# ...
"""
self.after_request_handlers.append(f)
return f
def errorhandler(self, status_code_or_exception_class):
"""Decorator to register a function as an error handler. Error handler
functions for numeric HTTP status codes must accept a single argument,
the request object. Error handler functions for Python exceptions
must accept two arguments, the request object and the exception
object.
:param status_code_or_exception_class: The numeric HTTP status code or
Python exception class to
handle.
Examples::
@app.errorhandler(404)
def not_found(request):
return 'Not found'
@app.errorhandler(RuntimeError)
def runtime_error(request, exception):
return 'Runtime error'
"""
def decorated(f):
self.error_handlers[status_code_or_exception_class] = f
return f
return decorated
def run(self, host='0.0.0.0', port=5000, debug=False):
"""Start the web server. This function does not normally return, as
the server enters an endless listening loop. The :func:`shutdown`
function provides a method for terminating the server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
Example::
from microdot import Microdot
app = Microdot()
@app.route('/')
def index():
return 'Hello, world!'
app.run(debug=True)
"""
self.debug = debug
self.shutdown_requested = False
self.server = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][-1]
if self.debug: # pragma: no cover
print('Starting {mode} server on {host}:{port}...'.format(
mode=concurrency_mode, host=host, port=port))
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(addr)
self.server.listen(5)
while not self.shutdown_requested:
try:
sock, addr = self.server.accept()
except OSError as exc: # pragma: no cover
if exc.args[0] == errno.ECONNABORTED:
break
else:
raise
create_thread(self.dispatch_request, sock, addr)
def shutdown(self):
"""Request a server shutdown. The server will then exit its request
listening loop and the :func:`run` function will return. This function
can be safely called from a route handler, as it only schedules the
server to terminate as soon as the request completes.
Example::
@app.route('/shutdown')
def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
"""
self.shutdown_requested = True
def find_route(self, req):
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
return f
def dispatch_request(self, sock, addr):
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
stream = sock
req = Request.create(self, stream, addr)
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
res = self.error_handlers[413](req)
else:
res = 'Payload too large', 413
else:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
res.write(stream)
stream.close()
if stream != sock: # pragma: no cover
sock.close()
if self.shutdown_requested: # pragma: no cover
self.server.close()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
redirect = Response.redirect
send_file = Response.send_file

280
src/microdot_asyncio.py Normal file
View File

@@ -0,0 +1,280 @@
"""
microdot_asyncio
----------------
The ``microdot_asyncio`` module defines a few classes that help implement
HTTP-based servers for MicroPython and standard Python that use ``asyncio``
and coroutines.
"""
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot import Microdot as BaseMicrodot
from microdot import print_exception
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
def _iscoroutine(coro):
return hasattr(coro, 'send') and hasattr(coro, 'throw')
class Request(BaseRequest):
@staticmethod
async def create(app, client_stream, client_addr):
"""Create a request object.
:param app: The Microdot application instance.
:param client_stream: An input stream from where the request data can
be read.
:param client_addr: The address of the client, as a tuple.
This method is a coroutine. It returns a newly created ``Request``
object.
"""
# request line
line = (await client_stream.readline()).strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = (await client_stream.readline()).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = await client_stream.read(content_length) if content_length and \
content_length <= Request.max_content_length else b''
return Request(app, client_addr, method, url, http_version, headers,
body)
class Response(BaseResponse):
"""An HTTP response class.
:param body: The body of the response. If a dictionary or list is given,
a JSON formatter is used to generate the body.
:param status_code: The numeric HTTP status code of the response. The
default is 200.
:param headers: A dictionary of headers to include in the response.
:param reason: A custom reason phrase to add after the status code. The
default is "OK" for responses with a 200 status code and
"N/A" for any other status codes.
"""
async def write(self, stream):
self.complete()
# status code
reason = self.reason if self.reason is not None else \
('OK' if self.status_code == 200 else 'N/A')
await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code, reason=reason).encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
await stream.awrite('{header}: {value}\r\n'.format(
header=header, value=value).encode())
await stream.awrite(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
await stream.awrite(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'): # pragma: no cover
self.body.close()
else:
await stream.awrite(self.body)
class Microdot(BaseMicrodot):
async def start_server(self, host='0.0.0.0', port=5000, debug=False):
"""Start the Microdot web server as a coroutine. This coroutine does
not normally return, as the server enters an endless listening loop.
The :func:`shutdown` function provides a method for terminating the
server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
This method is a coroutine.
Example::
import asyncio
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def index():
return 'Hello, world!'
async def main():
await app.start_server(debug=True)
asyncio.run(main())
"""
self.debug = debug
async def serve(reader, writer):
if not hasattr(writer, 'awrite'): # pragma: no cover
# CPython provides the awrite and aclose methods in 3.8+
async def awrite(self, data):
self.write(data)
await self.drain()
async def aclose(self):
self.close()
await self.wait_closed()
from types import MethodType
writer.awrite = MethodType(awrite, writer)
writer.aclose = MethodType(aclose, writer)
await self.dispatch_request(reader, writer)
if self.debug: # pragma: no cover
print('Starting async server on {host}:{port}...'.format(
host=host, port=port))
self.server = await asyncio.start_server(serve, host, port)
while True:
try:
await self.server.wait_closed()
break
except AttributeError: # pragma: no cover
# the task hasn't been initialized in the server object yet
# wait a bit and try again
await asyncio.sleep(0.1)
def run(self, host='0.0.0.0', port=5000, debug=False):
"""Start the web server. This function does not normally return, as
the server enters an endless listening loop. The :func:`shutdown`
function provides a method for terminating the server gracefully.
:param host: The hostname or IP address of the network interface that
will be listening for requests. A value of ``'0.0.0.0'``
(the default) indicates that the server should listen for
requests on all the available interfaces, and a value of
``127.0.0.1`` indicates that the server should listen
for requests only on the internal networking interface of
the host.
:param port: The port number to listen for requests. The default is
port 5000.
:param debug: If ``True``, the server logs debugging information. The
default is ``False``.
Example::
from microdot_asyncio import Microdot
app = Microdot()
@app.route('/')
async def index():
return 'Hello, world!'
app.run(debug=True)
"""
asyncio.run(self.start_server(host=host, port=port, debug=debug))
def shutdown(self):
self.server.close()
async def dispatch_request(self, reader, writer):
req = await Request.create(self, reader,
writer.get_extra_info('peername'))
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[413], req)
else:
res = 'Payload too large', 413
else:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = await self._invoke_handler(handler, req)
if res:
break
if res is None:
res = await self._invoke_handler(
f, req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
elif 404 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[404], req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[500], req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
await res.write(writer)
await writer.aclose()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
ret = f_or_coro(*args, **kwargs)
if _iscoroutine(ret):
ret = await ret
return ret
redirect = Response.redirect
send_file = Response.send_file

View File

@@ -1,3 +1,4 @@
from tests.microdot.test_multidict import TestMultiDict
from tests.microdot.test_request import TestRequest
from tests.microdot.test_response import TestResponse
from tests.microdot.test_url_pattern import TestURLPattern

View File

@@ -1,258 +1,30 @@
import uerrno
import uselect as select
import usocket as _socket
from uasyncio.core import *
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
from .core import *
DEBUG = 0
log = None
__version__ = (3, 0, 0)
def set_debug(val):
global DEBUG, log
DEBUG = val
if val:
import logging
log = logging.getLogger("uasyncio")
_attrs = {
"wait_for": "funcs",
"wait_for_ms": "funcs",
"gather": "funcs",
"Event": "event",
"ThreadSafeFlag": "event",
"Lock": "lock",
"open_connection": "stream",
"start_server": "stream",
"StreamReader": "stream",
"StreamWriter": "stream",
}
class PollEventLoop(EventLoop):
def __init__(self, runq_len=16, waitq_len=16):
EventLoop.__init__(self, runq_len, waitq_len)
self.poller = select.poll()
self.objmap = {}
def add_reader(self, sock, cb, *args):
if DEBUG and __debug__:
log.debug("add_reader%s", (sock, cb, args))
if args:
self.poller.register(sock, select.POLLIN)
self.objmap[id(sock)] = (cb, args)
else:
self.poller.register(sock, select.POLLIN)
self.objmap[id(sock)] = cb
def remove_reader(self, sock):
if DEBUG and __debug__:
log.debug("remove_reader(%s)", sock)
self.poller.unregister(sock)
del self.objmap[id(sock)]
def add_writer(self, sock, cb, *args):
if DEBUG and __debug__:
log.debug("add_writer%s", (sock, cb, args))
if args:
self.poller.register(sock, select.POLLOUT)
self.objmap[id(sock)] = (cb, args)
else:
self.poller.register(sock, select.POLLOUT)
self.objmap[id(sock)] = cb
def remove_writer(self, sock):
if DEBUG and __debug__:
log.debug("remove_writer(%s)", sock)
try:
self.poller.unregister(sock)
self.objmap.pop(id(sock), None)
except OSError as e:
# StreamWriter.awrite() first tries to write to a socket,
# and if that succeeds, yield IOWrite may never be called
# for that socket, and it will never be added to poller. So,
# ignore such error.
if e.args[0] != uerrno.ENOENT:
raise
def wait(self, delay):
if DEBUG and __debug__:
log.debug("poll.wait(%d)", delay)
# We need one-shot behavior (second arg of 1 to .poll())
res = self.poller.ipoll(delay, 1)
#log.debug("poll result: %s", res)
# Remove "if res" workaround after
# https://github.com/micropython/micropython/issues/2716 fixed.
if res:
for sock, ev in res:
cb = self.objmap[id(sock)]
if ev & (select.POLLHUP | select.POLLERR):
# These events are returned even if not requested, and
# are sticky, i.e. will be returned again and again.
# If the caller doesn't do proper error handling and
# unregister this sock, we'll busy-loop on it, so we
# as well can unregister it now "just in case".
self.remove_reader(sock)
if DEBUG and __debug__:
log.debug("Calling IO callback: %r", cb)
if isinstance(cb, tuple):
cb[0](*cb[1])
else:
cb.pend_throw(None)
self.call_soon(cb)
class StreamReader:
def __init__(self, polls, ios=None):
if ios is None:
ios = polls
self.polls = polls
self.ios = ios
def read(self, n=-1):
while True:
yield IORead(self.polls)
res = self.ios.read(n)
if res is not None:
break
# This should not happen for real sockets, but can easily
# happen for stream wrappers (ssl, websockets, etc.)
#log.warn("Empty read")
if not res:
yield IOReadDone(self.polls)
return res
def readexactly(self, n):
buf = b""
while n:
yield IORead(self.polls)
res = self.ios.read(n)
assert res is not None
if not res:
yield IOReadDone(self.polls)
break
buf += res
n -= len(res)
return buf
def readline(self):
if DEBUG and __debug__:
log.debug("StreamReader.readline()")
buf = b""
while True:
yield IORead(self.polls)
res = self.ios.readline()
assert res is not None
if not res:
yield IOReadDone(self.polls)
break
buf += res
if buf[-1] == 0x0a:
break
if DEBUG and __debug__:
log.debug("StreamReader.readline(): %s", buf)
return buf
def aclose(self):
yield IOReadDone(self.polls)
self.ios.close()
def __repr__(self):
return "<StreamReader %r %r>" % (self.polls, self.ios)
class StreamWriter:
def __init__(self, s, extra):
self.s = s
self.extra = extra
def awrite(self, buf, off=0, sz=-1):
# This method is called awrite (async write) to not proliferate
# incompatibility with original asyncio. Unlike original asyncio
# whose .write() method is both not a coroutine and guaranteed
# to return immediately (which means it has to buffer all the
# data), this method is a coroutine.
if sz == -1:
sz = len(buf) - off
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): spooling %d bytes", sz)
while True:
res = self.s.write(buf, off, sz)
# If we spooled everything, return immediately
if res == sz:
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): completed spooling %d bytes", res)
return
if res is None:
res = 0
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): spooled partial %d bytes", res)
assert res < sz
off += res
sz -= res
yield IOWrite(self.s)
#assert s2.fileno() == self.s.fileno()
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): can write more")
# Write piecewise content from iterable (usually, a generator)
def awriteiter(self, iterable):
for buf in iterable:
yield from self.awrite(buf)
def aclose(self):
yield IOWriteDone(self.s)
self.s.close()
def get_extra_info(self, name, default=None):
return self.extra.get(name, default)
def __repr__(self):
return "<StreamWriter %r>" % self.s
def open_connection(host, port, ssl=False):
if DEBUG and __debug__:
log.debug("open_connection(%s, %s)", host, port)
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
ai = ai[0]
s = _socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
try:
s.connect(ai[-1])
except OSError as e:
if e.args[0] != uerrno.EINPROGRESS:
raise
if DEBUG and __debug__:
log.debug("open_connection: After connect")
yield IOWrite(s)
# if __debug__:
# assert s2.fileno() == s.fileno()
if DEBUG and __debug__:
log.debug("open_connection: After iowait: %s", s)
if ssl:
print("Warning: uasyncio SSL support is alpha")
import ussl
s.setblocking(True)
s2 = ussl.wrap_socket(s)
s.setblocking(False)
return StreamReader(s, s2), StreamWriter(s2, {})
return StreamReader(s), StreamWriter(s, {})
def start_server(client_coro, host, port, backlog=10):
if DEBUG and __debug__:
log.debug("start_server(%s, %s)", host, port)
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
ai = ai[0]
s = _socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
s.bind(ai[-1])
s.listen(backlog)
while True:
if DEBUG and __debug__:
log.debug("start_server: Before accept")
yield IORead(s)
if DEBUG and __debug__:
log.debug("start_server: After iowait")
s2, client_addr = s.accept()
s2.setblocking(False)
if DEBUG and __debug__:
log.debug("start_server: After accept: %s", s2)
extra = {"peername": client_addr}
yield client_coro(StreamReader(s2), StreamWriter(s2, extra))
import uasyncio.core
uasyncio.core._event_loop_class = PollEventLoop
# Lazy loader, effectively does:
# global attr
# from .mod import attr
def __getattr__(attr):
mod = _attrs.get(attr, None)
if mod is None:
raise AttributeError(attr)
value = getattr(__import__(mod, None, None, True, 1), attr)
globals()[attr] = value
return value

View File

@@ -1,318 +1,281 @@
import utime as time
import utimeq
import ucollections
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
from time import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
type_gen = type((lambda: (yield))())
DEBUG = 0
log = None
def set_debug(val):
global DEBUG, log
DEBUG = val
if val:
import logging
log = logging.getLogger("uasyncio.core")
class CancelledError(Exception):
pass
class TimeoutError(CancelledError):
pass
class EventLoop:
def __init__(self, runq_len=16, waitq_len=16):
self.runq = ucollections.deque((), runq_len, True)
self.waitq = utimeq.utimeq(waitq_len)
# Current task being run. Task is a top-level coroutine scheduled
# in the event loop (sub-coroutines executed transparently by
# yield from/await, event loop "doesn't see" them).
self.cur_task = None
def time(self):
return time.ticks_ms()
def create_task(self, coro):
# CPython 3.4.2
self.call_later_ms(0, coro)
# CPython asyncio incompatibility: we don't return Task object
def call_soon(self, callback, *args):
if __debug__ and DEBUG:
log.debug("Scheduling in runq: %s", (callback, args))
self.runq.append(callback)
if not isinstance(callback, type_gen):
self.runq.append(args)
def call_later(self, delay, callback, *args):
self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
def call_later_ms(self, delay, callback, *args):
if not delay:
return self.call_soon(callback, *args)
self.call_at_(time.ticks_add(self.time(), delay), callback, args)
def call_at_(self, time, callback, args=()):
if __debug__ and DEBUG:
log.debug("Scheduling in waitq: %s", (time, callback, args))
self.waitq.push(time, callback, args)
def wait(self, delay):
# Default wait implementation, to be overriden in subclasses
# with IO scheduling
if __debug__ and DEBUG:
log.debug("Sleeping for: %s", delay)
time.sleep_ms(delay)
def run_forever(self):
cur_task = [0, 0, 0]
while True:
# Expire entries in waitq and move them to runq
tnow = self.time()
while self.waitq:
t = self.waitq.peektime()
delay = time.ticks_diff(t, tnow)
if delay > 0:
break
self.waitq.pop(cur_task)
if __debug__ and DEBUG:
log.debug("Moving from waitq to runq: %s", cur_task[1])
self.call_soon(cur_task[1], *cur_task[2])
# Process runq
l = len(self.runq)
if __debug__ and DEBUG:
log.debug("Entries in runq: %d", l)
while l:
cb = self.runq.popleft()
l -= 1
args = ()
if not isinstance(cb, type_gen):
args = self.runq.popleft()
l -= 1
if __debug__ and DEBUG:
log.info("Next callback to run: %s", (cb, args))
cb(*args)
continue
if __debug__ and DEBUG:
log.info("Next coroutine to run: %s", (cb, args))
self.cur_task = cb
delay = 0
# Import TaskQueue and Task, preferring built-in C code over Python code
try:
if args is ():
ret = next(cb)
else:
ret = cb.send(*args)
if __debug__ and DEBUG:
log.info("Coroutine %s yield result: %s", cb, ret)
if isinstance(ret, SysCall1):
arg = ret.arg
if isinstance(ret, SleepMs):
delay = arg
elif isinstance(ret, IORead):
cb.pend_throw(False)
self.add_reader(arg, cb)
continue
elif isinstance(ret, IOWrite):
cb.pend_throw(False)
self.add_writer(arg, cb)
continue
elif isinstance(ret, IOReadDone):
self.remove_reader(arg)
elif isinstance(ret, IOWriteDone):
self.remove_writer(arg)
elif isinstance(ret, StopLoop):
return arg
else:
assert False, "Unknown syscall yielded: %r (of type %r)" % (ret, type(ret))
elif isinstance(ret, type_gen):
self.call_soon(ret)
elif isinstance(ret, int):
# Delay
delay = ret
elif ret is None:
# Just reschedule
pass
elif ret is False:
# Don't reschedule
continue
else:
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
except StopIteration as e:
if __debug__ and DEBUG:
log.debug("Coroutine finished: %s", cb)
continue
except CancelledError as e:
if __debug__ and DEBUG:
log.debug("Coroutine cancelled: %s", cb)
continue
# Currently all syscalls don't return anything, so we don't
# need to feed anything to the next invocation of coroutine.
# If that changes, need to pass that value below.
if delay:
self.call_later_ms(delay, cb)
else:
self.call_soon(cb)
from _uasyncio import TaskQueue, Task
except:
from .task import TaskQueue, Task
# Wait until next waitq task or I/O availability
delay = 0
if not self.runq:
delay = -1
if self.waitq:
tnow = self.time()
t = self.waitq.peektime()
delay = time.ticks_diff(t, tnow)
if delay < 0:
delay = 0
self.wait(delay)
def run_until_complete(self, coro):
ret = None
def _run_and_stop():
nonlocal ret
ret = yield from coro
yield StopLoop(0)
self.call_soon(_run_and_stop())
self.run_forever()
return ret
################################################################################
# Exceptions
def stop(self):
self.call_soon((lambda: (yield StopLoop(0)))())
def close(self):
class CancelledError(BaseException):
pass
class SysCall:
def __init__(self, *args):
self.args = args
def handle(self):
raise NotImplementedError
# Optimized syscall with 1 arg
class SysCall1(SysCall):
def __init__(self, arg):
self.arg = arg
class StopLoop(SysCall1):
pass
class IORead(SysCall1):
pass
class IOWrite(SysCall1):
pass
class IOReadDone(SysCall1):
pass
class IOWriteDone(SysCall1):
class TimeoutError(Exception):
pass
_event_loop = None
_event_loop_class = EventLoop
def get_event_loop(runq_len=16, waitq_len=16):
global _event_loop
if _event_loop is None:
_event_loop = _event_loop_class(runq_len, waitq_len)
return _event_loop
# Used when calling Loop.call_exception_handler
_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
def sleep(secs):
yield int(secs * 1000)
# Implementation of sleep_ms awaitable with zero heap memory usage
class SleepMs(SysCall1):
################################################################################
# Sleep functions
# "Yield" once, then raise StopIteration
class SingletonGenerator:
def __init__(self):
self.v = None
self.arg = None
def __call__(self, arg):
self.v = arg
#print("__call__")
return self
self.state = None
self.exc = StopIteration()
def __iter__(self):
#print("__iter__")
return self
def __next__(self):
if self.v is not None:
#print("__next__ syscall enter")
self.arg = self.v
self.v = None
return self
#print("__next__ syscall exit")
_stop_iter.__traceback__ = None
raise _stop_iter
_stop_iter = StopIteration()
sleep_ms = SleepMs()
if self.state is not None:
_task_queue.push_sorted(cur_task, self.state)
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
def cancel(coro):
prev = coro.pend_throw(CancelledError())
if prev is False:
_event_loop.call_soon(coro)
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
assert sgen.state is None
sgen.state = ticks_add(ticks(), max(0, t))
return sgen
class TimeoutObj:
def __init__(self, coro):
self.coro = coro
# Pause task execution for the given time (in seconds)
def sleep(t):
return sleep_ms(int(t * 1000))
def wait_for_ms(coro, timeout):
def waiter(coro, timeout_obj):
res = yield from coro
if __debug__ and DEBUG:
log.debug("waiter: cancelling %s", timeout_obj)
timeout_obj.coro = None
return res
def timeout_func(timeout_obj):
if timeout_obj.coro:
if __debug__ and DEBUG:
log.debug("timeout_func: cancelling %s", timeout_obj.coro)
prev = timeout_obj.coro.pend_throw(TimeoutError())
#print("prev pend", prev)
if prev is False:
_event_loop.call_soon(timeout_obj.coro)
timeout_obj = TimeoutObj(_event_loop.cur_task)
_event_loop.call_later_ms(timeout, timeout_func, timeout_obj)
return (yield from waiter(coro, timeout_obj))
################################################################################
# Queue and poller for stream IO
def wait_for(coro, timeout):
return wait_for_ms(coro, int(timeout * 1000))
class IOQueue:
def __init__(self):
self.poller = select.poll()
self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
def _enqueue(self, s, idx):
if id(s) not in self.map:
entry = [None, None, s]
entry[idx] = cur_task
self.map[id(s)] = entry
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
else:
sm = self.map[id(s)]
assert sm[idx] is None
assert sm[1 - idx] is not None
sm[idx] = cur_task
self.poller.modify(s, select.POLLIN | select.POLLOUT)
# Link task to this IOQueue so it can be removed if needed
cur_task.data = self
def _dequeue(self, s):
del self.map[id(s)]
self.poller.unregister(s)
def queue_read(self, s):
self._enqueue(s, 0)
def queue_write(self, s):
self._enqueue(s, 1)
def remove(self, task):
while True:
del_s = None
for k in self.map: # Iterate without allocating on the heap
q0, q1, s = self.map[k]
if q0 is task or q1 is task:
del_s = s
break
if del_s is not None:
self._dequeue(s)
else:
break
def wait_io_event(self, dt):
for s, ev in self.poller.ipoll(dt):
sm = self.map[id(s)]
# print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None:
# POLLIN or error
_task_queue.push_head(sm[0])
sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None:
# POLLOUT or error
_task_queue.push_head(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
elif sm[0] is None:
self.poller.modify(s, select.POLLOUT)
else:
self.poller.modify(s, select.POLLIN)
def coroutine(f):
return f
################################################################################
# Main run loop
#
# The functions below are deprecated in uasyncio, and provided only
# for compatibility with CPython asyncio
#
def ensure_future(coro, loop=_event_loop):
_event_loop.call_soon(coro)
# CPython asyncio incompatibility: we don't return Task object
return coro
# Ensure the awaitable is a task
def _promote_to_task(aw):
return aw if isinstance(aw, Task) else create_task(aw)
# CPython asyncio incompatibility: Task is a function, not a class (for efficiency)
def Task(coro, loop=_event_loop):
# Same as async()
_event_loop.call_soon(coro)
# Create and schedule a new task from a coroutine
def create_task(coro):
if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
_task_queue.push_head(t)
return t
# Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True:
# Wait until the head of _task_queue is ready to run
dt = 1
while dt > 0:
dt = -1
t = _task_queue.peek()
if t:
# A task waiting on _task_queue; "ph_key" is time to schedule task at
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
# No tasks can be woken so finished running
return
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _task_queue.pop_head()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
exc = t.data
if not exc:
t.coro.send(None)
else:
t.data = None
t.coro.throw(exc)
except excs_all as er:
# Check the task is not on any event queue
assert t.data is None
# This task is done, check if it's the main task and then loop should stop
if t is main_task:
if isinstance(er, StopIteration):
return er.value
raise er
# Schedule any other tasks waiting on the completion of this task
waiting = False
if hasattr(t, "waiting"):
while t.waiting.peek():
_task_queue.push_head(t.waiting.pop_head())
waiting = True
t.waiting = None # Free waiting queue head
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push_head(t)
# Indicate task is done by setting coro to the task object itself
t.coro = t
# Save return value of coro to pass up to caller
t.data = er
# Create a new task from a coroutine and run it until it finishes
def run(coro):
return run_until_complete(create_task(coro))
################################################################################
# Event loop wrapper
async def _stopper():
pass
_stop_task = None
class Loop:
_exc_handler = None
def create_task(coro):
return create_task(coro)
def run_forever():
global _stop_task
_stop_task = Task(_stopper(), globals())
run_until_complete(_stop_task)
# TODO should keep running until .stop() is called, even if there're no tasks left
def run_until_complete(aw):
return run_until_complete(_promote_to_task(aw))
def stop():
global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
# If stop() is called again, do nothing
_stop_task = None
def close():
pass
def set_exception_handler(handler):
Loop._exc_handler = handler
def get_exception_handler():
return Loop._exc_handler
def default_exception_handler(loop, context):
print(context["message"])
print("future:", context["future"], "coro=", context["future"].coro)
sys.print_exception(context["exception"])
def call_exception_handler(context):
(Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
def get_event_loop(runq_len=0, waitq_len=0):
return Loop
def current_task():
return cur_task
def new_event_loop():
global _task_queue, _io_queue
# TaskQueue of Task instances
_task_queue = TaskQueue()
# Task queue and poller for stream IO
_io_queue = IOQueue()
return Loop
# Initialise default event loop
new_event_loop()

View File

@@ -0,0 +1,62 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
# Event class for primitive events that can be waited on, set, and cleared
class Event:
def __init__(self):
self.state = False # False=unset; True=set
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
def is_set(self):
return self.state
def set(self):
# Event becomes set, schedule any tasks waiting on it
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
while self.waiting.peek():
core._task_queue.push_head(self.waiting.pop_head())
self.state = True
def clear(self):
self.state = False
async def wait(self):
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
yield
return True
# MicroPython-extension: This can be set from outside the asyncio event loop,
# such as other threads, IRQs or scheduler context. Implementation is a stream
# that asyncio will poll until a flag is set.
# Note: Unlike Event, this is self-clearing.
try:
import uio
class ThreadSafeFlag(uio.IOBase):
def __init__(self):
self._flag = 0
def ioctl(self, req, flags):
if req == 3: # MP_STREAM_POLL
return self._flag * flags
return None
def set(self):
self._flag = 1
async def wait(self):
if not self._flag:
yield core._io_queue.queue_read(self)
self._flag = 0
except ImportError:
pass

View File

@@ -0,0 +1,74 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
async def wait_for(aw, timeout, sleep=core.sleep):
aw = core._promote_to_task(aw)
if timeout is None:
return await aw
def runner(waiter, aw):
nonlocal status, result
try:
result = await aw
s = True
except BaseException as er:
s = er
if status is None:
# The waiter is still waiting, set status for it and cancel it.
status = s
waiter.cancel()
# Run aw in a separate runner task that manages its exceptions.
status = None
result = None
runner_task = core.create_task(runner(core.cur_task, aw))
try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
if status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return result
elif status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
status = True
runner_task.cancel()
raise er
else:
# aw raised an exception, propagate it out to the caller.
raise status
# The sleep finished before aw, so cancel aw and raise TimeoutError.
status = True
runner_task.cancel()
await runner_task
raise core.TimeoutError
def wait_for_ms(aw, timeout):
return wait_for(aw, timeout, core.sleep_ms)
async def gather(*aws, return_exceptions=False):
ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
# TODO handle cancel of gather itself
# if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except Exception as er:
if return_exceptions:
ts[i] = er
else:
raise er
return ts

View File

@@ -0,0 +1,53 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
# Lock class for primitive mutex capability
class Lock:
def __init__(self):
# The state can take the following values:
# - 0: unlocked
# - 1: locked
# - <Task>: unlocked but this task has been scheduled to acquire the lock next
self.state = 0
# Queue of Tasks waiting to acquire this Lock
self.waiting = core.TaskQueue()
def locked(self):
return self.state == 1
def release(self):
if self.state != 1:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
# Task(s) waiting on lock, schedule next Task
self.state = self.waiting.pop_head()
core._task_queue.push_head(self.state)
else:
# No Task waiting so unlock
self.state = 0
async def acquire(self):
if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:
yield
except core.CancelledError as er:
if self.state == core.cur_task:
# Cancelled while pending on resume, schedule next waiting Task
self.state = 1
self.release()
raise er
# Lock available, set it as locked
self.state = 1
return True
async def __aenter__(self):
return await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
return self.release()

View File

@@ -0,0 +1,13 @@
# This list of frozen files doesn't include task.py because that's provided by the C module.
freeze(
"..",
(
"uasyncio/__init__.py",
"uasyncio/core.py",
"uasyncio/event.py",
"uasyncio/funcs.py",
"uasyncio/lock.py",
"uasyncio/stream.py",
),
opt=3,
)

View File

@@ -0,0 +1,158 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
class Stream:
def __init__(self, s, e={}):
self.s = s
self.e = e
self.out_buf = b""
def get_extra_info(self, v):
return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self):
pass
async def wait_closed(self):
# TODO yield?
self.s.close()
async def read(self, n):
yield core._io_queue.queue_read(self.s)
return self.s.read(n)
async def readexactly(self, n):
r = b""
while n:
yield core._io_queue.queue_read(self.s)
r2 = self.s.read(n)
if r2 is not None:
if not len(r2):
raise EOFError
r += r2
n -= len(r2)
return r
async def readline(self):
l = b""
while True:
yield core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
def write(self, buf):
self.out_buf += buf
async def drain(self):
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
yield core._io_queue.queue_write(self.s)
ret = self.s.write(mv[off:])
if ret is not None:
off += ret
self.out_buf = b""
# Stream can be used for both reading and writing to save code size
StreamReader = Stream
StreamWriter = Stream
# Create a TCP stream connection to a remote host
async def open_connection(host, port):
from uerrno import EINPROGRESS
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
ss = Stream(s)
try:
s.connect(ai[-1])
except OSError as er:
if er.errno != EINPROGRESS:
raise er
yield core._io_queue.queue_write(s)
return ss, ss
# Class representing a TCP stream server, can be closed and used in "async with"
class Server:
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
await self.wait_closed()
def close(self):
self.task.cancel()
async def wait_closed(self):
await self.task
async def _serve(self, cb, host, port, backlog):
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(ai[-1])
s.listen(backlog)
self.task = core.cur_task
# Accept incoming connections
while True:
try:
yield core._io_queue.queue_read(s)
except core.CancelledError:
# Shutdown server
s.close()
return
try:
s2, addr = s.accept()
except:
# Ignore a failed accept
continue
s2.setblocking(False)
s2s = Stream(s2, {"peername": addr})
core.create_task(cb(s2s, s2s))
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
s = Server()
core.create_task(s._serve(cb, host, port, backlog))
return s
################################################################################
# Legacy uasyncio compatibility
async def stream_awrite(self, buf, off=0, sz=-1):
if off != 0 or sz != -1:
buf = memoryview(buf)
if sz == -1:
sz = len(buf)
buf = buf[off : off + sz]
self.write(buf)
await self.drain()
Stream.aclose = Stream.wait_closed
Stream.awrite = stream_awrite
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?

184
tests/libs/uasyncio/task.py Normal file
View File

@@ -0,0 +1,184 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
# They can optionally be replaced by C implementations.
from . import core
# pairing-heap meld of 2 heaps; O(1)
def ph_meld(h1, h2):
if h1 is None:
return h2
if h2 is None:
return h1
lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
if lt:
if h1.ph_child is None:
h1.ph_child = h2
else:
h1.ph_child_last.ph_next = h2
h1.ph_child_last = h2
h2.ph_next = None
h2.ph_rightmost_parent = h1
return h1
else:
h1.ph_next = h2.ph_child
h2.ph_child = h1
if h1.ph_next is None:
h2.ph_child_last = h1
h1.ph_rightmost_parent = h2
return h2
# pairing-heap pairing operation; amortised O(log N)
def ph_pairing(child):
heap = None
while child is not None:
n1 = child
child = child.ph_next
n1.ph_next = None
if child is not None:
n2 = child
child = child.ph_next
n2.ph_next = None
n1 = ph_meld(n1, n2)
heap = ph_meld(heap, n1)
return heap
# pairing-heap delete of a node; stable, amortised O(log N)
def ph_delete(heap, node):
if node is heap:
child = heap.ph_child
node.ph_child = None
return ph_pairing(child)
# Find parent of node
parent = node
while parent.ph_next is not None:
parent = parent.ph_next
parent = parent.ph_rightmost_parent
# Replace node with pairing of its children
if node is parent.ph_child and node.ph_child is None:
parent.ph_child = node.ph_next
node.ph_next = None
return heap
elif node is parent.ph_child:
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
parent.ph_child = node
else:
n = parent.ph_child
while node is not n.ph_next:
n = n.ph_next
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
if node is None:
node = n
else:
n.ph_next = node
node.ph_next = next
if next is None:
node.ph_rightmost_parent = parent
parent.ph_child_last = node
return heap
# TaskQueue class based on the above pairing-heap functions.
class TaskQueue:
def __init__(self):
self.heap = None
def peek(self):
return self.heap
def push_sorted(self, v, key):
v.data = None
v.ph_key = key
v.ph_child = None
v.ph_next = None
self.heap = ph_meld(v, self.heap)
def push_head(self, v):
self.push_sorted(v, core.ticks())
def pop_head(self):
v = self.heap
self.heap = ph_pairing(self.heap.ph_child)
return v
def remove(self, v):
self.heap = ph_delete(self.heap, v)
# Task class representing a coroutine, can be waited on and cancelled.
class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
self.ph_next = None # Paring heap
self.ph_rightmost_parent = None # Paring heap
def __iter__(self):
if self.coro is self:
# Signal that the completed-task has been await'ed on.
self.waiting = None
elif not hasattr(self, "waiting"):
# Lazily allocated head of linked list of Tasks waiting on completion of this task.
self.waiting = TaskQueue()
return self
def __next__(self):
if self.coro is self:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.waiting.push_head(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
return self.coro is self
def cancel(self):
# Check if task is already finished.
if self.coro is self:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
raise RuntimeError("can't cancel self")
# If Task waits on another task then forward the cancel to the one it's waiting on.
while isinstance(self.data, Task):
self = self.data
# Reschedule Task as a cancelled task.
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push_head(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push_head(self)
self.data = core.CancelledError
return True
def throw(self, value):
# This task raised an exception which was uncaught; handle that now.
# Set the data because it was cleared by the main scheduling loop.
self.data = value
if not hasattr(self, "waiting"):
# Nothing await'ed on the task so call the exception handler.
core._exc_context["exception"] = value
core._exc_context["future"] = self
core.Loop.call_exception_handler(core._exc_context)

View File

@@ -21,6 +21,14 @@ class TestMicrodot(unittest.TestCase):
sys.modules['microdot'].socket = self.original_socket
sys.modules['microdot'].create_thread = self.original_create_thread
def _add_shutdown(self, app):
@app.route('/shutdown')
def shutdown(req):
app.shutdown()
return ''
mock_socket.add_request('GET', '/shutdown')
def test_get_request(self):
app = Microdot()
@@ -30,7 +38,8 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -49,12 +58,56 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('POST', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
def test_empty_request(self):
app = Microdot()
mock_socket.clear_requests()
fd = mock_socket.FakeStream(b'\n')
mock_socket._requests.append(fd)
self._add_shutdown(app)
app.run()
assert fd.response == b''
def test_method_decorators(self):
app = Microdot()
@app.get('/get')
def get(req):
return 'GET'
@app.post('/post')
def post(req):
return 'POST'
@app.put('/put')
def put(req):
return 'PUT'
@app.patch('/patch')
def patch(req):
return 'PATCH'
@app.delete('/delete')
def delete(req):
return 'DELETE'
methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
mock_socket.clear_requests()
fds = [mock_socket.add_request(method, '/' + method.lower())
for method in methods]
self._add_shutdown(app)
app.run()
for fd, method in zip(fds, methods):
self.assertTrue(fd.response.endswith(
b'\r\n\r\n' + method.encode()))
def test_before_after_request(self):
app = Microdot()
@@ -83,7 +136,8 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/bar')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 202 N/A\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
@@ -93,7 +147,8 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/baz')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
@@ -110,7 +165,8 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 404 N/A\r\n'))
self.assertIn(b'Content-Length: 9\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -129,12 +185,49 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
def test_413(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 413 N/A\r\n'))
self.assertIn(b'Content-Length: 17\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nPayload too large'))
def test_413_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.errorhandler(413)
def handle_413(req):
return '413', 400
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 400 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n413'))
def test_500(self):
app = Microdot()
@@ -144,7 +237,8 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 500 N/A\r\n'))
self.assertIn(b'Content-Length: 21\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -163,7 +257,8 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -182,7 +277,8 @@ class TestMicrodot(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)

View File

@@ -0,0 +1,31 @@
import unittest
from microdot import MultiDict
class TestMultiDict(unittest.TestCase):
def test_multidict(self):
d = MultiDict()
assert dict(d) == {}
assert d.get('zero') is None
assert d.get('zero', default=0) == 0
assert d.getlist('zero') == []
assert d.getlist('zero', type=int) == []
d['one'] = 1
assert d['one'] == 1
assert d.get('one') == 1
assert d.get('one', default=2) == 1
assert d.get('one', type=int) == 1
assert d.get('one', type=str) == '1'
d['two'] = 1
d['two'] = 2
assert d['two'] == 1
assert d.get('two') == 1
assert d.get('two', default=2) == 1
assert d.get('two', type=int) == 1
assert d.get('two', type=str) == '1'
assert d.getlist('two') == [1, 2]
assert d.getlist('two', type=int) == [1, 2]
assert d.getlist('two', type=str) == ['1', '2']

View File

@@ -1,12 +1,13 @@
import unittest
from microdot import Request
from microdot import Request, MultiDict
from tests.mock_socket import get_request_fd
class TestRequest(unittest.TestCase):
def test_create_request(self):
fd = get_request_fd('GET', '/foo')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.app, 'app')
self.assertEqual(req.client_addr, 'addr')
self.assertEqual(req.method, 'GET')
self.assertEqual(req.path, '/foo')
@@ -26,7 +27,7 @@ class TestRequest(unittest.TestCase):
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'}, body='aaa')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.headers, {
'Host': 'example.com:1234',
'Content-Type': 'application/json',
@@ -39,39 +40,53 @@ class TestRequest(unittest.TestCase):
def test_args(self):
fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(req.args, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
def test_json(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
json = req.json
self.assertEqual(json, {'foo': 'bar'})
self.assertTrue(req.json is json)
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='[1, "2"]')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.json, [1, '2'])
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/xml'}, body='[1, "2"]')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
self.assertIsNone(req.json)
def test_form(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=%2f%%')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
form = req.form
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(form, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
self.assertTrue(req.form is form)
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'},
body='foo=bar&abc=def&x=%2f%%')
req = Request.create(fd, 'addr')
req = Request.create('app', fd, 'addr')
self.assertIsNone(req.form)
def test_large_payload(self):
saved_max_content_length = Request.max_content_length
Request.max_content_length = 16
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=y')
req = Request.create('app', fd, 'addr')
assert req.body == b''
Request.max_content_length = saved_max_content_length

View File

@@ -112,6 +112,28 @@ class TestResponse(unittest.TestCase):
self.assertEqual(res.headers, {'X-Test': 'Foo'})
self.assertEqual(res.body, b'foo')
def test_create_with_reason(self):
res = Response('foo', reason='ALL GOOD!')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'ALL GOOD!')
self.assertEqual(res.body, b'foo')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 ALL GOOD!\r\n', response)
def test_create_with_status_and_reason(self):
res = Response('not found', 404, reason='NOT FOUND')
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'NOT FOUND')
self.assertEqual(res.body, b'not found')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 404 NOT FOUND\r\n', response)
def test_cookies(self):
res = Response('ok')
res.set_cookie('foo1', 'bar1')
@@ -161,9 +183,34 @@ class TestResponse(unittest.TestCase):
res = Response.send_file('tests/files/' + file)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], content_type)
self.assertEqual(res.body.read(), b'foo\n')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertEqual(response, (
b'HTTP/1.0 200 OK\r\nContent-Type: ' + content_type.encode()
+ b'\r\n\r\nfoo\n'))
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
self.assertEqual(res.body.read(), b'foo\n')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertEqual(
response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
def test_send_file_small_buffer(self):
original_buffer_size = Response.send_file_buffer_size
Response.send_file_buffer_size = 2
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertEqual(
response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
Response.send_file_buffer_size = original_buffer_size

View File

@@ -14,6 +14,14 @@ class TestMicrodotAsync(unittest.TestCase):
# restore original socket module
sys.modules['microdot_asyncio'].asyncio = self.original_asyncio
def _add_shutdown(self, app):
@app.route('/shutdown')
def shutdown(req):
app.shutdown()
return ''
mock_socket.add_request('GET', '/shutdown')
def test_get_request(self):
app = Microdot()
@@ -28,7 +36,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
fd2 = mock_socket.add_request('GET', '/async')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -56,7 +65,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('POST', '/')
fd2 = mock_socket.add_request('POST', '/async')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -66,6 +76,16 @@ class TestMicrodotAsync(unittest.TestCase):
self.assertIn(b'Content-Type: text/plain\r\n', fd2.response)
self.assertTrue(fd2.response.endswith(b'\r\n\r\nbar-async'))
def test_empty_request(self):
app = Microdot()
mock_socket.clear_requests()
fd = mock_socket.FakeStream(b'\n')
mock_socket._requests.append(fd)
self._add_shutdown(app)
app.run()
assert fd.response == b''
def test_before_after_request(self):
app = Microdot()
@@ -94,7 +114,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/bar')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 202 N/A\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
@@ -104,7 +125,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/baz')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
@@ -121,7 +143,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 404 N/A\r\n'))
self.assertIn(b'Content-Length: 9\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -140,12 +163,49 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
def test_413(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 413 N/A\r\n'))
self.assertIn(b'Content-Length: 17\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nPayload too large'))
def test_413_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.errorhandler(413)
async def handle_413(req):
return '413', 400
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo', body='x' * 17000)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 400 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n413'))
def test_500(self):
app = Microdot()
@@ -155,7 +215,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 500 N/A\r\n'))
self.assertIn(b'Content-Length: 21\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -174,7 +235,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
@@ -193,7 +255,8 @@ class TestMicrodotAsync(unittest.TestCase):
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self.assertRaises(IndexError, app.run)
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)

View File

@@ -4,6 +4,7 @@ except ImportError:
import asyncio
import unittest
from microdot import MultiDict
from microdot_asyncio import Request
from tests.mock_socket import get_async_request_fd
@@ -15,7 +16,8 @@ def _run(coro):
class TestRequestAsync(unittest.TestCase):
def test_create_request(self):
fd = get_async_request_fd('GET', '/foo')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.app, 'app')
self.assertEqual(req.client_addr, 'addr')
self.assertEqual(req.method, 'GET')
self.assertEqual(req.path, '/foo')
@@ -35,7 +37,7 @@ class TestRequestAsync(unittest.TestCase):
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'}, body='aaa')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.headers, {
'Host': 'example.com:1234',
'Content-Type': 'application/json',
@@ -48,39 +50,53 @@ class TestRequestAsync(unittest.TestCase):
def test_args(self):
fd = get_async_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(req.args, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
def test_json(self):
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
json = req.json
self.assertEqual(json, {'foo': 'bar'})
self.assertTrue(req.json is json)
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='[1, "2"]')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.json, [1, '2'])
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/xml'}, body='[1, "2"]')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
self.assertIsNone(req.json)
def test_form(self):
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=%2f%%')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
form = req.form
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertEqual(form, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
self.assertTrue(req.form is form)
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'},
body='foo=bar&abc=def&x=%2f%%')
req = _run(Request.create(fd, 'addr'))
req = _run(Request.create('app', fd, 'addr'))
self.assertIsNone(req.form)
def test_large_payload(self):
saved_max_content_length = Request.max_content_length
Request.max_content_length = 16
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=y')
req = _run(Request.create('app', fd, 'addr'))
assert req.body == b''
Request.max_content_length = saved_max_content_length

View File

@@ -84,3 +84,48 @@ class TestResponseAsync(unittest.TestCase):
self.assertIn(b'Content-Length: 8\r\n', fd.response)
self.assertIn(b'Content-Type: application/json\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n[1, "2"]'))
def test_create_with_reason(self):
res = Response('foo', reason='ALL GOOD!')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'ALL GOOD!')
self.assertEqual(res.body, b'foo')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 ALL GOOD!\r\n', fd.response)
def test_create_with_status_and_reason(self):
res = Response('not found', 404, reason='NOT FOUND')
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers, {})
self.assertEqual(res.reason, 'NOT FOUND')
self.assertEqual(res.body, b'not found')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 404 NOT FOUND\r\n', fd.response)
def test_send_file(self):
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertEqual(
fd.response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
def test_send_file_small_buffer(self):
original_buffer_size = Response.send_file_buffer_size
Response.send_file_buffer_size = 2
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertEqual(
fd.response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
Response.send_file_buffer_size = original_buffer_size

View File

@@ -5,38 +5,35 @@ except ImportError:
from tests import mock_socket
_calls = []
class EventLoop:
def run_until_complete(self, coro):
_calls.append(('run_until_complete', coro))
self.coro = coro
def run_forever(self):
_calls.append(('run_forever',))
async def rf():
s = mock_socket.socket()
while True:
fd, addr = s.accept()
fd = mock_socket.FakeStreamAsync(fd)
await self.coro(fd, fd)
asyncio.get_event_loop().run_until_complete(rf())
def close(self):
pass
loop = EventLoop()
def get_event_loop():
_calls.append(('get_event_loop',))
return loop
return asyncio.get_event_loop()
def start_server(cb, host, port):
_calls.append(('start_server', cb, host, port))
return cb
async def start_server(cb, host, port):
class MockServer:
def __init__(self):
self.closed = False
async def run(self):
s = mock_socket.socket()
while not self.closed:
fd, addr = s.accept()
fd = mock_socket.FakeStreamAsync(fd)
await cb(fd, fd)
def close(self):
self.closed = True
async def wait_closed(self):
while not self.closed:
await asyncio.sleep(0.01)
server = MockServer()
asyncio.get_event_loop().create_task(server.run())
return server
def run(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)

View File

@@ -6,12 +6,10 @@ except ImportError:
SOL_SOCKET = 'SOL_SOCKET'
SO_REUSEADDR = 'SO_REUSEADDR'
_calls = []
_requests = []
def getaddrinfo(host, port):
_calls.append(('getaddrinfo', host, port))
return (('family', 'addr'), 'socktype', 'proto', 'canonname', 'sockaddr')
@@ -20,19 +18,21 @@ class socket:
self.request_index = 0
def setsockopt(self, level, optname, value):
_calls.append(('setsockopt', level, optname, value))
pass
def bind(self, addr):
_calls.append(('bind', addr))
pass
def listen(self, backlog):
_calls.append(('listen', backlog))
pass
def accept(self):
_calls.append(('accept',))
self.request_index += 1
return _requests[self.request_index - 1], 'addr'
def close(self):
pass
class FakeStream(io.BytesIO):
def __init__(self, input_data):

25
tools/Dockerfile Normal file
View File

@@ -0,0 +1,25 @@
FROM ubuntu:latest
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y build-essential libffi-dev git pkg-config python python3 && \
rm -rf /var/lib/apt/lists/* && \
git clone https://github.com/micropython/micropython.git && \
cd micropython && \
git checkout v1.15 && \
git submodule update --init && \
cd mpy-cross && \
make && \
cd .. && \
cd ports/unix && \
make axtls && \
make && \
make test && \
make install && \
apt-get purge --auto-remove -y build-essential libffi-dev git pkg-config python python3 && \
cd ../../.. && \
rm -rf micropython
CMD ["/usr/local/bin/micropython"]

6
tools/update-micropython.sh Executable file
View File

@@ -0,0 +1,6 @@
# this script updates the micropython binary in the /bin directory that is
# used to run unit tests under GitHub Actions builds
docker build -t micropython .
docker create -it --name dummy-micropython micropython
docker cp dummy-micropython:/usr/local/bin/micropython ../bin/micropython
docker rm dummy-micropython

12
tox.ini
View File

@@ -13,17 +13,17 @@ python =
[testenv]
commands=
pip install -e microdot
pip install -e microdot-asyncio
coverage run --branch --include="microdot*.py" -m unittest tests
coverage report --show-missing
deps=coverage
pip install -e .
pytest -p no:logging --cov=src --cov-branch --cov-report=term-missing
deps=
pytest
pytest-cov
[testenv:flake8]
deps=
flake8
commands=
flake8 --exclude tests/libs microdot microdot-asyncio tests
flake8 --ignore=W503 --exclude tests/libs src tests
[testenv:upy]
whitelist_externals=sh