38 Commits

Author SHA1 Message Date
Miguel Grinberg
4eea7adb8f Release v0.4.0 2021-06-04 17:16:18 +01:00
Miguel Grinberg
a3288a63ed Add method specific route decorators 2021-06-04 17:14:38 +01:00
Miguel Grinberg
3bd7fe8cea Update microypthon binary to 1.15 2021-06-04 16:57:51 +01:00
Miguel Grinberg
0ad538df91 Server shutdown (Fixes #19) 2021-06-04 16:01:07 +01:00
Miguel Grinberg
b810346aa4 Release v0.3.1 2021-02-06 12:12:08 +00:00
Miguel Grinberg
ae5d330b2d fix release script 2021-02-06 12:10:37 +00:00
Miguel Grinberg
4c0afa2bec switch to GitHub actions for builds 2021-02-06 12:01:11 +00:00
Ricardo Mendonça Ferreira
125af4b4a9 Handle Chrome preconnect (Fixes #8) 2021-02-06 00:04:21 +00:00
Damien George
c5e1873523 Move socket import, remove Request.G, and add simple hello example (#12)
* Further guard import of socket to make it optional

This is so that systems without a (u)socket module can still use Microdot.
For example if the transport layer is provided by a serial link.

* Add simple hello.py example that serves a static HTML page
2020-06-30 23:23:17 +01:00
Miguel Grinberg
dfbe2edd79 Update python versions to build 2020-02-19 00:08:07 +00:00
Miguel Grinberg
3e29af5775 Support large downloads in send_file (fixes #3) 2020-02-19 00:08:07 +00:00
Miguel Grinberg
1aacb3cf46 readme update 2019-06-09 17:47:20 +01:00
Miguel Grinberg
64cc172917 Release v0.3.0 2019-05-05 20:32:55 +00:00
Miguel Grinberg
b9ca036e1d release script 2019-05-05 20:32:39 +00:00
Miguel Grinberg
b06b6de584 project restructure 2019-05-05 17:29:03 +00:00
Miguel Grinberg
e5525c5c48 rename microdot_async to microdot_asyncio 2019-05-05 16:16:57 +00:00
Miguel Grinberg
494800ff9f threaded mode 2019-05-05 03:55:18 +00:00
Miguel Grinberg
ba986a89ff more asyncio unit tests 2019-05-05 03:13:22 +00:00
Miguel Grinberg
89f7f09b9a async request and response unit tests 2019-05-04 22:49:57 +00:00
Miguel Grinberg
3d9b5d7084 optional asyncio support 2019-05-04 21:07:42 +00:00
Miguel Grinberg
03efe46a26 more robust header checking in tests 2019-05-04 20:31:59 +00:00
Miguel Grinberg
b16466f1a9 improve code structure 2019-05-04 20:17:04 +00:00
Miguel Grinberg
8aa50f171d g, before_request and after_request 2019-04-27 18:23:44 +01:00
Miguel Grinberg
76ab1fa6d7 more unit tests 2019-04-27 16:35:55 +01:00
Miguel Grinberg
0a373775d5 url pattern matching unit tests 2019-04-27 15:20:22 +01:00
Miguel Grinberg
cd71986a50 response unit tests 2019-04-27 14:23:07 +01:00
Miguel Grinberg
0b95feafc9 request unit tests 2019-04-27 12:03:52 +01:00
Miguel Grinberg
9b32292f21 travis badge 2019-04-27 10:31:41 +01:00
Miguel Grinberg
f741ed7cf8 unit testing framework 2019-04-27 10:27:45 +01:00
Miguel Grinberg
92edc17522 flake8 2019-04-27 09:37:53 +01:00
Miguel Grinberg
4c83cb7563 debug mode 2019-04-20 11:15:13 +01:00
Miguel Grinberg
491202de1f print exceptions 2019-04-20 10:09:37 +01:00
Miguel Grinberg
db29624a45 Release 0.2.0 2019-04-19 20:22:18 +01:00
Miguel Grinberg
0f2c749f6d error handlers 2019-04-19 20:21:51 +01:00
Miguel Grinberg
52f2d0c491 fleshed out example GPIO application 2019-04-19 19:35:00 +01:00
Miguel Grinberg
2f58c41cc8 more robust parsing of cookie header 2019-04-19 19:32:47 +01:00
Miguel Grinberg
4fbed286e4 Release 0.1.1 2019-04-17 07:50:11 +01:00
Miguel Grinberg
e4ff70cf8f minor fixes for micropython 2019-04-16 12:14:22 +01:00
53 changed files with 5501 additions and 304 deletions

3
.flake8 Normal file
View File

@@ -0,0 +1,3 @@
[flake8]
select = C,E,F,W,B,B950
per-file-ignores = ./*/__init__.py:F401

1
.gitattributes vendored Normal file
View File

@@ -0,0 +1 @@
tests/files/* binary

3
.github/FUNDING.yml vendored Normal file
View File

@@ -0,0 +1,3 @@
github: miguelgrinberg
patreon: miguelgrinberg
custom: https://paypal.me/miguelgrinberg

53
.github/workflows/tests.yml vendored Normal file
View File

@@ -0,0 +1,53 @@
name: build
on:
push:
branches:
- master
pull_request:
branches:
- master
jobs:
lint:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -eflake8
tests:
name: tests
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python: ['3.6', '3.7', '3.8', '3.9']
fail-fast: false
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox
tests-micropython:
name: tests-micropython
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions
- run: tox -eupy
coverage:
name: coverage
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- run: python -m pip install --upgrade pip wheel
- run: pip install tox tox-gh-actions codecov
- run: tox
- run: codecov

View File

@@ -1,2 +1,8 @@
# microdot
[![Build status](https://github.com/miguelgrinberg/microdot/workflows/build/badge.svg)](https://github.com/miguelgrinberg/microdot/actions) [![codecov](https://codecov.io/gh/miguelgrinberg/microdot/branch/master/graph/badge.svg)](https://codecov.io/gh/miguelgrinberg/microdot)
A minimalistic Python web framework for microcontrollers inspired by Flask
## Documentation
Coming soon!

BIN
bin/micropython Executable file

Binary file not shown.

33
bin/mkrelease Executable file
View File

@@ -0,0 +1,33 @@
#!/bin/bash
VERSION=$1
if [[ "$VERSION" == "" ]]; then
echo Usage: $0 "<version>"
exit 1
fi
git diff --cached --exit-code >/dev/null
if [[ "$?" != "0" ]]; then
echo Commit your changes before using this script.
exit 1
fi
set -e
for PKG in microdot*; do
echo Building $PKG...
cd $PKG
sed -i "" "s/version.*$/version=\"$VERSION\",/" setup.py
git add setup.py
rm -rf dist
python setup.py sdist bdist_wheel --universal
cd ..
done
git commit -m "Release v$VERSION"
git tag v$VERSION
git push --tags origin master
for PKG in microdot*; do
echo Releasing $PKG...
cd $PKG
twine upload dist/*
cd ..
done

View File

@@ -2,14 +2,75 @@
<html>
<head>
<title>Microdot GPIO Example</title>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<script>
function getCookie(name) {
var value = "; " + document.cookie;
var parts = value.split("; " + name + "=");
if (parts.length == 2)
return parts.pop().split(";").shift();
}
function showMessage() {
document.getElementById('message').innerHTML = getCookie('message');
}
function onLoad() {
showMessage();
var form = getCookie('form');
if (form) {
form = form.split(',')
document.getElementById('pin').selectedIndex = parseInt(form[0]);
document.getElementById(form[1]).checked = true;
}
}
</script>
</head>
<body>
<h1>Microdot GPIO Example</h1>
<form method="POST" action="">
<p>GPIO Pin: <input type="text" name="pin" size="3"></p>
<input type="submit" name="read" value="Read">
<input type="submit" name="set-low" value="Set Low">
<input type="submit" name="set-high" value="Set high">
</form>
<body onload="onLoad();">
<div class="container">
<h1>Microdot GPIO Example</h1>
<div class="alert alert-primary" role="alert" id="message">
</div>
<form method="POST" action="">
<p>
GPIO Pin:
<select name="pin" id="pin">
<option>0</option>
<option>1</option>
<option>2</option>
<option>3</option>
<option>4</option>
<option>5</option>
<option>6</option>
<option>7</option>
<option>8</option>
<option>9</option>
<option>10</option>
<option>11</option>
<option>12</option>
<option>13</option>
<option>14</option>
<option>15</option>
<option>16</option>
</select>
</p>
<div>
<p>
<input type="radio" name="pull" value="pullup" id="pullup">
<label for="pullup">Pull-Up</label>&nbsp;&nbsp;
<input type="radio" name="pull" value="pulldown" id="pulldown">
<label for="pulldown">Pull-Down</label>&nbsp;&nbsp;
<input type="radio" name="pull" value="pullnone" id="pullnone" checked>
<label for="pullnone">None</label>
<br>
<input type="submit" class="btn btn-outline-dark" name="read" value="Read">
</p>
</div>
<div>
<p>
<input type="submit" class="btn btn-outline-dark" name="set-low" value="Set Low">
<input type="submit" class="btn btn-outline-dark" name="set-high" value="Set high">
</p>
</div>
</form>
</div>
</body>
</html>

View File

@@ -6,14 +6,38 @@ app = Microdot()
@app.route('/', methods=['GET', 'POST'])
def index(request):
form_cookie = None
message_cookie = None
if request.method == 'POST':
if 'set-read' in request.form:
pin = machine.Pin(int(request.form['pin']), machine.Pin.IN)
form_cookie = '{pin},{pull}'.format(pin=request.form['pin'],
pull=request.form['pull'])
if 'read' in request.form:
pull = None
if request.form['pull'] == 'pullup':
pull = machine.Pin.PULL_UP
elif request.form['pull'] == 'pulldown':
pull = machine.Pin.PULL_DOWN
pin = machine.Pin(int(request.form['pin']), machine.Pin.IN, pull)
message_cookie = 'Input pin {pin} is {state}.'.format(
pin=request.form['pin'],
state='high' if pin.value() else 'low')
else:
pin = machine.Pin(int(request.form['pin']), machine.Pin.OUT)
pin.value(0 if 'set-low' in request.form else 1)
return redirect('/')
return send_file('gpio.html')
value = 0 if 'set-low' in request.form else 1
pin.value(value)
message_cookie = 'Output pin {pin} is now {state}.'.format(
pin=request.form['pin'],
state='high' if value else 'low')
response = redirect('/')
else:
if 'message' not in request.cookies:
message_cookie = 'Select a pin and an operation below.'
response = send_file('gpio.html')
if form_cookie:
response.set_cookie('form', form_cookie)
if message_cookie:
response.set_cookie('message', message_cookie)
return response
app.run()
app.run(debug=True)

32
examples/hello.py Normal file
View File

@@ -0,0 +1,32 @@
from microdot import Microdot, Response
app = Microdot()
htmldoc = '''<!DOCTYPE html>
<html>
<head>
<title>Microdot Example Page</title>
</head>
<body>
<div>
<h1>Microdot Example Page</h1>
<p>Hello from Microdot!</p>
<p><a href="/shutdown">Click to shutdown the server</a></p>
</div>
</body>
</html>
'''
@app.route('/')
def hello(request):
return Response(body=htmldoc, headers={'Content-Type': 'text/html'})
@app.route('/shutdown')
def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
app.run(debug=True)

40
examples/hello_async.py Normal file
View File

@@ -0,0 +1,40 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot_asyncio import Microdot, Response
app = Microdot()
htmldoc = '''<!DOCTYPE html>
<html>
<head>
<title>Microdot Example Page</title>
</head>
<body>
<div>
<h1>Microdot Example Page</h1>
<p>Hello from Microdot!</p>
<p><a href="/shutdown">Click to shutdown the server</a></p>
</div>
</body>
</html>
'''
@app.route('/')
async def hello(request):
return Response(body=htmldoc, headers={'Content-Type': 'text/html'})
@app.route('/shutdown')
async def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
async def main():
await app.start_server(debug=True)
asyncio.run(main())

View File

@@ -0,0 +1,173 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from microdot import Microdot as BaseMicrodot
from microdot import print_exception
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
def _iscoroutine(coro):
return hasattr(coro, 'send') and hasattr(coro, 'throw')
class Request(BaseRequest):
@staticmethod
async def create(app, stream, client_addr):
# request line
line = (await stream.readline()).strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = (await stream.readline()).strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = await stream.read(content_length) \
if content_length else b''
return Request(app, client_addr, method, url, http_version, headers,
body)
class Response(BaseResponse):
async def write(self, stream):
self.complete()
# status code
await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
await stream.awrite('{header}: {value}\r\n'.format(
header=header, value=value).encode())
await stream.awrite(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
await stream.awrite(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'):
self.body.close()
else:
await stream.awrite(self.body)
class Microdot(BaseMicrodot):
async def start_server(self, host='0.0.0.0', port=5000, debug=False):
self.debug = debug
async def serve(reader, writer):
if not hasattr(writer, 'awrite'): # pragma: no cover
# CPython provides the awrite and aclose methods in 3.8+
async def awrite(self, data):
self.write(data)
await self.drain()
async def aclose(self):
self.close()
await self.wait_closed()
from types import MethodType
writer.awrite = MethodType(awrite, writer)
writer.aclose = MethodType(aclose, writer)
await self.dispatch_request(reader, writer)
if self.debug: # pragma: no cover
print('Starting async server on {host}:{port}...'.format(
host=host, port=port))
self.server = await asyncio.start_server(serve, host, port)
await self.server.wait_closed()
def run(self, host='0.0.0.0', port=5000, debug=False):
asyncio.run(self.start_server(host=host, port=port, debug=debug))
def shutdown(self):
self.server.close()
async def dispatch_request(self, reader, writer):
req = await Request.create(self, reader,
writer.get_extra_info('peername'))
if req:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = await self._invoke_handler(handler, req)
if res:
break
if res is None:
res = await self._invoke_handler(
f, req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
elif 404 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[404], req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[500], req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
await res.write(writer)
await writer.aclose()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
ret = f_or_coro(*args, **kwargs)
if _iscoroutine(ret):
ret = await ret
return ret
redirect = Response.redirect
send_file = Response.send_file

35
microdot-asyncio/setup.py Executable file
View File

@@ -0,0 +1,35 @@
"""
Microdot-AsyncIO
----------------
AsyncIO support for the Microdot web framework.
"""
from setuptools import setup
setup(
name='microdot-asyncio',
version="0.4.0",
url='http://github.com/miguelgrinberg/microdot/',
license='MIT',
author='Miguel Grinberg',
author_email='miguel.grinberg@gmail.com',
description='AsyncIO support for the Microdot web framework',
long_description=__doc__,
py_modules=['microdot_asyncio'],
platforms='any',
install_requires=[
'microdot',
'micropython-uasyncio;implementation_name=="micropython"'
],
classifiers=[
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: Implementation :: MicroPython',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View File

@@ -1,281 +0,0 @@
import json
import re
try:
import usocket as socket
except ImportError:
import socket
def urldecode(string):
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) == 1:
return string
result = [parts[0]]
for item in parts[1:]:
if item == '':
result.append('%')
else:
code = item[:2]
result.append(chr(int(code, 16)))
result.append(item[2:])
return ''.join(result)
class Request():
def __init__(self, client_sock, client_addr):
self.client_sock = client_sock
self.client_addr = client_addr
if not hasattr(client_sock, 'readline'):
self.client_stream = client_sock.makefile("rwb")
else:
self.client_stream = client_sock
# request line
line = self.client_stream.readline().strip().decode('utf-8')
self.method, self.path, self.http_version = line.split()
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
else:
self.query_string = None
# headers
self.headers = {}
self.cookies = {}
self.content_length = 0
while True:
line = self.client_stream.readline().strip().decode('utf-8')
if line == '':
break
header, value = line.split(':', 1)
header = header.title()
value = value.strip()
self.headers[header] = value
if header == 'Content-Length':
self.content_length = int(value)
elif header == 'Content-Type':
self.content_type = value
elif header == 'Cookie':
for cookie in self.headers['Cookie'].split(';'):
name, value = cookie.split('=', 1)
self.cookies[name] = value
# body
self.body = self.client_stream.read(self.content_length)
self._json = None
self._form = None
@property
def json(self):
if self.content_type != 'application/json':
return None
if self._json is None:
self._json = json.loads(self.body)
return self._json
@property
def form(self):
if self.content_type != 'application/x-www-form-urlencoded':
return None
if self._form is None:
self._form = {urldecode(key): urldecode(value) for key, value in
[pair.split('=', 1) for pair in self.body.decode().split('&')]}
return self._form
def close(self):
self.client_stream.close()
if self.client_stream != self.client_sock:
self.client_sock.close()
class Response():
types_map = {
'css': 'text/css',
'gif': 'image/gif',
'html': 'text/html',
'jpg': 'image/jpeg',
'js': 'application/javascript',
'json': 'application/json',
'png': 'image/png',
'txt': 'text/plain',
}
def __init__(self, body='', status_code=200, headers=None):
self.status_code = status_code
self.headers = headers or {}
self.cookies = []
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
self.headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
self.body = body.encode()
elif isinstance(body, bytes):
self.body = body
else:
self.body = str(body).encode()
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
if path:
http_cookie += '; Path=' + path
if domain:
http_cookie += '; Domain=' + domain
if expires:
http_cookie += '; Expires=' + expires.strftime(
"%a, %d %b %Y %H:%M:%S GMT")
if max_age:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
if http_only:
http_cookie += '; httpOnly'
if 'Set-Cookie' in self.headers:
self.headers['Set-Cookie'].append(http_cookie)
else:
self.headers['Set-Cookie'] = [http_cookie]
def write(self, client_stream):
# status code
client_stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
content_length_found = False
content_type_found = False
for header, value in self.headers.items():
header = header.title()
values = value if isinstance(value, list) else [value]
for value in values:
client_stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
if header == 'Content-Length':
content_length_found = True
elif header == 'Content-Type':
content_type_found = True
if not content_length_found:
client_stream.write('Content-Length: {length}\r\n'.format(
length=len(self.body)).encode())
if not content_type_found:
client_stream.write(b'Content-Type: text/plain\r\n')
client_stream.write(b'\r\n')
# body
if self.body:
client_stream.write(self.body)
@staticmethod
def redirect(location, status_code=302):
return Response(status_code=status_code,
headers={'Location': location})
@staticmethod
def send_file(filename, status_code=200, content_type=None):
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
with open(filename) as f:
return Response(body=f.read(), status_code=status_code,
headers={'Content-Type': content_type})
class URLPattern():
def __init__(self, url_pattern):
self.pattern = ''
self.args = []
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
if segment[-1] != '>':
raise ValueError('invalid URL pattern')
segment = segment[1:-1]
if ':' in segment:
type_, name = segment.split(':', 1)
else:
type_ = 'string'
name = segment
if type_ == 'string':
pattern = '[^/]*'
elif type_ == 'int':
pattern = '\\d+'
elif type_ == 'path':
pattern = '.*'
elif type_.startswith('regex'):
pattern = eval(type_[5:])
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
else:
self.pattern += '/{segment}'.format(segment=segment)
if use_regex:
self.pattern = re.compile(self.pattern)
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
return args
class Microdot():
def __init__(self) :
self.url_map = []
def route(self, url_pattern, methods=None):
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
return f
return decorated
def run(self, host='0.0.0.0', port=5000):
s = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][-1]
print('Listening on {host}:{port}...'.format(host=host, port=port))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
while True:
req = Request(*s.accept())
f = None
args = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
args = route_pattern.match(req.path)
if args is not None:
f = route_handler
break
if f:
resp = f(req, **args)
if isinstance(resp, tuple):
resp = Response(*resp)
elif not isinstance(resp, Response):
resp = Response(resp)
resp.write(req.client_stream)
req.close()
redirect = Response.redirect
send_file = Response.send_file

452
microdot/microdot.py Normal file
View File

@@ -0,0 +1,452 @@
try:
from sys import print_exception
except ImportError: # pragma: no cover
import traceback
def print_exception(exc):
traceback.print_exc()
try:
import uerrno as errno
except ImportError:
import errno
concurrency_mode = 'threaded'
try: # pragma: no cover
import threading
def create_thread(f, *args, **kwargs):
"""Use the threading module."""
threading.Thread(target=f, args=args, kwargs=kwargs).start()
except ImportError: # pragma: no cover
try:
import _thread
def create_thread(f, *args, **kwargs):
"""Use MicroPython's _thread module."""
def run():
f(*args, **kwargs)
_thread.start_new_thread(run, ())
except ImportError:
def create_thread(f, *args, **kwargs):
"""No threads available, call function synchronously."""
f(*args, **kwargs)
concurrency_mode = 'sync'
try:
import ujson as json
except ImportError:
import json
try:
import ure as re
except ImportError:
import re
try:
import usocket as socket
except ImportError:
try:
import socket
except ImportError: # pragma: no cover
socket = None
def urldecode(string):
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) == 1:
return string
result = [parts[0]]
for item in parts[1:]:
if item == '':
result.append('%')
else:
code = item[:2]
result.append(chr(int(code, 16)))
result.append(item[2:])
return ''.join(result)
class Request():
class G:
pass
def __init__(self, app, client_addr, method, url, http_version, headers,
body):
self.app = app
self.client_addr = client_addr
self.method = method
self.path = url
self.http_version = http_version
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
self.headers = headers
self.cookies = {}
self.content_length = 0
self.content_type = None
for header, value in self.headers.items():
if header == 'Content-Length':
self.content_length = int(value)
elif header == 'Content-Type':
self.content_type = value
elif header == 'Cookie':
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
self.body = body
self._json = None
self._form = None
self.g = Request.G()
@staticmethod
def create(app, client_stream, client_addr):
# request line
line = client_stream.readline().strip().decode()
if not line: # pragma: no cover
return None
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = client_stream.readline().strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = client_stream.read(content_length) if content_length else b''
return Request(app, client_addr, method, url, http_version, headers,
body)
def _parse_urlencoded(self, urlencoded):
return {
urldecode(key): urldecode(value) for key, value in [
pair.split('=', 1) for pair in
urlencoded.split('&')]}
@property
def json(self):
if self.content_type != 'application/json':
return None
if self._json is None:
self._json = json.loads(self.body.decode())
return self._json
@property
def form(self):
if self.content_type != 'application/x-www-form-urlencoded':
return None
if self._form is None:
self._form = self._parse_urlencoded(self.body.decode())
return self._form
class Response():
types_map = {
'css': 'text/css',
'gif': 'image/gif',
'html': 'text/html',
'jpg': 'image/jpeg',
'js': 'application/javascript',
'json': 'application/json',
'png': 'image/png',
'txt': 'text/plain',
}
send_file_buffer_size = 1024
def __init__(self, body='', status_code=200, headers=None):
self.status_code = status_code
self.headers = headers or {}
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
self.headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
self.body = body.encode()
else:
# this applies to bytes or file-like objects
self.body = body
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
if path:
http_cookie += '; Path=' + path
if domain:
http_cookie += '; Domain=' + domain
if expires:
http_cookie += '; Expires=' + expires.strftime(
"%a, %d %b %Y %H:%M:%S GMT")
if max_age:
http_cookie += '; Max-Age=' + str(max_age)
if secure:
http_cookie += '; Secure'
if http_only:
http_cookie += '; HttpOnly'
if 'Set-Cookie' in self.headers:
self.headers['Set-Cookie'].append(http_cookie)
else:
self.headers['Set-Cookie'] = [http_cookie]
def complete(self):
if isinstance(self.body, bytes) and \
'Content-Length' not in self.headers:
self.headers['Content-Length'] = str(len(self.body))
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'text/plain'
def write(self, stream):
self.complete()
# status code
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
stream.write(b'\r\n')
# body
if self.body:
if hasattr(self.body, 'read'):
while True:
buf = self.body.read(self.send_file_buffer_size)
if len(buf):
stream.write(buf)
if len(buf) < self.send_file_buffer_size:
break
if hasattr(self.body, 'close'): # pragma: no close
self.body.close()
else:
stream.write(self.body)
@classmethod
def redirect(cls, location, status_code=302):
return cls(status_code=status_code, headers={'Location': location})
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
f = open(filename, 'rb')
return cls(body=f, status_code=status_code,
headers={'Content-Type': content_type})
class URLPattern():
def __init__(self, url_pattern):
self.pattern = ''
self.args = []
use_regex = False
for segment in url_pattern.lstrip('/').split('/'):
if segment and segment[0] == '<':
if segment[-1] != '>':
raise ValueError('invalid URL pattern')
segment = segment[1:-1]
if ':' in segment:
type_, name = segment.rsplit(':', 1)
else:
type_ = 'string'
name = segment
if type_ == 'string':
pattern = '[^/]+'
elif type_ == 'int':
pattern = '\\d+'
elif type_ == 'path':
pattern = '.+'
elif type_.startswith('re:'):
pattern = type_[3:]
else:
raise ValueError('invalid URL segment type')
use_regex = True
self.pattern += '/({pattern})'.format(pattern=pattern)
self.args.append({'type': type_, 'name': name})
else:
self.pattern += '/{segment}'.format(segment=segment)
if use_regex:
self.pattern = re.compile('^' + self.pattern + '$')
def match(self, path):
if isinstance(self.pattern, str):
if path != self.pattern:
return
return {}
g = self.pattern.match(path)
if not g:
return
args = {}
i = 1
for arg in self.args:
value = g.group(i)
if arg['type'] == 'int':
value = int(value)
args[arg['name']] = value
i += 1
return args
class Microdot():
def __init__(self):
self.url_map = []
self.before_request_handlers = []
self.after_request_handlers = []
self.error_handlers = {}
self.shutdown_requested = False
self.debug = False
self.server = None
def route(self, url_pattern, methods=None):
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
return f
return decorated
def get(self, url_pattern):
return self.route(url_pattern, methods=['GET'])
def post(self, url_pattern):
return self.route(url_pattern, methods=['POST'])
def put(self, url_pattern):
return self.route(url_pattern, methods=['PUT'])
def patch(self, url_pattern):
return self.route(url_pattern, methods=['PATCH'])
def delete(self, url_pattern):
return self.route(url_pattern, methods=['DELETE'])
def before_request(self, f):
self.before_request_handlers.append(f)
return f
def after_request(self, f):
self.after_request_handlers.append(f)
return f
def errorhandler(self, status_code_or_exception_class):
def decorated(f):
self.error_handlers[status_code_or_exception_class] = f
return f
return decorated
def run(self, host='0.0.0.0', port=5000, debug=False):
self.debug = debug
self.shutdown_requested = False
self.server = socket.socket()
ai = socket.getaddrinfo(host, port)
addr = ai[0][-1]
if self.debug: # pragma: no cover
print('Starting {mode} server on {host}:{port}...'.format(
mode=concurrency_mode, host=host, port=port))
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(addr)
self.server.listen(5)
while not self.shutdown_requested:
try:
sock, addr = self.server.accept()
except OSError as exc:
if exc.args[0] == errno.ECONNABORTED:
break
else:
raise
create_thread(self.dispatch_request, sock, addr)
def shutdown(self):
self.shutdown_requested = True
def find_route(self, req):
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
return f
def dispatch_request(self, sock, addr):
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
stream = sock
req = Request.create(self, stream, addr)
if req:
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
res.write(stream)
stream.close()
if stream != sock: # pragma: no cover
sock.close()
if self.shutdown_requested: # pragma: no cover
self.server.close()
if self.debug and req: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
redirect = Response.redirect
send_file = Response.send_file

View File

@@ -2,26 +2,21 @@
Microdot
--------
Impossibly small web framework for MicroPython.
The impossibly small web framework for MicroPython.
"""
from setuptools import setup
setup(
name='microdot',
version='0.1.0',
version="0.4.0",
url='http://github.com/miguelgrinberg/microdot/',
license='MIT',
author='Miguel Grinberg',
author_email='miguel.grinberg@gmail.com',
description='Impossibly small web framework for MicroPython',
description='The impossibly small web framework for MicroPython',
long_description=__doc__,
packages=['microdot'],
zip_safe=False,
include_package_data=True,
py_modules=['microdot'],
platforms='any',
tests_require=[
'coverage'
],
classifiers=[
'Environment :: Web Environment',
'Intended Audience :: Developers',

9
run_tests.py Normal file
View File

@@ -0,0 +1,9 @@
import sys
sys.path.insert(0, 'microdot')
sys.path.insert(1, 'microdot-asyncio')
sys.path.insert(2, 'tests/libs')
import unittest
unittest.main('tests')

8
tests/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from tests.microdot.test_request import TestRequest
from tests.microdot.test_response import TestResponse
from tests.microdot.test_url_pattern import TestURLPattern
from tests.microdot.test_microdot import TestMicrodot
from tests.microdot_asyncio.test_request_asyncio import TestRequestAsync
from tests.microdot_asyncio.test_response_asyncio import TestResponseAsync
from tests.microdot_asyncio.test_microdot_asyncio import TestMicrodotAsync

1
tests/files/test.bin Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.css Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.gif Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.html Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.jpg Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.js Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.json Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.png Normal file
View File

@@ -0,0 +1 @@
foo

1
tests/files/test.txt Normal file
View File

@@ -0,0 +1 @@
foo

2143
tests/libs/datetime.py Normal file

File diff suppressed because it is too large Load Diff

46
tests/libs/ffilib.py Normal file
View File

@@ -0,0 +1,46 @@
import sys
try:
import ffi
except ImportError:
ffi = None
_cache = {}
def open(name, maxver=10, extra=()):
if not ffi:
return None
try:
return _cache[name]
except KeyError:
pass
def libs():
if sys.platform == "linux":
yield '%s.so' % name
for i in range(maxver, -1, -1):
yield '%s.so.%u' % (name, i)
else:
for ext in ('dylib', 'dll'):
yield '%s.%s' % (name, ext)
for n in extra:
yield n
err = None
for n in libs():
try:
l = ffi.open(n)
_cache[name] = l
return l
except OSError as e:
err = e
raise err
def libc():
return open("libc", 6)
# Find out bitness of the platform, even if long ints are not supported
# TODO: All bitness differences should be removed from micropython-lib, and
# this snippet too.
bitness = 1
v = sys.maxsize
while v:
bitness += 1
v >>= 1

76
tests/libs/time.py Normal file
View File

@@ -0,0 +1,76 @@
from utime import *
from ucollections import namedtuple
import ustruct
import uctypes
import ffi
import ffilib
import array
libc = ffilib.libc()
# struct tm *gmtime(const time_t *timep);
# struct tm *localtime(const time_t *timep);
# size_t strftime(char *s, size_t max, const char *format,
# const struct tm *tm);
gmtime_ = libc.func("P", "gmtime", "P")
localtime_ = libc.func("P", "localtime", "P")
strftime_ = libc.func("i", "strftime", "sisP")
mktime_ = libc.func("i", "mktime", "P")
_struct_time = namedtuple("struct_time",
["tm_year", "tm_mon", "tm_mday", "tm_hour", "tm_min", "tm_sec", "tm_wday", "tm_yday", "tm_isdst"])
def _tuple_to_c_tm(t):
return ustruct.pack("@iiiiiiiii", t[5], t[4], t[3], t[2], t[1] - 1, t[0] - 1900, (t[6] + 1) % 7, t[7] - 1, t[8])
def _c_tm_to_tuple(tm):
t = ustruct.unpack("@iiiiiiiii", tm)
return _struct_time(t[5] + 1900, t[4] + 1, t[3], t[2], t[1], t[0], (t[6] - 1) % 7, t[7] + 1, t[8])
def struct_time(tm):
return _struct_time(*tm)
def strftime(format, t=None):
if t is None:
t = localtime()
buf = bytearray(32)
l = strftime_(buf, 32, format, _tuple_to_c_tm(t))
return str(buf[:l], "utf-8")
def localtime(t=None):
if t is None:
t = time()
t = int(t)
a = ustruct.pack('l', t)
tm_p = localtime_(a)
return _c_tm_to_tuple(uctypes.bytearray_at(tm_p, 36))
def gmtime(t=None):
if t is None:
t = time()
t = int(t)
a = ustruct.pack('l', t)
tm_p = gmtime_(a)
return _c_tm_to_tuple(uctypes.bytearray_at(tm_p, 36))
def mktime(tt):
return mktime_(_tuple_to_c_tm(tt))
def perf_counter():
return time()
def process_time():
return clock()
daylight = 0
timezone = 0

View File

@@ -0,0 +1,30 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
from .core import *
__version__ = (3, 0, 0)
_attrs = {
"wait_for": "funcs",
"wait_for_ms": "funcs",
"gather": "funcs",
"Event": "event",
"ThreadSafeFlag": "event",
"Lock": "lock",
"open_connection": "stream",
"start_server": "stream",
"StreamReader": "stream",
"StreamWriter": "stream",
}
# Lazy loader, effectively does:
# global attr
# from .mod import attr
def __getattr__(attr):
mod = _attrs.get(attr, None)
if mod is None:
raise AttributeError(attr)
value = getattr(__import__(mod, None, None, True, 1), attr)
globals()[attr] = value
return value

281
tests/libs/uasyncio/core.py Normal file
View File

@@ -0,0 +1,281 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019 Damien P. George
from time import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
# Import TaskQueue and Task, preferring built-in C code over Python code
try:
from _uasyncio import TaskQueue, Task
except:
from .task import TaskQueue, Task
################################################################################
# Exceptions
class CancelledError(BaseException):
pass
class TimeoutError(Exception):
pass
# Used when calling Loop.call_exception_handler
_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
################################################################################
# Sleep functions
# "Yield" once, then raise StopIteration
class SingletonGenerator:
def __init__(self):
self.state = None
self.exc = StopIteration()
def __iter__(self):
return self
def __next__(self):
if self.state is not None:
_task_queue.push_sorted(cur_task, self.state)
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
assert sgen.state is None
sgen.state = ticks_add(ticks(), max(0, t))
return sgen
# Pause task execution for the given time (in seconds)
def sleep(t):
return sleep_ms(int(t * 1000))
################################################################################
# Queue and poller for stream IO
class IOQueue:
def __init__(self):
self.poller = select.poll()
self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
def _enqueue(self, s, idx):
if id(s) not in self.map:
entry = [None, None, s]
entry[idx] = cur_task
self.map[id(s)] = entry
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
else:
sm = self.map[id(s)]
assert sm[idx] is None
assert sm[1 - idx] is not None
sm[idx] = cur_task
self.poller.modify(s, select.POLLIN | select.POLLOUT)
# Link task to this IOQueue so it can be removed if needed
cur_task.data = self
def _dequeue(self, s):
del self.map[id(s)]
self.poller.unregister(s)
def queue_read(self, s):
self._enqueue(s, 0)
def queue_write(self, s):
self._enqueue(s, 1)
def remove(self, task):
while True:
del_s = None
for k in self.map: # Iterate without allocating on the heap
q0, q1, s = self.map[k]
if q0 is task or q1 is task:
del_s = s
break
if del_s is not None:
self._dequeue(s)
else:
break
def wait_io_event(self, dt):
for s, ev in self.poller.ipoll(dt):
sm = self.map[id(s)]
# print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None:
# POLLIN or error
_task_queue.push_head(sm[0])
sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None:
# POLLOUT or error
_task_queue.push_head(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
elif sm[0] is None:
self.poller.modify(s, select.POLLOUT)
else:
self.poller.modify(s, select.POLLIN)
################################################################################
# Main run loop
# Ensure the awaitable is a task
def _promote_to_task(aw):
return aw if isinstance(aw, Task) else create_task(aw)
# Create and schedule a new task from a coroutine
def create_task(coro):
if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
_task_queue.push_head(t)
return t
# Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True:
# Wait until the head of _task_queue is ready to run
dt = 1
while dt > 0:
dt = -1
t = _task_queue.peek()
if t:
# A task waiting on _task_queue; "ph_key" is time to schedule task at
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
# No tasks can be woken so finished running
return
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _task_queue.pop_head()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
exc = t.data
if not exc:
t.coro.send(None)
else:
t.data = None
t.coro.throw(exc)
except excs_all as er:
# Check the task is not on any event queue
assert t.data is None
# This task is done, check if it's the main task and then loop should stop
if t is main_task:
if isinstance(er, StopIteration):
return er.value
raise er
# Schedule any other tasks waiting on the completion of this task
waiting = False
if hasattr(t, "waiting"):
while t.waiting.peek():
_task_queue.push_head(t.waiting.pop_head())
waiting = True
t.waiting = None # Free waiting queue head
if not waiting and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
_task_queue.push_head(t)
# Indicate task is done by setting coro to the task object itself
t.coro = t
# Save return value of coro to pass up to caller
t.data = er
# Create a new task from a coroutine and run it until it finishes
def run(coro):
return run_until_complete(create_task(coro))
################################################################################
# Event loop wrapper
async def _stopper():
pass
_stop_task = None
class Loop:
_exc_handler = None
def create_task(coro):
return create_task(coro)
def run_forever():
global _stop_task
_stop_task = Task(_stopper(), globals())
run_until_complete(_stop_task)
# TODO should keep running until .stop() is called, even if there're no tasks left
def run_until_complete(aw):
return run_until_complete(_promote_to_task(aw))
def stop():
global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
# If stop() is called again, do nothing
_stop_task = None
def close():
pass
def set_exception_handler(handler):
Loop._exc_handler = handler
def get_exception_handler():
return Loop._exc_handler
def default_exception_handler(loop, context):
print(context["message"])
print("future:", context["future"], "coro=", context["future"].coro)
sys.print_exception(context["exception"])
def call_exception_handler(context):
(Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
def get_event_loop(runq_len=0, waitq_len=0):
return Loop
def current_task():
return cur_task
def new_event_loop():
global _task_queue, _io_queue
# TaskQueue of Task instances
_task_queue = TaskQueue()
# Task queue and poller for stream IO
_io_queue = IOQueue()
return Loop
# Initialise default event loop
new_event_loop()

View File

@@ -0,0 +1,62 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
# Event class for primitive events that can be waited on, set, and cleared
class Event:
def __init__(self):
self.state = False # False=unset; True=set
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
def is_set(self):
return self.state
def set(self):
# Event becomes set, schedule any tasks waiting on it
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
while self.waiting.peek():
core._task_queue.push_head(self.waiting.pop_head())
self.state = True
def clear(self):
self.state = False
async def wait(self):
if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
yield
return True
# MicroPython-extension: This can be set from outside the asyncio event loop,
# such as other threads, IRQs or scheduler context. Implementation is a stream
# that asyncio will poll until a flag is set.
# Note: Unlike Event, this is self-clearing.
try:
import uio
class ThreadSafeFlag(uio.IOBase):
def __init__(self):
self._flag = 0
def ioctl(self, req, flags):
if req == 3: # MP_STREAM_POLL
return self._flag * flags
return None
def set(self):
self._flag = 1
async def wait(self):
if not self._flag:
yield core._io_queue.queue_read(self)
self._flag = 0
except ImportError:
pass

View File

@@ -0,0 +1,74 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
async def wait_for(aw, timeout, sleep=core.sleep):
aw = core._promote_to_task(aw)
if timeout is None:
return await aw
def runner(waiter, aw):
nonlocal status, result
try:
result = await aw
s = True
except BaseException as er:
s = er
if status is None:
# The waiter is still waiting, set status for it and cancel it.
status = s
waiter.cancel()
# Run aw in a separate runner task that manages its exceptions.
status = None
result = None
runner_task = core.create_task(runner(core.cur_task, aw))
try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
if status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return result
elif status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
status = True
runner_task.cancel()
raise er
else:
# aw raised an exception, propagate it out to the caller.
raise status
# The sleep finished before aw, so cancel aw and raise TimeoutError.
status = True
runner_task.cancel()
await runner_task
raise core.TimeoutError
def wait_for_ms(aw, timeout):
return wait_for(aw, timeout, core.sleep_ms)
async def gather(*aws, return_exceptions=False):
ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
# TODO handle cancel of gather itself
# if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except Exception as er:
if return_exceptions:
ts[i] = er
else:
raise er
return ts

View File

@@ -0,0 +1,53 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
# Lock class for primitive mutex capability
class Lock:
def __init__(self):
# The state can take the following values:
# - 0: unlocked
# - 1: locked
# - <Task>: unlocked but this task has been scheduled to acquire the lock next
self.state = 0
# Queue of Tasks waiting to acquire this Lock
self.waiting = core.TaskQueue()
def locked(self):
return self.state == 1
def release(self):
if self.state != 1:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
# Task(s) waiting on lock, schedule next Task
self.state = self.waiting.pop_head()
core._task_queue.push_head(self.state)
else:
# No Task waiting so unlock
self.state = 0
async def acquire(self):
if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:
yield
except core.CancelledError as er:
if self.state == core.cur_task:
# Cancelled while pending on resume, schedule next waiting Task
self.state = 1
self.release()
raise er
# Lock available, set it as locked
self.state = 1
return True
async def __aenter__(self):
return await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
return self.release()

View File

@@ -0,0 +1,13 @@
# This list of frozen files doesn't include task.py because that's provided by the C module.
freeze(
"..",
(
"uasyncio/__init__.py",
"uasyncio/core.py",
"uasyncio/event.py",
"uasyncio/funcs.py",
"uasyncio/lock.py",
"uasyncio/stream.py",
),
opt=3,
)

View File

@@ -0,0 +1,158 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
from . import core
class Stream:
def __init__(self, s, e={}):
self.s = s
self.e = e
self.out_buf = b""
def get_extra_info(self, v):
return self.e[v]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
def close(self):
pass
async def wait_closed(self):
# TODO yield?
self.s.close()
async def read(self, n):
yield core._io_queue.queue_read(self.s)
return self.s.read(n)
async def readexactly(self, n):
r = b""
while n:
yield core._io_queue.queue_read(self.s)
r2 = self.s.read(n)
if r2 is not None:
if not len(r2):
raise EOFError
r += r2
n -= len(r2)
return r
async def readline(self):
l = b""
while True:
yield core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
def write(self, buf):
self.out_buf += buf
async def drain(self):
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
yield core._io_queue.queue_write(self.s)
ret = self.s.write(mv[off:])
if ret is not None:
off += ret
self.out_buf = b""
# Stream can be used for both reading and writing to save code size
StreamReader = Stream
StreamWriter = Stream
# Create a TCP stream connection to a remote host
async def open_connection(host, port):
from uerrno import EINPROGRESS
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
ss = Stream(s)
try:
s.connect(ai[-1])
except OSError as er:
if er.errno != EINPROGRESS:
raise er
yield core._io_queue.queue_write(s)
return ss, ss
# Class representing a TCP stream server, can be closed and used in "async with"
class Server:
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
await self.wait_closed()
def close(self):
self.task.cancel()
async def wait_closed(self):
await self.task
async def _serve(self, cb, host, port, backlog):
import usocket as socket
ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(ai[-1])
s.listen(backlog)
self.task = core.cur_task
# Accept incoming connections
while True:
try:
yield core._io_queue.queue_read(s)
except core.CancelledError:
# Shutdown server
s.close()
return
try:
s2, addr = s.accept()
except:
# Ignore a failed accept
continue
s2.setblocking(False)
s2s = Stream(s2, {"peername": addr})
core.create_task(cb(s2s, s2s))
# Helper function to start a TCP stream server, running as a new task
# TODO could use an accept-callback on socket read activity instead of creating a task
async def start_server(cb, host, port, backlog=5):
s = Server()
core.create_task(s._serve(cb, host, port, backlog))
return s
################################################################################
# Legacy uasyncio compatibility
async def stream_awrite(self, buf, off=0, sz=-1):
if off != 0 or sz != -1:
buf = memoryview(buf)
if sz == -1:
sz = len(buf)
buf = buf[off : off + sz]
self.write(buf)
await self.drain()
Stream.aclose = Stream.wait_closed
Stream.awrite = stream_awrite
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?

184
tests/libs/uasyncio/task.py Normal file
View File

@@ -0,0 +1,184 @@
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
# They can optionally be replaced by C implementations.
from . import core
# pairing-heap meld of 2 heaps; O(1)
def ph_meld(h1, h2):
if h1 is None:
return h2
if h2 is None:
return h1
lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
if lt:
if h1.ph_child is None:
h1.ph_child = h2
else:
h1.ph_child_last.ph_next = h2
h1.ph_child_last = h2
h2.ph_next = None
h2.ph_rightmost_parent = h1
return h1
else:
h1.ph_next = h2.ph_child
h2.ph_child = h1
if h1.ph_next is None:
h2.ph_child_last = h1
h1.ph_rightmost_parent = h2
return h2
# pairing-heap pairing operation; amortised O(log N)
def ph_pairing(child):
heap = None
while child is not None:
n1 = child
child = child.ph_next
n1.ph_next = None
if child is not None:
n2 = child
child = child.ph_next
n2.ph_next = None
n1 = ph_meld(n1, n2)
heap = ph_meld(heap, n1)
return heap
# pairing-heap delete of a node; stable, amortised O(log N)
def ph_delete(heap, node):
if node is heap:
child = heap.ph_child
node.ph_child = None
return ph_pairing(child)
# Find parent of node
parent = node
while parent.ph_next is not None:
parent = parent.ph_next
parent = parent.ph_rightmost_parent
# Replace node with pairing of its children
if node is parent.ph_child and node.ph_child is None:
parent.ph_child = node.ph_next
node.ph_next = None
return heap
elif node is parent.ph_child:
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
parent.ph_child = node
else:
n = parent.ph_child
while node is not n.ph_next:
n = n.ph_next
child = node.ph_child
next = node.ph_next
node.ph_child = None
node.ph_next = None
node = ph_pairing(child)
if node is None:
node = n
else:
n.ph_next = node
node.ph_next = next
if next is None:
node.ph_rightmost_parent = parent
parent.ph_child_last = node
return heap
# TaskQueue class based on the above pairing-heap functions.
class TaskQueue:
def __init__(self):
self.heap = None
def peek(self):
return self.heap
def push_sorted(self, v, key):
v.data = None
v.ph_key = key
v.ph_child = None
v.ph_next = None
self.heap = ph_meld(v, self.heap)
def push_head(self, v):
self.push_sorted(v, core.ticks())
def pop_head(self):
v = self.heap
self.heap = ph_pairing(self.heap.ph_child)
return v
def remove(self, v):
self.heap = ph_delete(self.heap, v)
# Task class representing a coroutine, can be waited on and cancelled.
class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
self.ph_next = None # Paring heap
self.ph_rightmost_parent = None # Paring heap
def __iter__(self):
if self.coro is self:
# Signal that the completed-task has been await'ed on.
self.waiting = None
elif not hasattr(self, "waiting"):
# Lazily allocated head of linked list of Tasks waiting on completion of this task.
self.waiting = TaskQueue()
return self
def __next__(self):
if self.coro is self:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.waiting.push_head(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
return self.coro is self
def cancel(self):
# Check if task is already finished.
if self.coro is self:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
raise RuntimeError("can't cancel self")
# If Task waits on another task then forward the cancel to the one it's waiting on.
while isinstance(self.data, Task):
self = self.data
# Reschedule Task as a cancelled task.
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push_head(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push_head(self)
self.data = core.CancelledError
return True
def throw(self, value):
# This task raised an exception which was uncaught; handle that now.
# Set the data because it was cleared by the main scheduling loop.
self.data = value
if not hasattr(self, "waiting"):
# Nothing await'ed on the task so call the exception handler.
core._exc_context["exception"] = value
core._exc_context["future"] = self
core.Loop.call_exception_handler(core._exc_context)

224
tests/libs/unittest.py Normal file
View File

@@ -0,0 +1,224 @@
import sys
class SkipTest(Exception):
pass
class AssertRaisesContext:
def __init__(self, exc):
self.expected = exc
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_type is None:
assert False, "%r not raised" % self.expected
if issubclass(exc_type, self.expected):
return True
return False
class TestCase:
def fail(self, msg=''):
assert False, msg
def assertEqual(self, x, y, msg=''):
if not msg:
msg = "%r vs (expected) %r" % (x, y)
assert x == y, msg
def assertNotEqual(self, x, y, msg=''):
if not msg:
msg = "%r not expected to be equal %r" % (x, y)
assert x != y, msg
def assertAlmostEqual(self, x, y, places=None, msg='', delta=None):
if x == y:
return
if delta is not None and places is not None:
raise TypeError("specify delta or places not both")
if delta is not None:
if abs(x - y) <= delta:
return
if not msg:
msg = '%r != %r within %r delta' % (x, y, delta)
else:
if places is None:
places = 7
if round(abs(y-x), places) == 0:
return
if not msg:
msg = '%r != %r within %r places' % (x, y, places)
assert False, msg
def assertNotAlmostEqual(self, x, y, places=None, msg='', delta=None):
if delta is not None and places is not None:
raise TypeError("specify delta or places not both")
if delta is not None:
if not (x == y) and abs(x - y) > delta:
return
if not msg:
msg = '%r == %r within %r delta' % (x, y, delta)
else:
if places is None:
places = 7
if not (x == y) and round(abs(y-x), places) != 0:
return
if not msg:
msg = '%r == %r within %r places' % (x, y, places)
assert False, msg
def assertIs(self, x, y, msg=''):
if not msg:
msg = "%r is not %r" % (x, y)
assert x is y, msg
def assertIsNot(self, x, y, msg=''):
if not msg:
msg = "%r is %r" % (x, y)
assert x is not y, msg
def assertIsNone(self, x, msg=''):
if not msg:
msg = "%r is not None" % x
assert x is None, msg
def assertIsNotNone(self, x, msg=''):
if not msg:
msg = "%r is None" % x
assert x is not None, msg
def assertTrue(self, x, msg=''):
if not msg:
msg = "Expected %r to be True" % x
assert x, msg
def assertFalse(self, x, msg=''):
if not msg:
msg = "Expected %r to be False" % x
assert not x, msg
def assertIn(self, x, y, msg=''):
if not msg:
msg = "Expected %r to be in %r" % (x, y)
assert x in y, msg
def assertIsInstance(self, x, y, msg=''):
assert isinstance(x, y), msg
def assertRaises(self, exc, func=None, *args, **kwargs):
if func is None:
return AssertRaisesContext(exc)
try:
func(*args, **kwargs)
assert False, "%r not raised" % exc
except Exception as e:
if isinstance(e, exc):
return
raise
def skip(msg):
def _decor(fun):
# We just replace original fun with _inner
def _inner(self):
raise SkipTest(msg)
return _inner
return _decor
def skipIf(cond, msg):
if not cond:
return lambda x: x
return skip(msg)
def skipUnless(cond, msg):
if cond:
return lambda x: x
return skip(msg)
class TestSuite:
def __init__(self):
self.tests = []
def addTest(self, cls):
self.tests.append(cls)
class TestRunner:
def run(self, suite):
res = TestResult()
for c in suite.tests:
run_class(c, res)
print("Ran %d tests\n" % res.testsRun)
if res.failuresNum > 0 or res.errorsNum > 0:
print("FAILED (failures=%d, errors=%d)" % (res.failuresNum, res.errorsNum))
else:
msg = "OK"
if res.skippedNum > 0:
msg += " (%d skipped)" % res.skippedNum
print(msg)
return res
class TestResult:
def __init__(self):
self.errorsNum = 0
self.failuresNum = 0
self.skippedNum = 0
self.testsRun = 0
def wasSuccessful(self):
return self.errorsNum == 0 and self.failuresNum == 0
# TODO: Uncompliant
def run_class(c, test_result):
o = c()
set_up = getattr(o, "setUp", lambda: None)
tear_down = getattr(o, "tearDown", lambda: None)
for name in dir(o):
if name.startswith("test"):
print("%s (%s) ..." % (name, c.__qualname__), end="")
m = getattr(o, name)
set_up()
try:
test_result.testsRun += 1
m()
print(" ok")
except SkipTest as e:
print(" skipped:", e.args[0])
test_result.skippedNum += 1
except:
print(" FAIL")
test_result.failuresNum += 1
# Uncomment to investigate failure in detail
#raise
continue
finally:
tear_down()
def main(module="__main__"):
def test_cases(m):
for tn in dir(m):
c = getattr(m, tn)
if isinstance(c, object) and isinstance(c, type) and issubclass(c, TestCase):
yield c
m = __import__(module)
suite = TestSuite()
for c in test_cases(m):
suite.addTest(c)
runner = TestRunner()
result = runner.run(suite)
# Terminate with non zero return code in case of failures
sys.exit(result.failuresNum > 0)

View File

View File

@@ -0,0 +1,206 @@
import sys
import unittest
from microdot import Microdot, Response
from tests import mock_socket
def mock_create_thread(f, *args, **kwargs):
f(*args, **kwargs)
class TestMicrodot(unittest.TestCase):
def setUp(self):
# mock socket module
self.original_socket = sys.modules['microdot'].socket
self.original_create_thread = sys.modules['microdot'].create_thread
sys.modules['microdot'].socket = mock_socket
sys.modules['microdot'].create_thread = mock_create_thread
def tearDown(self):
# restore original socket module
sys.modules['microdot'].socket = self.original_socket
sys.modules['microdot'].create_thread = self.original_create_thread
def _add_shutdown(self, app):
@app.route('/shutdown')
def shutdown(req):
app.shutdown()
return ''
mock_socket.add_request('GET', '/shutdown')
def test_get_request(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
def test_post_request(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.route('/', methods=['POST'])
def index_post(req):
return Response('bar')
mock_socket.clear_requests()
fd = mock_socket.add_request('POST', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
def test_before_after_request(self):
app = Microdot()
@app.before_request
def before_request(req):
if req.path == '/bar':
return 'bar', 202
req.g.message = 'baz'
@app.after_request
def after_request_one(req, res):
res.headers['X-One'] = '1'
@app.after_request
def after_request_two(req, res):
res.set_cookie('foo', 'bar')
return res
@app.route('/bar')
def bar(req):
return 'foo'
@app.route('/baz')
def baz(req):
return req.g.message
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/bar')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 202 N/A\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/baz')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbaz'))
def test_404(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 404 N/A\r\n'))
self.assertIn(b'Content-Length: 9\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nNot found'))
def test_404_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.errorhandler(404)
def handle_404(req):
return '404'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
def test_500(self):
app = Microdot()
@app.route('/')
def index(req):
return 1 / 0
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 500 N/A\r\n'))
self.assertIn(b'Content-Length: 21\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nInternal server error'))
def test_500_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 1 / 0
@app.errorhandler(500)
def handle_500(req):
return '501', 501
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
def test_exception_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 1 / 0
@app.errorhandler(ZeroDivisionError)
def handle_div_zero(req, exc):
return '501', 501
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))

View File

@@ -0,0 +1,78 @@
import unittest
from microdot import Request
from tests.mock_socket import get_request_fd
class TestRequest(unittest.TestCase):
def test_create_request(self):
fd = get_request_fd('GET', '/foo')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.app, 'app')
self.assertEqual(req.client_addr, 'addr')
self.assertEqual(req.method, 'GET')
self.assertEqual(req.path, '/foo')
self.assertEqual(req.http_version, '1.0')
self.assertIsNone(req.query_string)
self.assertEqual(req.args, {})
self.assertEqual(req.headers, {'Host': 'example.com:1234'})
self.assertEqual(req.cookies, {})
self.assertEqual(req.content_length, 0)
self.assertEqual(req.content_type, None)
self.assertEqual(req.body, b'')
self.assertEqual(req.json, None)
self.assertEqual(req.form, None)
def test_headers(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'}, body='aaa')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.headers, {
'Host': 'example.com:1234',
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'})
self.assertEqual(req.content_type, 'application/json')
self.assertEqual(req.cookies, {'foo': 'bar', 'abc': 'def'})
self.assertEqual(req.content_length, 3)
self.assertEqual(req.body, b'aaa')
def test_args(self):
fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
def test_json(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
req = Request.create('app', fd, 'addr')
json = req.json
self.assertEqual(json, {'foo': 'bar'})
self.assertTrue(req.json is json)
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='[1, "2"]')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.json, [1, '2'])
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/xml'}, body='[1, "2"]')
req = Request.create('app', fd, 'addr')
self.assertIsNone(req.json)
def test_form(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=%2f%%')
req = Request.create('app', fd, 'addr')
form = req.form
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertTrue(req.form is form)
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'},
body='foo=bar&abc=def&x=%2f%%')
req = Request.create('app', fd, 'addr')
self.assertIsNone(req.form)

View File

@@ -0,0 +1,194 @@
from datetime import datetime
try:
import uio as io
except ImportError:
import io
import unittest
from microdot import Response
class TestResponse(unittest.TestCase):
def test_create_from_string(self):
res = Response('foo')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.body, b'foo')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
self.assertIn(b'Content-Length: 3\r\n', response)
self.assertIn(b'Content-Type: text/plain\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\nfoo'))
def test_create_from_string_with_content_length(self):
res = Response('foo', headers={'Content-Length': '2'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'Content-Length': '2'})
self.assertEqual(res.body, b'foo')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
self.assertIn(b'Content-Length: 2\r\n', response)
self.assertIn(b'Content-Type: text/plain\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\nfoo'))
def test_create_from_bytes(self):
res = Response(b'foo')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.body, b'foo')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
self.assertIn(b'Content-Length: 3\r\n', response)
self.assertIn(b'Content-Type: text/plain\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\nfoo'))
def test_create_empty(self):
res = Response(headers={'X-Foo': 'Bar'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'X-Foo': 'Bar'})
self.assertEqual(res.body, b'')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
self.assertIn(b'X-Foo: Bar\r\n', response)
self.assertIn(b'Content-Length: 0\r\n', response)
self.assertIn(b'Content-Type: text/plain\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\n'))
def test_create_json(self):
res = Response({'foo': 'bar'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
self.assertEqual(res.body, b'{"foo": "bar"}')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
self.assertIn(b'Content-Length: 14\r\n', response)
self.assertIn(b'Content-Type: application/json\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\n{"foo": "bar"}'))
res = Response([1, '2'])
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
self.assertEqual(res.body, b'[1, "2"]')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
self.assertIn(b'Content-Length: 8\r\n', response)
self.assertIn(b'Content-Type: application/json\r\n', response)
self.assertTrue(response.endswith(b'\r\n\r\n[1, "2"]'))
def test_create_from_other(self):
res = Response(123)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.body, 123)
def test_create_with_status_code(self):
res = Response('not found', 404)
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers, {})
self.assertEqual(res.body, b'not found')
def test_create_with_headers(self):
res = Response('foo', headers={'X-Test': 'Foo'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'X-Test': 'Foo'})
self.assertEqual(res.body, b'foo')
def test_create_with_status_code_and_headers(self):
res = Response('foo', 202, {'X-Test': 'Foo'})
self.assertEqual(res.status_code, 202)
self.assertEqual(res.headers, {'X-Test': 'Foo'})
self.assertEqual(res.body, b'foo')
def test_cookies(self):
res = Response('ok')
res.set_cookie('foo1', 'bar1')
res.set_cookie('foo2', 'bar2', path='/')
res.set_cookie('foo3', 'bar3', domain='example.com:1234')
res.set_cookie('foo4', 'bar4',
expires=datetime(2019, 11, 5, 2, 23, 54))
res.set_cookie('foo5', 'bar5', max_age=123)
res.set_cookie('foo6', 'bar6', secure=True, http_only=True)
res.set_cookie('foo7', 'bar7', path='/foo', domain='example.com:1234',
expires=datetime(2019, 11, 5, 2, 23, 54), max_age=123,
secure=True, http_only=True)
self.assertEqual(res.headers, {'Set-Cookie': [
'foo1=bar1',
'foo2=bar2; Path=/',
'foo3=bar3; Domain=example.com:1234',
'foo4=bar4; Expires=Tue, 05 Nov 2019 02:23:54 GMT',
'foo5=bar5; Max-Age=123',
'foo6=bar6; Secure; HttpOnly',
'foo7=bar7; Path=/foo; Domain=example.com:1234; '
'Expires=Tue, 05 Nov 2019 02:23:54 GMT; Max-Age=123; Secure; '
'HttpOnly'
]})
def test_redirect(self):
res = Response.redirect('/foo')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/foo')
res = Response.redirect('/foo', status_code=301)
self.assertEqual(res.status_code, 301)
self.assertEqual(res.headers['Location'], '/foo')
def test_send_file(self):
files = [
('test.txt', 'text/plain'),
('test.gif', 'image/gif'),
('test.jpg', 'image/jpeg'),
('test.png', 'image/png'),
('test.html', 'text/html'),
('test.css', 'text/css'),
('test.js', 'application/javascript'),
('test.json', 'application/json'),
('test.bin', 'application/octet-stream'),
]
for file, content_type in files:
res = Response.send_file('tests/files/' + file)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], content_type)
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertEqual(response, (
b'HTTP/1.0 200 OK\r\nContent-Type: ' + content_type.encode()
+ b'\r\n\r\nfoo\n'))
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertEqual(
response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
def test_send_file_small_buffer(self):
original_buffer_size = Response.send_file_buffer_size
Response.send_file_buffer_size = 2
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
fd = io.BytesIO()
res.write(fd)
response = fd.getvalue()
self.assertEqual(
response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
Response.send_file_buffer_size = original_buffer_size

View File

@@ -0,0 +1,97 @@
import unittest
from microdot import URLPattern
class TestURLPattern(unittest.TestCase):
def test_static(self):
p = URLPattern('/')
self.assertEqual(p.match('/'), {})
self.assertIsNone(p.match('/foo'))
p = URLPattern('/foo/bar')
self.assertEqual(p.match('/foo/bar'), {})
self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar/'))
p = URLPattern('/foo//bar/baz/')
self.assertEqual(p.match('/foo//bar/baz/'), {})
self.assertIsNone(p.match('/foo/bar/baz/'))
self.assertIsNone(p.match('/foo'))
self.assertIsNone(p.match('/foo/bar/baz'))
def test_string_argument(self):
p = URLPattern('/<arg>')
self.assertEqual(p.match('/foo'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo/'))
p = URLPattern('/<arg>/')
self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo'))
p = URLPattern('/<string:arg>')
self.assertEqual(p.match('/foo'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo/'))
p = URLPattern('/<string:arg>/')
self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo'))
p = URLPattern('/foo/<arg1>/bar/<arg2>')
self.assertEqual(p.match('/foo/one/bar/two'),
{'arg1': 'one', 'arg2': 'two'})
self.assertIsNone(p.match('/'))
self.assertIsNone(p.match('/foo/'))
def test_int_argument(self):
p = URLPattern('/users/<int:id>')
self.assertEqual(p.match('/users/123'), {'id': 123})
self.assertIsNone(p.match('/users/'))
self.assertIsNone(p.match('/users/abc'))
self.assertIsNone(p.match('/users/123abc'))
self.assertIsNone(p.match('/users/123/abc'))
p = URLPattern('/users/<int:id>/<int:id2>/')
self.assertEqual(p.match('/users/123/456/'), {'id': 123, 'id2': 456})
self.assertIsNone(p.match('/users/'))
self.assertIsNone(p.match('/users/123/456'))
self.assertIsNone(p.match('/users/123/abc/'))
self.assertIsNone(p.match('/users/123/456/abc'))
def test_path_argument(self):
p = URLPattern('/users/<path:path>')
self.assertEqual(p.match('/users/123'), {'path': '123'})
self.assertEqual(p.match('/users/123/'), {'path': '123/'})
self.assertEqual(p.match('/users/abc/def'), {'path': 'abc/def'})
self.assertIsNone(p.match('/users/'))
p = URLPattern('/users/<path:path>/foo')
self.assertEqual(p.match('/users/123/foo'), {'path': '123'})
self.assertEqual(p.match('/users/foo/foo'), {'path': 'foo'})
self.assertEqual(p.match('/users/abc/def/foo'), {'path': 'abc/def'})
self.assertIsNone(p.match('/users/'))
self.assertIsNone(p.match('/users/foo'))
self.assertIsNone(p.match('/users/foo/'))
def test_regex_argument(self):
p = URLPattern('/users/<re:[a-c]+:id>')
self.assertEqual(p.match('/users/ab'), {'id': 'ab'})
self.assertEqual(p.match('/users/bca'), {'id': 'bca'})
self.assertIsNone(p.match('/users/abcd'))
def test_many_arguments(self):
p = URLPattern('/foo/<path:path>/<int:id>/bar/<name>')
self.assertEqual(p.match('/foo/abc/def/123/bar/test'),
{'path': 'abc/def', 'id': 123, 'name': 'test'})
self.assertIsNone(p.match('/foo/123/bar/test'))
self.assertIsNone(p.match('/foo/abc/def/ghi/bar/test'))
self.assertIsNone(p.match('/foo/abc/def/123/bar'))
self.assertIsNone(p.match('/foo/abc/def/123/bar/'))
self.assertIsNone(p.match('/foo/abc/def/123/test'))
def test_invalid_url_patterns(self):
self.assertRaises(ValueError, URLPattern, '/users/<foo/bar')
self.assertRaises(ValueError, URLPattern, '/users/<badtype:id>')

View File

View File

@@ -0,0 +1,217 @@
import sys
import unittest
from microdot_asyncio import Microdot, Response
from tests import mock_asyncio, mock_socket
class TestMicrodotAsync(unittest.TestCase):
def setUp(self):
# mock socket module
self.original_asyncio = sys.modules['microdot_asyncio'].asyncio
sys.modules['microdot_asyncio'].asyncio = mock_asyncio
def tearDown(self):
# restore original socket module
sys.modules['microdot_asyncio'].asyncio = self.original_asyncio
def _add_shutdown(self, app):
@app.route('/shutdown')
def shutdown(req):
app.shutdown()
return ''
mock_socket.add_request('GET', '/shutdown')
def test_get_request(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.route('/async')
async def index2(req):
return 'foo-async'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
fd2 = mock_socket.add_request('GET', '/async')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
self.assertTrue(fd2.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 9\r\n', fd2.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd2.response)
self.assertTrue(fd2.response.endswith(b'\r\n\r\nfoo-async'))
def test_post_request(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.route('/', methods=['POST'])
def index_post(req):
return Response('bar')
@app.route('/async', methods=['POST'])
async def index_post2(req):
return Response('bar-async')
mock_socket.clear_requests()
fd = mock_socket.add_request('POST', '/')
fd2 = mock_socket.add_request('POST', '/async')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
self.assertTrue(fd2.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 9\r\n', fd2.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd2.response)
self.assertTrue(fd2.response.endswith(b'\r\n\r\nbar-async'))
def test_before_after_request(self):
app = Microdot()
@app.before_request
def before_request(req):
if req.path == '/bar':
return 'bar', 202
req.g.message = 'baz'
@app.after_request
def after_request_one(req, res):
res.headers['X-One'] = '1'
@app.after_request
async def after_request_two(req, res):
res.set_cookie('foo', 'bar')
return res
@app.route('/bar')
def bar(req):
return 'foo'
@app.route('/baz')
def baz(req):
return req.g.message
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/bar')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 202 N/A\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/baz')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'X-One: 1\r\n', fd.response)
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nbaz'))
def test_404(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 404 N/A\r\n'))
self.assertIn(b'Content-Length: 9\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nNot found'))
def test_404_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 'foo'
@app.errorhandler(404)
async def handle_404(req):
return '404'
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/foo')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
def test_500(self):
app = Microdot()
@app.route('/')
def index(req):
return 1 / 0
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 500 N/A\r\n'))
self.assertIn(b'Content-Length: 21\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nInternal server error'))
def test_500_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 1 / 0
@app.errorhandler(500)
def handle_500(req):
return '501', 501
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
def test_exception_handler(self):
app = Microdot()
@app.route('/')
def index(req):
return 1 / 0
@app.errorhandler(ZeroDivisionError)
async def handle_div_zero(req, exc):
return '501', 501
mock_socket.clear_requests()
fd = mock_socket.add_request('GET', '/')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))

View File

@@ -0,0 +1,87 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
import unittest
from microdot_asyncio import Request
from tests.mock_socket import get_async_request_fd
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
class TestRequestAsync(unittest.TestCase):
def test_create_request(self):
fd = get_async_request_fd('GET', '/foo')
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.app, 'app')
self.assertEqual(req.client_addr, 'addr')
self.assertEqual(req.method, 'GET')
self.assertEqual(req.path, '/foo')
self.assertEqual(req.http_version, '1.0')
self.assertIsNone(req.query_string)
self.assertEqual(req.args, {})
self.assertEqual(req.headers, {'Host': 'example.com:1234'})
self.assertEqual(req.cookies, {})
self.assertEqual(req.content_length, 0)
self.assertEqual(req.content_type, None)
self.assertEqual(req.body, b'')
self.assertEqual(req.json, None)
self.assertEqual(req.form, None)
def test_headers(self):
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'}, body='aaa')
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.headers, {
'Host': 'example.com:1234',
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'})
self.assertEqual(req.content_type, 'application/json')
self.assertEqual(req.cookies, {'foo': 'bar', 'abc': 'def'})
self.assertEqual(req.content_length, 3)
self.assertEqual(req.body, b'aaa')
def test_args(self):
fd = get_async_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
def test_json(self):
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
req = _run(Request.create('app', fd, 'addr'))
json = req.json
self.assertEqual(json, {'foo': 'bar'})
self.assertTrue(req.json is json)
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='[1, "2"]')
req = _run(Request.create('app', fd, 'addr'))
self.assertEqual(req.json, [1, '2'])
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/xml'}, body='[1, "2"]')
req = _run(Request.create('app', fd, 'addr'))
self.assertIsNone(req.json)
def test_form(self):
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=%2f%%')
req = _run(Request.create('app', fd, 'addr'))
form = req.form
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertTrue(req.form is form)
fd = get_async_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'},
body='foo=bar&abc=def&x=%2f%%')
req = _run(Request.create('app', fd, 'addr'))
self.assertIsNone(req.form)

View File

@@ -0,0 +1,111 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
import unittest
from microdot_asyncio import Response
from tests.mock_socket import FakeStreamAsync
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
class TestResponseAsync(unittest.TestCase):
def test_create_from_string(self):
res = Response('foo')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.body, b'foo')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
def test_create_from_string_with_content_length(self):
res = Response('foo', headers={'Content-Length': '2'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'Content-Length': '2'})
self.assertEqual(res.body, b'foo')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
self.assertIn(b'Content-Length: 2\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
def test_create_from_bytes(self):
res = Response(b'foo')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {})
self.assertEqual(res.body, b'foo')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
def test_create_empty(self):
res = Response(headers={'X-Foo': 'Bar'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'X-Foo': 'Bar'})
self.assertEqual(res.body, b'')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
self.assertIn(b'X-Foo: Bar\r\n', fd.response)
self.assertIn(b'Content-Length: 0\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n'))
def test_create_json(self):
res = Response({'foo': 'bar'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
self.assertEqual(res.body, b'{"foo": "bar"}')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
self.assertIn(b'Content-Length: 14\r\n', fd.response)
self.assertIn(b'Content-Type: application/json\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n{"foo": "bar"}'))
res = Response([1, '2'])
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
self.assertEqual(res.body, b'[1, "2"]')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
self.assertIn(b'Content-Length: 8\r\n', fd.response)
self.assertIn(b'Content-Type: application/json\r\n', fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n[1, "2"]'))
def test_send_file(self):
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertEqual(
fd.response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
def test_send_file_small_buffer(self):
original_buffer_size = Response.send_file_buffer_size
Response.send_file_buffer_size = 2
res = Response.send_file('tests/files/test.txt',
content_type='text/html')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/html')
fd = FakeStreamAsync()
_run(res.write(fd))
self.assertEqual(
fd.response,
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
Response.send_file_buffer_size = original_buffer_size

39
tests/mock_asyncio.py Normal file
View File

@@ -0,0 +1,39 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from tests import mock_socket
def get_event_loop():
return asyncio.get_event_loop()
async def start_server(cb, host, port):
class MockServer:
def __init__(self):
self.closed = False
async def run(self):
s = mock_socket.socket()
while not self.closed:
fd, addr = s.accept()
fd = mock_socket.FakeStreamAsync(fd)
await cb(fd, fd)
def close(self):
self.closed = True
async def wait_closed(self):
while not self.closed:
await asyncio.sleep(0.01)
server = MockServer()
asyncio.get_event_loop().create_task(server.run())
return server
def run(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)

102
tests/mock_socket.py Normal file
View File

@@ -0,0 +1,102 @@
try:
import uio as io
except ImportError:
import io
SOL_SOCKET = 'SOL_SOCKET'
SO_REUSEADDR = 'SO_REUSEADDR'
_requests = []
def getaddrinfo(host, port):
return (('family', 'addr'), 'socktype', 'proto', 'canonname', 'sockaddr')
class socket:
def __init__(self):
self.request_index = 0
def setsockopt(self, level, optname, value):
pass
def bind(self, addr):
pass
def listen(self, backlog):
pass
def accept(self):
self.request_index += 1
return _requests[self.request_index - 1], 'addr'
def close(self):
pass
class FakeStream(io.BytesIO):
def __init__(self, input_data):
super().__init__(input_data)
self.response = b''
def write(self, data):
self.response += data
class FakeStreamAsync:
def __init__(self, stream=None):
if stream is None:
stream = FakeStream(b'')
self.stream = stream
@property
def response(self):
return self.stream.response
async def readline(self):
return self.stream.readline()
async def read(self, n):
return self.stream.read(n)
async def awrite(self, data):
self.stream.write(data)
async def aclose(self):
pass
def get_extra_info(self, name, default=None):
return name
def get_request_fd(method, path, headers=None, body=None):
if headers is None:
headers = {}
if body is None:
body = ''
elif 'Content-Length' not in headers:
headers['Content-Length'] = str(len(body))
request_bytes = '{method} {path} HTTP/1.0\n'.format(
method=method, path=path)
if 'Host' not in headers:
headers['Host'] = 'example.com:1234'
for header, value in headers.items():
request_bytes += '{header}: {value}\n'.format(
header=header, value=value)
request_bytes += '\n' + body
return FakeStream(request_bytes.encode())
def get_async_request_fd(method, path, headers=None, body=None):
fd = get_request_fd(method, path, headers=headers, body=body)
return FakeStreamAsync(fd)
def clear_requests():
_requests.clear()
def add_request(method, path, headers=None, body=None):
fd = get_request_fd(method, path, headers=headers, body=body)
_requests.append(fd)
return fd

25
tools/Dockerfile Normal file
View File

@@ -0,0 +1,25 @@
FROM ubuntu:latest
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y build-essential libffi-dev git pkg-config python python3 && \
rm -rf /var/lib/apt/lists/* && \
git clone https://github.com/micropython/micropython.git && \
cd micropython && \
git checkout v1.15 && \
git submodule update --init && \
cd mpy-cross && \
make && \
cd .. && \
cd ports/unix && \
make axtls && \
make && \
make test && \
make install && \
apt-get purge --auto-remove -y build-essential libffi-dev git pkg-config python python3 && \
cd ../../.. && \
rm -rf micropython
CMD ["/usr/local/bin/micropython"]

6
tools/update-micropython.sh Executable file
View File

@@ -0,0 +1,6 @@
# this script updates the micropython binary in the /bin directory that is
# used to run unit tests under GitHub Actions builds
docker build -t micropython .
docker create -it --name dummy-micropython micropython
docker cp dummy-micropython:/usr/local/bin/micropython ../bin/micropython
docker rm dummy-micropython

35
tox.ini Normal file
View File

@@ -0,0 +1,35 @@
[tox]
envlist=flake8,py36,py37,py38,py39,upy
skipsdist=True
skip_missing_interpreters=True
[gh-actions]
python =
3.6: py36
3.7: py37
3.8: py38
3.9: py39
pypy3: pypy3
[testenv]
commands=
pip install -e microdot
pip install -e microdot-asyncio
coverage run --branch --include="microdot*.py" -m unittest tests
coverage report --show-missing
deps=coverage
[testenv:flake8]
deps=
flake8
commands=
flake8 --ignore=W503 --exclude tests/libs microdot microdot-asyncio tests
[testenv:upy]
whitelist_externals=sh
commands=sh -c "bin/micropython run_tests.py"
[testenv:upy-mac]
whitelist_externals=micropython
commands=micropython run_tests.py
deps=