Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64cc172917 | ||
|
|
b9ca036e1d | ||
|
|
b06b6de584 | ||
|
|
e5525c5c48 | ||
|
|
494800ff9f | ||
|
|
ba986a89ff | ||
|
|
89f7f09b9a | ||
|
|
3d9b5d7084 | ||
|
|
03efe46a26 | ||
|
|
b16466f1a9 | ||
|
|
8aa50f171d | ||
|
|
76ab1fa6d7 | ||
|
|
0a373775d5 | ||
|
|
cd71986a50 | ||
|
|
0b95feafc9 | ||
|
|
9b32292f21 | ||
|
|
f741ed7cf8 | ||
|
|
92edc17522 | ||
|
|
4c83cb7563 | ||
|
|
491202de1f | ||
|
|
db29624a45 | ||
|
|
0f2c749f6d | ||
|
|
52f2d0c491 | ||
|
|
2f58c41cc8 |
3
.flake8
Normal file
3
.flake8
Normal file
@@ -0,0 +1,3 @@
|
||||
[flake8]
|
||||
select = C,E,F,W,B,B950
|
||||
per-file-ignores = ./*/__init__.py:F401
|
||||
18
.travis.yml
Normal file
18
.travis.yml
Normal file
@@ -0,0 +1,18 @@
|
||||
dist: xenial
|
||||
language: python
|
||||
matrix:
|
||||
include:
|
||||
- python: 3.7
|
||||
env: TOXENV=flake8
|
||||
- python: 3.5
|
||||
env: TOXENV=py35
|
||||
- python: 3.6
|
||||
env: TOXENV=py36
|
||||
- python: 3.7
|
||||
env: TOXENV=py37
|
||||
- python: 3.7
|
||||
env: TOXENV=upy
|
||||
install:
|
||||
- pip install tox
|
||||
script:
|
||||
- tox
|
||||
@@ -1,2 +1,4 @@
|
||||
# microdot
|
||||
[](https://travis-ci.org/miguelgrinberg/microdot)
|
||||
|
||||
A minimalistic Python web framework for microcontrollers inspired by Flask
|
||||
|
||||
BIN
bin/micropython
Executable file
BIN
bin/micropython
Executable file
Binary file not shown.
33
bin/mkrelease
Executable file
33
bin/mkrelease
Executable file
@@ -0,0 +1,33 @@
|
||||
#!/bin/bash
|
||||
VERSION=$1
|
||||
if [[ "$VERSION" == "" ]]; then
|
||||
echo Usage: $0 "<version>"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
git diff --cached --exit-code >/dev/null
|
||||
if [[ "$?" != "0" ]]; then
|
||||
echo Commit your changes before using this script.
|
||||
exit 1
|
||||
fi
|
||||
|
||||
set -e
|
||||
for PKG in microdot*; do
|
||||
echo Building $PKG...
|
||||
cd $PKG
|
||||
sed -i "s/version.*$/version=\"$VERSION\",/" setup.py
|
||||
git add setup.py
|
||||
rm -rf dist
|
||||
python setup.py sdist bdist_wheel --universal
|
||||
cd ..
|
||||
done
|
||||
git commit -m "Release v$VERSION"
|
||||
git tag v$VERSION
|
||||
git push --tags origin master
|
||||
|
||||
for PKG in microdot*; do
|
||||
echo Releasing $PKG...
|
||||
cd $PKG
|
||||
twine upload dist/*
|
||||
cd ..
|
||||
done
|
||||
@@ -2,14 +2,75 @@
|
||||
<html>
|
||||
<head>
|
||||
<title>Microdot GPIO Example</title>
|
||||
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
|
||||
<script>
|
||||
function getCookie(name) {
|
||||
var value = "; " + document.cookie;
|
||||
var parts = value.split("; " + name + "=");
|
||||
if (parts.length == 2)
|
||||
return parts.pop().split(";").shift();
|
||||
}
|
||||
function showMessage() {
|
||||
document.getElementById('message').innerHTML = getCookie('message');
|
||||
}
|
||||
function onLoad() {
|
||||
showMessage();
|
||||
var form = getCookie('form');
|
||||
if (form) {
|
||||
form = form.split(',')
|
||||
document.getElementById('pin').selectedIndex = parseInt(form[0]);
|
||||
document.getElementById(form[1]).checked = true;
|
||||
}
|
||||
}
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
<body onload="onLoad();">
|
||||
<div class="container">
|
||||
<h1>Microdot GPIO Example</h1>
|
||||
<div class="alert alert-primary" role="alert" id="message">
|
||||
</div>
|
||||
<form method="POST" action="">
|
||||
<p>GPIO Pin: <input type="text" name="pin" size="3"></p>
|
||||
<input type="submit" name="read" value="Read">
|
||||
<input type="submit" name="set-low" value="Set Low">
|
||||
<input type="submit" name="set-high" value="Set high">
|
||||
<p>
|
||||
GPIO Pin:
|
||||
<select name="pin" id="pin">
|
||||
<option>0</option>
|
||||
<option>1</option>
|
||||
<option>2</option>
|
||||
<option>3</option>
|
||||
<option>4</option>
|
||||
<option>5</option>
|
||||
<option>6</option>
|
||||
<option>7</option>
|
||||
<option>8</option>
|
||||
<option>9</option>
|
||||
<option>10</option>
|
||||
<option>11</option>
|
||||
<option>12</option>
|
||||
<option>13</option>
|
||||
<option>14</option>
|
||||
<option>15</option>
|
||||
<option>16</option>
|
||||
</select>
|
||||
</p>
|
||||
<div>
|
||||
<p>
|
||||
<input type="radio" name="pull" value="pullup" id="pullup">
|
||||
<label for="pullup">Pull-Up</label>
|
||||
<input type="radio" name="pull" value="pulldown" id="pulldown">
|
||||
<label for="pulldown">Pull-Down</label>
|
||||
<input type="radio" name="pull" value="pullnone" id="pullnone" checked>
|
||||
<label for="pullnone">None</label>
|
||||
<br>
|
||||
<input type="submit" class="btn btn-outline-dark" name="read" value="Read">
|
||||
</p>
|
||||
</div>
|
||||
<div>
|
||||
<p>
|
||||
<input type="submit" class="btn btn-outline-dark" name="set-low" value="Set Low">
|
||||
<input type="submit" class="btn btn-outline-dark" name="set-high" value="Set high">
|
||||
</p>
|
||||
</div>
|
||||
</form>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
|
||||
@@ -6,14 +6,38 @@ app = Microdot()
|
||||
|
||||
@app.route('/', methods=['GET', 'POST'])
|
||||
def index(request):
|
||||
form_cookie = None
|
||||
message_cookie = None
|
||||
if request.method == 'POST':
|
||||
if 'set-read' in request.form:
|
||||
pin = machine.Pin(int(request.form['pin']), machine.Pin.IN)
|
||||
form_cookie = '{pin},{pull}'.format(pin=request.form['pin'],
|
||||
pull=request.form['pull'])
|
||||
if 'read' in request.form:
|
||||
pull = None
|
||||
if request.form['pull'] == 'pullup':
|
||||
pull = machine.Pin.PULL_UP
|
||||
elif request.form['pull'] == 'pulldown':
|
||||
pull = machine.Pin.PULL_DOWN
|
||||
pin = machine.Pin(int(request.form['pin']), machine.Pin.IN, pull)
|
||||
message_cookie = 'Input pin {pin} is {state}.'.format(
|
||||
pin=request.form['pin'],
|
||||
state='high' if pin.value() else 'low')
|
||||
else:
|
||||
pin = machine.Pin(int(request.form['pin']), machine.Pin.OUT)
|
||||
pin.value(0 if 'set-low' in request.form else 1)
|
||||
return redirect('/')
|
||||
return send_file('gpio.html')
|
||||
value = 0 if 'set-low' in request.form else 1
|
||||
pin.value(value)
|
||||
message_cookie = 'Output pin {pin} is now {state}.'.format(
|
||||
pin=request.form['pin'],
|
||||
state='high' if value else 'low')
|
||||
response = redirect('/')
|
||||
else:
|
||||
if 'message' not in request.cookies:
|
||||
message_cookie = 'Select a pin and an operation below.'
|
||||
response = send_file('gpio.html')
|
||||
if form_cookie:
|
||||
response.set_cookie('form', form_cookie)
|
||||
if message_cookie:
|
||||
response.set_cookie('message', message_cookie)
|
||||
return response
|
||||
|
||||
|
||||
app.run()
|
||||
app.run(debug=True)
|
||||
|
||||
150
microdot-asyncio/microdot_asyncio.py
Normal file
150
microdot-asyncio/microdot_asyncio.py
Normal file
@@ -0,0 +1,150 @@
|
||||
try:
|
||||
import uasyncio as asyncio
|
||||
except ImportError:
|
||||
import asyncio
|
||||
from microdot import Microdot as BaseMicrodot
|
||||
from microdot import print_exception
|
||||
from microdot import Request as BaseRequest
|
||||
from microdot import Response as BaseResponse
|
||||
|
||||
|
||||
def _iscoroutine(coro):
|
||||
return hasattr(coro, 'send') and hasattr(coro, 'throw')
|
||||
|
||||
|
||||
class Request(BaseRequest):
|
||||
@staticmethod
|
||||
async def create(stream, client_addr):
|
||||
# request line
|
||||
line = (await stream.readline()).strip().decode()
|
||||
method, url, http_version = line.split()
|
||||
http_version = http_version.split('/', 1)[1]
|
||||
|
||||
# headers
|
||||
headers = {}
|
||||
content_length = 0
|
||||
while True:
|
||||
line = (await stream.readline()).strip().decode()
|
||||
if line == '':
|
||||
break
|
||||
header, value = line.split(':', 1)
|
||||
value = value.strip()
|
||||
headers[header] = value
|
||||
if header == 'Content-Length':
|
||||
content_length = int(value)
|
||||
|
||||
# body
|
||||
body = await stream.read(content_length) \
|
||||
if content_length else b''
|
||||
|
||||
return Request(client_addr, method, url, http_version, headers, body)
|
||||
|
||||
|
||||
class Response(BaseResponse):
|
||||
async def write(self, stream):
|
||||
self.complete()
|
||||
|
||||
# status code
|
||||
await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format(
|
||||
status_code=self.status_code,
|
||||
reason='OK' if self.status_code == 200 else 'N/A').encode())
|
||||
|
||||
# headers
|
||||
for header, value in self.headers.items():
|
||||
values = value if isinstance(value, list) else [value]
|
||||
for value in values:
|
||||
await stream.awrite('{header}: {value}\r\n'.format(
|
||||
header=header, value=value).encode())
|
||||
await stream.awrite(b'\r\n')
|
||||
|
||||
# body
|
||||
if self.body:
|
||||
await stream.awrite(self.body)
|
||||
|
||||
|
||||
class Microdot(BaseMicrodot):
|
||||
def run(self, host='0.0.0.0', port=5000, debug=False):
|
||||
self.debug = debug
|
||||
|
||||
async def serve(reader, writer):
|
||||
if not hasattr(writer, 'awrite'): # pragma: no cover
|
||||
# CPython provides the awrite and aclose methods in 3.8+
|
||||
async def awrite(self, data):
|
||||
self.write(data)
|
||||
await self.drain()
|
||||
|
||||
async def aclose(self):
|
||||
self.close()
|
||||
await self.wait_closed()
|
||||
|
||||
from types import MethodType
|
||||
writer.awrite = MethodType(awrite, writer)
|
||||
writer.aclose = MethodType(aclose, writer)
|
||||
|
||||
await self.dispatch_request(reader, writer)
|
||||
|
||||
if self.debug: # pragma: no cover
|
||||
print('Starting async server on {host}:{port}...'.format(
|
||||
host=host, port=port))
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(asyncio.start_server(serve, host, port))
|
||||
loop.run_forever()
|
||||
loop.close() # pragma: no cover
|
||||
|
||||
async def dispatch_request(self, reader, writer):
|
||||
req = await Request.create(reader, writer.get_extra_info('peername'))
|
||||
f = self.find_route(req)
|
||||
try:
|
||||
res = None
|
||||
if f:
|
||||
for handler in self.before_request_handlers:
|
||||
res = await self._invoke_handler(handler, req)
|
||||
if res:
|
||||
break
|
||||
if res is None:
|
||||
res = await self._invoke_handler(f, req, **req.url_args)
|
||||
if isinstance(res, tuple):
|
||||
res = Response(*res)
|
||||
elif not isinstance(res, Response):
|
||||
res = Response(res)
|
||||
for handler in self.after_request_handlers:
|
||||
res = await self._invoke_handler(handler, req, res) or res
|
||||
elif 404 in self.error_handlers:
|
||||
res = await self._invoke_handler(self.error_handlers[404], req)
|
||||
else:
|
||||
res = 'Not found', 404
|
||||
except Exception as exc:
|
||||
print_exception(exc)
|
||||
res = None
|
||||
if exc.__class__ in self.error_handlers:
|
||||
try:
|
||||
res = await self._invoke_handler(
|
||||
self.error_handlers[exc.__class__], req, exc)
|
||||
except Exception as exc2: # pragma: no cover
|
||||
print_exception(exc2)
|
||||
if res is None:
|
||||
if 500 in self.error_handlers:
|
||||
res = await self._invoke_handler(self.error_handlers[500],
|
||||
req)
|
||||
else:
|
||||
res = 'Internal server error', 500
|
||||
if isinstance(res, tuple):
|
||||
res = Response(*res)
|
||||
elif not isinstance(res, Response):
|
||||
res = Response(res)
|
||||
await res.write(writer)
|
||||
await writer.aclose()
|
||||
if self.debug: # pragma: no cover
|
||||
print('{method} {path} {status_code}'.format(
|
||||
method=req.method, path=req.path,
|
||||
status_code=res.status_code))
|
||||
|
||||
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
|
||||
ret = f_or_coro(*args, **kwargs)
|
||||
if _iscoroutine(ret):
|
||||
ret = await ret
|
||||
return ret
|
||||
|
||||
|
||||
redirect = Response.redirect
|
||||
send_file = Response.send_file
|
||||
35
microdot-asyncio/setup.py
Executable file
35
microdot-asyncio/setup.py
Executable file
@@ -0,0 +1,35 @@
|
||||
"""
|
||||
Microdot-AsyncIO
|
||||
----------------
|
||||
|
||||
AsyncIO support for the Microdot web framework.
|
||||
"""
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='microdot-asyncio',
|
||||
version="0.3.0",
|
||||
url='http://github.com/miguelgrinberg/microdot/',
|
||||
license='MIT',
|
||||
author='Miguel Grinberg',
|
||||
author_email='miguel.grinberg@gmail.com',
|
||||
description='AsyncIO support for the Microdot web framework',
|
||||
long_description=__doc__,
|
||||
py_modules=['microdot_asyncio'],
|
||||
platforms='any',
|
||||
install_requires=[
|
||||
'microdot',
|
||||
'micropython-uasyncio;implementation_name=="micropython"'
|
||||
],
|
||||
classifiers=[
|
||||
'Environment :: Web Environment',
|
||||
'Intended Audience :: Developers',
|
||||
'License :: OSI Approved :: MIT License',
|
||||
'Operating System :: OS Independent',
|
||||
'Programming Language :: Python',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Programming Language :: Python :: Implementation :: MicroPython',
|
||||
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules'
|
||||
]
|
||||
)
|
||||
285
microdot.py
285
microdot.py
@@ -1,285 +0,0 @@
|
||||
try:
|
||||
import ujson as json
|
||||
except ImportError:
|
||||
import json
|
||||
try:
|
||||
import ure as re
|
||||
except ImportError:
|
||||
import re
|
||||
try:
|
||||
import usocket as socket
|
||||
except ImportError:
|
||||
import socket
|
||||
|
||||
|
||||
def urldecode(string):
|
||||
string = string.replace('+', ' ')
|
||||
parts = string.split('%')
|
||||
if len(parts) == 1:
|
||||
return string
|
||||
result = [parts[0]]
|
||||
for item in parts[1:]:
|
||||
if item == '':
|
||||
result.append('%')
|
||||
else:
|
||||
code = item[:2]
|
||||
result.append(chr(int(code, 16)))
|
||||
result.append(item[2:])
|
||||
return ''.join(result)
|
||||
|
||||
|
||||
class Request():
|
||||
def __init__(self, client_sock, client_addr):
|
||||
self.client_sock = client_sock
|
||||
self.client_addr = client_addr
|
||||
|
||||
if not hasattr(client_sock, 'readline'):
|
||||
self.client_stream = client_sock.makefile("rwb")
|
||||
else:
|
||||
self.client_stream = client_sock
|
||||
|
||||
# request line
|
||||
line = self.client_stream.readline().strip().decode('utf-8')
|
||||
self.method, self.path, self.http_version = line.split()
|
||||
if '?' in self.path:
|
||||
self.path, self.query_string = self.path.split('?', 1)
|
||||
else:
|
||||
self.query_string = None
|
||||
|
||||
# headers
|
||||
self.headers = {}
|
||||
self.cookies = {}
|
||||
self.content_length = 0
|
||||
while True:
|
||||
line = self.client_stream.readline().strip().decode('utf-8')
|
||||
if line == '':
|
||||
break
|
||||
header, value = line.split(':', 1)
|
||||
value = value.strip()
|
||||
self.headers[header] = value
|
||||
if header == 'Content-Length':
|
||||
self.content_length = int(value)
|
||||
elif header == 'Content-Type':
|
||||
self.content_type = value
|
||||
elif header == 'Cookie':
|
||||
for cookie in self.headers['Cookie'].split(';'):
|
||||
name, value = cookie.split('=', 1)
|
||||
self.cookies[name] = value
|
||||
|
||||
# body
|
||||
self.body = self.client_stream.read(self.content_length)
|
||||
self._json = None
|
||||
self._form = None
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
if self.content_type != 'application/json':
|
||||
return None
|
||||
if self._json is None:
|
||||
self._json = json.loads(self.body)
|
||||
return self._json
|
||||
|
||||
@property
|
||||
def form(self):
|
||||
if self.content_type != 'application/x-www-form-urlencoded':
|
||||
return None
|
||||
if self._form is None:
|
||||
self._form = {urldecode(key): urldecode(value) for key, value in
|
||||
[pair.split('=', 1) for pair in self.body.decode().split('&')]}
|
||||
return self._form
|
||||
|
||||
def close(self):
|
||||
self.client_stream.close()
|
||||
if self.client_stream != self.client_sock:
|
||||
self.client_sock.close()
|
||||
|
||||
|
||||
class Response():
|
||||
types_map = {
|
||||
'css': 'text/css',
|
||||
'gif': 'image/gif',
|
||||
'html': 'text/html',
|
||||
'jpg': 'image/jpeg',
|
||||
'js': 'application/javascript',
|
||||
'json': 'application/json',
|
||||
'png': 'image/png',
|
||||
'txt': 'text/plain',
|
||||
}
|
||||
|
||||
def __init__(self, body='', status_code=200, headers=None):
|
||||
self.status_code = status_code
|
||||
self.headers = headers or {}
|
||||
self.cookies = []
|
||||
if isinstance(body, (dict, list)):
|
||||
self.body = json.dumps(body).encode()
|
||||
self.headers['Content-Type'] = 'application/json'
|
||||
elif isinstance(body, str):
|
||||
self.body = body.encode()
|
||||
elif isinstance(body, bytes):
|
||||
self.body = body
|
||||
else:
|
||||
self.body = str(body).encode()
|
||||
|
||||
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
|
||||
max_age=None, secure=False, http_only=False):
|
||||
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
|
||||
if path:
|
||||
http_cookie += '; Path=' + path
|
||||
if domain:
|
||||
http_cookie += '; Domain=' + domain
|
||||
if expires:
|
||||
http_cookie += '; Expires=' + expires.strftime(
|
||||
"%a, %d %b %Y %H:%M:%S GMT")
|
||||
if max_age:
|
||||
http_cookie += '; Max-Age=' + str(max_age)
|
||||
if secure:
|
||||
http_cookie += '; Secure'
|
||||
if http_only:
|
||||
http_cookie += '; httpOnly'
|
||||
if 'Set-Cookie' in self.headers:
|
||||
self.headers['Set-Cookie'].append(http_cookie)
|
||||
else:
|
||||
self.headers['Set-Cookie'] = [http_cookie]
|
||||
|
||||
def write(self, client_stream):
|
||||
# status code
|
||||
client_stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
|
||||
status_code=self.status_code,
|
||||
reason='OK' if self.status_code == 200 else 'N/A').encode())
|
||||
|
||||
# headers
|
||||
content_length_found = False
|
||||
content_type_found = False
|
||||
for header, value in self.headers.items():
|
||||
values = value if isinstance(value, list) else [value]
|
||||
for value in values:
|
||||
client_stream.write('{header}: {value}\r\n'.format(
|
||||
header=header, value=value).encode())
|
||||
if header == 'Content-Length':
|
||||
content_length_found = True
|
||||
elif header == 'Content-Type':
|
||||
content_type_found = True
|
||||
if not content_length_found:
|
||||
client_stream.write('Content-Length: {length}\r\n'.format(
|
||||
length=len(self.body)).encode())
|
||||
if not content_type_found:
|
||||
client_stream.write(b'Content-Type: text/plain\r\n')
|
||||
client_stream.write(b'\r\n')
|
||||
|
||||
# body
|
||||
if self.body:
|
||||
client_stream.write(self.body)
|
||||
|
||||
@staticmethod
|
||||
def redirect(location, status_code=302):
|
||||
return Response(status_code=status_code,
|
||||
headers={'Location': location})
|
||||
|
||||
@staticmethod
|
||||
def send_file(filename, status_code=200, content_type=None):
|
||||
if content_type is None:
|
||||
ext = filename.split('.')[-1]
|
||||
if ext in Response.types_map:
|
||||
content_type = Response.types_map[ext]
|
||||
else:
|
||||
content_type = 'application/octet-stream'
|
||||
with open(filename) as f:
|
||||
return Response(body=f.read(), status_code=status_code,
|
||||
headers={'Content-Type': content_type})
|
||||
|
||||
|
||||
class URLPattern():
|
||||
def __init__(self, url_pattern):
|
||||
self.pattern = ''
|
||||
self.args = []
|
||||
use_regex = False
|
||||
for segment in url_pattern.lstrip('/').split('/'):
|
||||
if segment and segment[0] == '<':
|
||||
if segment[-1] != '>':
|
||||
raise ValueError('invalid URL pattern')
|
||||
segment = segment[1:-1]
|
||||
if ':' in segment:
|
||||
type_, name = segment.split(':', 1)
|
||||
else:
|
||||
type_ = 'string'
|
||||
name = segment
|
||||
if type_ == 'string':
|
||||
pattern = '[^/]*'
|
||||
elif type_ == 'int':
|
||||
pattern = '\\d+'
|
||||
elif type_ == 'path':
|
||||
pattern = '.*'
|
||||
elif type_.startswith('regex'):
|
||||
pattern = eval(type_[5:])
|
||||
else:
|
||||
raise ValueError('invalid URL segment type')
|
||||
use_regex = True
|
||||
self.pattern += '/({pattern})'.format(pattern=pattern)
|
||||
self.args.append({'type': type_, 'name': name})
|
||||
else:
|
||||
self.pattern += '/{segment}'.format(segment=segment)
|
||||
if use_regex:
|
||||
self.pattern = re.compile(self.pattern)
|
||||
|
||||
def match(self, path):
|
||||
if isinstance(self.pattern, str):
|
||||
if path != self.pattern:
|
||||
return
|
||||
return {}
|
||||
g = self.pattern.match(path)
|
||||
if not g:
|
||||
return
|
||||
args = {}
|
||||
i = 1
|
||||
for arg in self.args:
|
||||
value = g.group(i)
|
||||
if arg['type'] == 'int':
|
||||
value = int(value)
|
||||
args[arg['name']] = value
|
||||
i += 1
|
||||
return args
|
||||
|
||||
|
||||
class Microdot():
|
||||
def __init__(self) :
|
||||
self.url_map = []
|
||||
|
||||
def route(self, url_pattern, methods=None):
|
||||
def decorated(f):
|
||||
self.url_map.append(
|
||||
(methods or ['GET'], URLPattern(url_pattern), f))
|
||||
return f
|
||||
return decorated
|
||||
|
||||
def run(self, host='0.0.0.0', port=5000):
|
||||
s = socket.socket()
|
||||
ai = socket.getaddrinfo(host, port)
|
||||
addr = ai[0][-1]
|
||||
|
||||
print('Listening on {host}:{port}...'.format(host=host, port=port))
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind(addr)
|
||||
s.listen(5)
|
||||
|
||||
while True:
|
||||
req = Request(*s.accept())
|
||||
f = None
|
||||
args = None
|
||||
for route_methods, route_pattern, route_handler in self.url_map:
|
||||
if req.method in route_methods:
|
||||
args = route_pattern.match(req.path)
|
||||
if args is not None:
|
||||
f = route_handler
|
||||
break
|
||||
if f:
|
||||
resp = f(req, **args)
|
||||
if isinstance(resp, tuple):
|
||||
resp = Response(*resp)
|
||||
elif not isinstance(resp, Response):
|
||||
resp = Response(resp)
|
||||
resp.write(req.client_stream)
|
||||
req.close()
|
||||
|
||||
|
||||
redirect = Response.redirect
|
||||
send_file = Response.send_file
|
||||
401
microdot/microdot.py
Normal file
401
microdot/microdot.py
Normal file
@@ -0,0 +1,401 @@
|
||||
try:
|
||||
from sys import print_exception
|
||||
except ImportError: # pragma: no cover
|
||||
import traceback
|
||||
|
||||
def print_exception(exc):
|
||||
traceback.print_exc()
|
||||
|
||||
concurrency_mode = 'threaded'
|
||||
|
||||
try: # pragma: no cover
|
||||
import threading
|
||||
|
||||
def create_thread(f, *args, **kwargs):
|
||||
"""Use the threading module."""
|
||||
threading.Thread(target=f, args=args, kwargs=kwargs).start()
|
||||
except ImportError: # pragma: no cover
|
||||
try:
|
||||
import _thread
|
||||
|
||||
def create_thread(f, *args, **kwargs):
|
||||
"""Use MicroPython's _thread module."""
|
||||
def run():
|
||||
f(*args, **kwargs)
|
||||
|
||||
_thread.start_new_thread(run, ())
|
||||
except ImportError:
|
||||
def create_thread(f, *args, **kwargs):
|
||||
"""No threads available, call function synchronously."""
|
||||
f(*args, **kwargs)
|
||||
|
||||
concurrency_mode = 'sync'
|
||||
try:
|
||||
import ujson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
try:
|
||||
import ure as re
|
||||
except ImportError:
|
||||
import re
|
||||
|
||||
try:
|
||||
import usocket as socket
|
||||
except ImportError:
|
||||
import socket
|
||||
|
||||
|
||||
def urldecode(string):
|
||||
string = string.replace('+', ' ')
|
||||
parts = string.split('%')
|
||||
if len(parts) == 1:
|
||||
return string
|
||||
result = [parts[0]]
|
||||
for item in parts[1:]:
|
||||
if item == '':
|
||||
result.append('%')
|
||||
else:
|
||||
code = item[:2]
|
||||
result.append(chr(int(code, 16)))
|
||||
result.append(item[2:])
|
||||
return ''.join(result)
|
||||
|
||||
|
||||
class Request():
|
||||
class G:
|
||||
pass
|
||||
|
||||
def __init__(self, client_addr, method, url, http_version, headers, body):
|
||||
self.client_addr = client_addr
|
||||
self.method = method
|
||||
self.path = url
|
||||
self.http_version = http_version
|
||||
if '?' in self.path:
|
||||
self.path, self.query_string = self.path.split('?', 1)
|
||||
self.args = self._parse_urlencoded(self.query_string)
|
||||
else:
|
||||
self.query_string = None
|
||||
self.args = {}
|
||||
self.headers = headers
|
||||
self.cookies = {}
|
||||
self.content_length = 0
|
||||
self.content_type = None
|
||||
for header, value in self.headers.items():
|
||||
if header == 'Content-Length':
|
||||
self.content_length = int(value)
|
||||
elif header == 'Content-Type':
|
||||
self.content_type = value
|
||||
elif header == 'Cookie':
|
||||
for cookie in value.split(';'):
|
||||
name, value = cookie.strip().split('=', 1)
|
||||
self.cookies[name] = value
|
||||
self.body = body
|
||||
self._json = None
|
||||
self._form = None
|
||||
self.g = Request.G()
|
||||
|
||||
@staticmethod
|
||||
def create(client_stream, client_addr):
|
||||
# request line
|
||||
line = client_stream.readline().strip().decode()
|
||||
method, url, http_version = line.split()
|
||||
http_version = http_version.split('/', 1)[1]
|
||||
|
||||
# headers
|
||||
headers = {}
|
||||
content_length = 0
|
||||
while True:
|
||||
line = client_stream.readline().strip().decode()
|
||||
if line == '':
|
||||
break
|
||||
header, value = line.split(':', 1)
|
||||
value = value.strip()
|
||||
headers[header] = value
|
||||
if header == 'Content-Length':
|
||||
content_length = int(value)
|
||||
|
||||
# body
|
||||
body = client_stream.read(content_length) if content_length else b''
|
||||
|
||||
return Request(client_addr, method, url, http_version, headers, body)
|
||||
|
||||
def _parse_urlencoded(self, urlencoded):
|
||||
return {
|
||||
urldecode(key): urldecode(value) for key, value in [
|
||||
pair.split('=', 1) for pair in
|
||||
urlencoded.split('&')]}
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
if self.content_type != 'application/json':
|
||||
return None
|
||||
if self._json is None:
|
||||
self._json = json.loads(self.body.decode())
|
||||
return self._json
|
||||
|
||||
@property
|
||||
def form(self):
|
||||
if self.content_type != 'application/x-www-form-urlencoded':
|
||||
return None
|
||||
if self._form is None:
|
||||
self._form = self._parse_urlencoded(self.body.decode())
|
||||
return self._form
|
||||
|
||||
|
||||
class Response():
|
||||
types_map = {
|
||||
'css': 'text/css',
|
||||
'gif': 'image/gif',
|
||||
'html': 'text/html',
|
||||
'jpg': 'image/jpeg',
|
||||
'js': 'application/javascript',
|
||||
'json': 'application/json',
|
||||
'png': 'image/png',
|
||||
'txt': 'text/plain',
|
||||
}
|
||||
|
||||
def __init__(self, body='', status_code=200, headers=None):
|
||||
self.status_code = status_code
|
||||
self.headers = headers or {}
|
||||
if isinstance(body, (dict, list)):
|
||||
self.body = json.dumps(body).encode()
|
||||
self.headers['Content-Type'] = 'application/json'
|
||||
elif isinstance(body, str):
|
||||
self.body = body.encode()
|
||||
elif isinstance(body, bytes):
|
||||
self.body = body
|
||||
else:
|
||||
self.body = str(body).encode()
|
||||
|
||||
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
|
||||
max_age=None, secure=False, http_only=False):
|
||||
http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
|
||||
if path:
|
||||
http_cookie += '; Path=' + path
|
||||
if domain:
|
||||
http_cookie += '; Domain=' + domain
|
||||
if expires:
|
||||
http_cookie += '; Expires=' + expires.strftime(
|
||||
"%a, %d %b %Y %H:%M:%S GMT")
|
||||
if max_age:
|
||||
http_cookie += '; Max-Age=' + str(max_age)
|
||||
if secure:
|
||||
http_cookie += '; Secure'
|
||||
if http_only:
|
||||
http_cookie += '; HttpOnly'
|
||||
if 'Set-Cookie' in self.headers:
|
||||
self.headers['Set-Cookie'].append(http_cookie)
|
||||
else:
|
||||
self.headers['Set-Cookie'] = [http_cookie]
|
||||
|
||||
def complete(self):
|
||||
if 'Content-Length' not in self.headers:
|
||||
self.headers['Content-Length'] = str(len(self.body))
|
||||
if 'Content-Type' not in self.headers:
|
||||
self.headers['Content-Type'] = 'text/plain'
|
||||
|
||||
def write(self, stream):
|
||||
self.complete()
|
||||
|
||||
# status code
|
||||
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
|
||||
status_code=self.status_code,
|
||||
reason='OK' if self.status_code == 200 else 'N/A').encode())
|
||||
|
||||
# headers
|
||||
for header, value in self.headers.items():
|
||||
values = value if isinstance(value, list) else [value]
|
||||
for value in values:
|
||||
stream.write('{header}: {value}\r\n'.format(
|
||||
header=header, value=value).encode())
|
||||
stream.write(b'\r\n')
|
||||
|
||||
# body
|
||||
if self.body:
|
||||
stream.write(self.body)
|
||||
|
||||
@classmethod
|
||||
def redirect(cls, location, status_code=302):
|
||||
return cls(status_code=status_code, headers={'Location': location})
|
||||
|
||||
@classmethod
|
||||
def send_file(cls, filename, status_code=200, content_type=None):
|
||||
if content_type is None:
|
||||
ext = filename.split('.')[-1]
|
||||
if ext in Response.types_map:
|
||||
content_type = Response.types_map[ext]
|
||||
else:
|
||||
content_type = 'application/octet-stream'
|
||||
with open(filename) as f:
|
||||
body = f.read()
|
||||
return cls(body=body, status_code=status_code,
|
||||
headers={'Content-Type': content_type,
|
||||
'Content-Length': str(len(body))})
|
||||
|
||||
|
||||
class URLPattern():
|
||||
def __init__(self, url_pattern):
|
||||
self.pattern = ''
|
||||
self.args = []
|
||||
use_regex = False
|
||||
for segment in url_pattern.lstrip('/').split('/'):
|
||||
if segment and segment[0] == '<':
|
||||
if segment[-1] != '>':
|
||||
raise ValueError('invalid URL pattern')
|
||||
segment = segment[1:-1]
|
||||
if ':' in segment:
|
||||
type_, name = segment.rsplit(':', 1)
|
||||
else:
|
||||
type_ = 'string'
|
||||
name = segment
|
||||
if type_ == 'string':
|
||||
pattern = '[^/]+'
|
||||
elif type_ == 'int':
|
||||
pattern = '\\d+'
|
||||
elif type_ == 'path':
|
||||
pattern = '.+'
|
||||
elif type_.startswith('re:'):
|
||||
pattern = type_[3:]
|
||||
else:
|
||||
raise ValueError('invalid URL segment type')
|
||||
use_regex = True
|
||||
self.pattern += '/({pattern})'.format(pattern=pattern)
|
||||
self.args.append({'type': type_, 'name': name})
|
||||
else:
|
||||
self.pattern += '/{segment}'.format(segment=segment)
|
||||
if use_regex:
|
||||
self.pattern = re.compile('^' + self.pattern + '$')
|
||||
|
||||
def match(self, path):
|
||||
if isinstance(self.pattern, str):
|
||||
if path != self.pattern:
|
||||
return
|
||||
return {}
|
||||
g = self.pattern.match(path)
|
||||
if not g:
|
||||
return
|
||||
args = {}
|
||||
i = 1
|
||||
for arg in self.args:
|
||||
value = g.group(i)
|
||||
if arg['type'] == 'int':
|
||||
value = int(value)
|
||||
args[arg['name']] = value
|
||||
i += 1
|
||||
return args
|
||||
|
||||
|
||||
class Microdot():
|
||||
def __init__(self):
|
||||
self.url_map = []
|
||||
self.before_request_handlers = []
|
||||
self.after_request_handlers = []
|
||||
self.error_handlers = {}
|
||||
self.debug = False
|
||||
|
||||
def route(self, url_pattern, methods=None):
|
||||
def decorated(f):
|
||||
self.url_map.append(
|
||||
(methods or ['GET'], URLPattern(url_pattern), f))
|
||||
return f
|
||||
return decorated
|
||||
|
||||
def before_request(self, f):
|
||||
self.before_request_handlers.append(f)
|
||||
return f
|
||||
|
||||
def after_request(self, f):
|
||||
self.after_request_handlers.append(f)
|
||||
return f
|
||||
|
||||
def errorhandler(self, status_code_or_exception_class):
|
||||
def decorated(f):
|
||||
self.error_handlers[status_code_or_exception_class] = f
|
||||
return f
|
||||
return decorated
|
||||
|
||||
def run(self, host='0.0.0.0', port=5000, debug=False):
|
||||
self.debug = debug
|
||||
|
||||
s = socket.socket()
|
||||
ai = socket.getaddrinfo(host, port)
|
||||
addr = ai[0][-1]
|
||||
|
||||
if self.debug: # pragma: no cover
|
||||
print('Starting {mode} server on {host}:{port}...'.format(
|
||||
mode=concurrency_mode, host=host, port=port))
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind(addr)
|
||||
s.listen(5)
|
||||
|
||||
while True:
|
||||
sock, addr = s.accept()
|
||||
create_thread(self.dispatch_request, sock, addr)
|
||||
|
||||
def find_route(self, req):
|
||||
f = None
|
||||
for route_methods, route_pattern, route_handler in self.url_map:
|
||||
if req.method in route_methods:
|
||||
req.url_args = route_pattern.match(req.path)
|
||||
if req.url_args is not None:
|
||||
f = route_handler
|
||||
break
|
||||
return f
|
||||
|
||||
def dispatch_request(self, sock, addr):
|
||||
if not hasattr(sock, 'readline'): # pragma: no cover
|
||||
stream = sock.makefile("rwb")
|
||||
else:
|
||||
stream = sock
|
||||
|
||||
req = Request.create(stream, addr)
|
||||
f = self.find_route(req)
|
||||
try:
|
||||
res = None
|
||||
if f:
|
||||
for handler in self.before_request_handlers:
|
||||
res = handler(req)
|
||||
if res:
|
||||
break
|
||||
if res is None:
|
||||
res = f(req, **req.url_args)
|
||||
if isinstance(res, tuple):
|
||||
res = Response(*res)
|
||||
elif not isinstance(res, Response):
|
||||
res = Response(res)
|
||||
for handler in self.after_request_handlers:
|
||||
res = handler(req, res) or res
|
||||
elif 404 in self.error_handlers:
|
||||
res = self.error_handlers[404](req)
|
||||
else:
|
||||
res = 'Not found', 404
|
||||
except Exception as exc:
|
||||
print_exception(exc)
|
||||
res = None
|
||||
if exc.__class__ in self.error_handlers:
|
||||
try:
|
||||
res = self.error_handlers[exc.__class__](req, exc)
|
||||
except Exception as exc2: # pragma: no cover
|
||||
print_exception(exc2)
|
||||
if res is None:
|
||||
if 500 in self.error_handlers:
|
||||
res = self.error_handlers[500](req)
|
||||
else:
|
||||
res = 'Internal server error', 500
|
||||
if isinstance(res, tuple):
|
||||
res = Response(*res)
|
||||
elif not isinstance(res, Response):
|
||||
res = Response(res)
|
||||
res.write(stream)
|
||||
stream.close()
|
||||
if stream != sock: # pragma: no cover
|
||||
sock.close()
|
||||
if self.debug: # pragma: no cover
|
||||
print('{method} {path} {status_code}'.format(
|
||||
method=req.method, path=req.path,
|
||||
status_code=res.status_code))
|
||||
|
||||
|
||||
redirect = Response.redirect
|
||||
send_file = Response.send_file
|
||||
@@ -2,26 +2,21 @@
|
||||
Microdot
|
||||
--------
|
||||
|
||||
Impossibly small web framework for MicroPython.
|
||||
The impossibly small web framework for MicroPython.
|
||||
"""
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='microdot',
|
||||
version='0.1.1',
|
||||
version="0.3.0",
|
||||
url='http://github.com/miguelgrinberg/microdot/',
|
||||
license='MIT',
|
||||
author='Miguel Grinberg',
|
||||
author_email='miguel.grinberg@gmail.com',
|
||||
description='Impossibly small web framework for MicroPython',
|
||||
description='The impossibly small web framework for MicroPython',
|
||||
long_description=__doc__,
|
||||
py_modules=['microdot'],
|
||||
zip_safe=False,
|
||||
include_package_data=True,
|
||||
platforms='any',
|
||||
tests_require=[
|
||||
'coverage'
|
||||
],
|
||||
classifiers=[
|
||||
'Environment :: Web Environment',
|
||||
'Intended Audience :: Developers',
|
||||
9
run_tests.py
Normal file
9
run_tests.py
Normal file
@@ -0,0 +1,9 @@
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, 'microdot')
|
||||
sys.path.insert(1, 'microdot-asyncio')
|
||||
sys.path.insert(2, 'tests/libs')
|
||||
|
||||
import unittest
|
||||
|
||||
unittest.main('tests')
|
||||
8
tests/__init__.py
Normal file
8
tests/__init__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from tests.microdot.test_request import TestRequest
|
||||
from tests.microdot.test_response import TestResponse
|
||||
from tests.microdot.test_url_pattern import TestURLPattern
|
||||
from tests.microdot.test_microdot import TestMicrodot
|
||||
|
||||
from tests.microdot_asyncio.test_request_asyncio import TestRequestAsync
|
||||
from tests.microdot_asyncio.test_response_asyncio import TestResponseAsync
|
||||
from tests.microdot_asyncio.test_microdot_asyncio import TestMicrodotAsync
|
||||
1
tests/files/test.bin
Normal file
1
tests/files/test.bin
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.css
Normal file
1
tests/files/test.css
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.gif
Normal file
1
tests/files/test.gif
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.html
Normal file
1
tests/files/test.html
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.jpg
Normal file
1
tests/files/test.jpg
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.js
Normal file
1
tests/files/test.js
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.json
Normal file
1
tests/files/test.json
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.png
Normal file
1
tests/files/test.png
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
1
tests/files/test.txt
Normal file
1
tests/files/test.txt
Normal file
@@ -0,0 +1 @@
|
||||
foo
|
||||
2143
tests/libs/datetime.py
Normal file
2143
tests/libs/datetime.py
Normal file
File diff suppressed because it is too large
Load Diff
46
tests/libs/ffilib.py
Normal file
46
tests/libs/ffilib.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import sys
|
||||
try:
|
||||
import ffi
|
||||
except ImportError:
|
||||
ffi = None
|
||||
|
||||
_cache = {}
|
||||
|
||||
def open(name, maxver=10, extra=()):
|
||||
if not ffi:
|
||||
return None
|
||||
try:
|
||||
return _cache[name]
|
||||
except KeyError:
|
||||
pass
|
||||
def libs():
|
||||
if sys.platform == "linux":
|
||||
yield '%s.so' % name
|
||||
for i in range(maxver, -1, -1):
|
||||
yield '%s.so.%u' % (name, i)
|
||||
else:
|
||||
for ext in ('dylib', 'dll'):
|
||||
yield '%s.%s' % (name, ext)
|
||||
for n in extra:
|
||||
yield n
|
||||
err = None
|
||||
for n in libs():
|
||||
try:
|
||||
l = ffi.open(n)
|
||||
_cache[name] = l
|
||||
return l
|
||||
except OSError as e:
|
||||
err = e
|
||||
raise err
|
||||
|
||||
def libc():
|
||||
return open("libc", 6)
|
||||
|
||||
# Find out bitness of the platform, even if long ints are not supported
|
||||
# TODO: All bitness differences should be removed from micropython-lib, and
|
||||
# this snippet too.
|
||||
bitness = 1
|
||||
v = sys.maxsize
|
||||
while v:
|
||||
bitness += 1
|
||||
v >>= 1
|
||||
76
tests/libs/time.py
Normal file
76
tests/libs/time.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from utime import *
|
||||
from ucollections import namedtuple
|
||||
import ustruct
|
||||
import uctypes
|
||||
import ffi
|
||||
import ffilib
|
||||
import array
|
||||
|
||||
libc = ffilib.libc()
|
||||
|
||||
# struct tm *gmtime(const time_t *timep);
|
||||
# struct tm *localtime(const time_t *timep);
|
||||
# size_t strftime(char *s, size_t max, const char *format,
|
||||
# const struct tm *tm);
|
||||
gmtime_ = libc.func("P", "gmtime", "P")
|
||||
localtime_ = libc.func("P", "localtime", "P")
|
||||
strftime_ = libc.func("i", "strftime", "sisP")
|
||||
mktime_ = libc.func("i", "mktime", "P")
|
||||
|
||||
_struct_time = namedtuple("struct_time",
|
||||
["tm_year", "tm_mon", "tm_mday", "tm_hour", "tm_min", "tm_sec", "tm_wday", "tm_yday", "tm_isdst"])
|
||||
|
||||
def _tuple_to_c_tm(t):
|
||||
return ustruct.pack("@iiiiiiiii", t[5], t[4], t[3], t[2], t[1] - 1, t[0] - 1900, (t[6] + 1) % 7, t[7] - 1, t[8])
|
||||
|
||||
|
||||
def _c_tm_to_tuple(tm):
|
||||
t = ustruct.unpack("@iiiiiiiii", tm)
|
||||
return _struct_time(t[5] + 1900, t[4] + 1, t[3], t[2], t[1], t[0], (t[6] - 1) % 7, t[7] + 1, t[8])
|
||||
|
||||
def struct_time(tm):
|
||||
return _struct_time(*tm)
|
||||
|
||||
|
||||
def strftime(format, t=None):
|
||||
if t is None:
|
||||
t = localtime()
|
||||
|
||||
buf = bytearray(32)
|
||||
l = strftime_(buf, 32, format, _tuple_to_c_tm(t))
|
||||
return str(buf[:l], "utf-8")
|
||||
|
||||
|
||||
def localtime(t=None):
|
||||
if t is None:
|
||||
t = time()
|
||||
|
||||
t = int(t)
|
||||
a = ustruct.pack('l', t)
|
||||
tm_p = localtime_(a)
|
||||
return _c_tm_to_tuple(uctypes.bytearray_at(tm_p, 36))
|
||||
|
||||
|
||||
def gmtime(t=None):
|
||||
if t is None:
|
||||
t = time()
|
||||
|
||||
t = int(t)
|
||||
a = ustruct.pack('l', t)
|
||||
tm_p = gmtime_(a)
|
||||
return _c_tm_to_tuple(uctypes.bytearray_at(tm_p, 36))
|
||||
|
||||
|
||||
def mktime(tt):
|
||||
return mktime_(_tuple_to_c_tm(tt))
|
||||
|
||||
|
||||
def perf_counter():
|
||||
return time()
|
||||
|
||||
def process_time():
|
||||
return clock()
|
||||
|
||||
|
||||
daylight = 0
|
||||
timezone = 0
|
||||
258
tests/libs/uasyncio/__init__.py
Normal file
258
tests/libs/uasyncio/__init__.py
Normal file
@@ -0,0 +1,258 @@
|
||||
import uerrno
|
||||
import uselect as select
|
||||
import usocket as _socket
|
||||
from uasyncio.core import *
|
||||
|
||||
|
||||
DEBUG = 0
|
||||
log = None
|
||||
|
||||
def set_debug(val):
|
||||
global DEBUG, log
|
||||
DEBUG = val
|
||||
if val:
|
||||
import logging
|
||||
log = logging.getLogger("uasyncio")
|
||||
|
||||
|
||||
class PollEventLoop(EventLoop):
|
||||
|
||||
def __init__(self, runq_len=16, waitq_len=16):
|
||||
EventLoop.__init__(self, runq_len, waitq_len)
|
||||
self.poller = select.poll()
|
||||
self.objmap = {}
|
||||
|
||||
def add_reader(self, sock, cb, *args):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("add_reader%s", (sock, cb, args))
|
||||
if args:
|
||||
self.poller.register(sock, select.POLLIN)
|
||||
self.objmap[id(sock)] = (cb, args)
|
||||
else:
|
||||
self.poller.register(sock, select.POLLIN)
|
||||
self.objmap[id(sock)] = cb
|
||||
|
||||
def remove_reader(self, sock):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("remove_reader(%s)", sock)
|
||||
self.poller.unregister(sock)
|
||||
del self.objmap[id(sock)]
|
||||
|
||||
def add_writer(self, sock, cb, *args):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("add_writer%s", (sock, cb, args))
|
||||
if args:
|
||||
self.poller.register(sock, select.POLLOUT)
|
||||
self.objmap[id(sock)] = (cb, args)
|
||||
else:
|
||||
self.poller.register(sock, select.POLLOUT)
|
||||
self.objmap[id(sock)] = cb
|
||||
|
||||
def remove_writer(self, sock):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("remove_writer(%s)", sock)
|
||||
try:
|
||||
self.poller.unregister(sock)
|
||||
self.objmap.pop(id(sock), None)
|
||||
except OSError as e:
|
||||
# StreamWriter.awrite() first tries to write to a socket,
|
||||
# and if that succeeds, yield IOWrite may never be called
|
||||
# for that socket, and it will never be added to poller. So,
|
||||
# ignore such error.
|
||||
if e.args[0] != uerrno.ENOENT:
|
||||
raise
|
||||
|
||||
def wait(self, delay):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("poll.wait(%d)", delay)
|
||||
# We need one-shot behavior (second arg of 1 to .poll())
|
||||
res = self.poller.ipoll(delay, 1)
|
||||
#log.debug("poll result: %s", res)
|
||||
# Remove "if res" workaround after
|
||||
# https://github.com/micropython/micropython/issues/2716 fixed.
|
||||
if res:
|
||||
for sock, ev in res:
|
||||
cb = self.objmap[id(sock)]
|
||||
if ev & (select.POLLHUP | select.POLLERR):
|
||||
# These events are returned even if not requested, and
|
||||
# are sticky, i.e. will be returned again and again.
|
||||
# If the caller doesn't do proper error handling and
|
||||
# unregister this sock, we'll busy-loop on it, so we
|
||||
# as well can unregister it now "just in case".
|
||||
self.remove_reader(sock)
|
||||
if DEBUG and __debug__:
|
||||
log.debug("Calling IO callback: %r", cb)
|
||||
if isinstance(cb, tuple):
|
||||
cb[0](*cb[1])
|
||||
else:
|
||||
cb.pend_throw(None)
|
||||
self.call_soon(cb)
|
||||
|
||||
|
||||
class StreamReader:
|
||||
|
||||
def __init__(self, polls, ios=None):
|
||||
if ios is None:
|
||||
ios = polls
|
||||
self.polls = polls
|
||||
self.ios = ios
|
||||
|
||||
def read(self, n=-1):
|
||||
while True:
|
||||
yield IORead(self.polls)
|
||||
res = self.ios.read(n)
|
||||
if res is not None:
|
||||
break
|
||||
# This should not happen for real sockets, but can easily
|
||||
# happen for stream wrappers (ssl, websockets, etc.)
|
||||
#log.warn("Empty read")
|
||||
if not res:
|
||||
yield IOReadDone(self.polls)
|
||||
return res
|
||||
|
||||
def readexactly(self, n):
|
||||
buf = b""
|
||||
while n:
|
||||
yield IORead(self.polls)
|
||||
res = self.ios.read(n)
|
||||
assert res is not None
|
||||
if not res:
|
||||
yield IOReadDone(self.polls)
|
||||
break
|
||||
buf += res
|
||||
n -= len(res)
|
||||
return buf
|
||||
|
||||
def readline(self):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("StreamReader.readline()")
|
||||
buf = b""
|
||||
while True:
|
||||
yield IORead(self.polls)
|
||||
res = self.ios.readline()
|
||||
assert res is not None
|
||||
if not res:
|
||||
yield IOReadDone(self.polls)
|
||||
break
|
||||
buf += res
|
||||
if buf[-1] == 0x0a:
|
||||
break
|
||||
if DEBUG and __debug__:
|
||||
log.debug("StreamReader.readline(): %s", buf)
|
||||
return buf
|
||||
|
||||
def aclose(self):
|
||||
yield IOReadDone(self.polls)
|
||||
self.ios.close()
|
||||
|
||||
def __repr__(self):
|
||||
return "<StreamReader %r %r>" % (self.polls, self.ios)
|
||||
|
||||
|
||||
class StreamWriter:
|
||||
|
||||
def __init__(self, s, extra):
|
||||
self.s = s
|
||||
self.extra = extra
|
||||
|
||||
def awrite(self, buf, off=0, sz=-1):
|
||||
# This method is called awrite (async write) to not proliferate
|
||||
# incompatibility with original asyncio. Unlike original asyncio
|
||||
# whose .write() method is both not a coroutine and guaranteed
|
||||
# to return immediately (which means it has to buffer all the
|
||||
# data), this method is a coroutine.
|
||||
if sz == -1:
|
||||
sz = len(buf) - off
|
||||
if DEBUG and __debug__:
|
||||
log.debug("StreamWriter.awrite(): spooling %d bytes", sz)
|
||||
while True:
|
||||
res = self.s.write(buf, off, sz)
|
||||
# If we spooled everything, return immediately
|
||||
if res == sz:
|
||||
if DEBUG and __debug__:
|
||||
log.debug("StreamWriter.awrite(): completed spooling %d bytes", res)
|
||||
return
|
||||
if res is None:
|
||||
res = 0
|
||||
if DEBUG and __debug__:
|
||||
log.debug("StreamWriter.awrite(): spooled partial %d bytes", res)
|
||||
assert res < sz
|
||||
off += res
|
||||
sz -= res
|
||||
yield IOWrite(self.s)
|
||||
#assert s2.fileno() == self.s.fileno()
|
||||
if DEBUG and __debug__:
|
||||
log.debug("StreamWriter.awrite(): can write more")
|
||||
|
||||
# Write piecewise content from iterable (usually, a generator)
|
||||
def awriteiter(self, iterable):
|
||||
for buf in iterable:
|
||||
yield from self.awrite(buf)
|
||||
|
||||
def aclose(self):
|
||||
yield IOWriteDone(self.s)
|
||||
self.s.close()
|
||||
|
||||
def get_extra_info(self, name, default=None):
|
||||
return self.extra.get(name, default)
|
||||
|
||||
def __repr__(self):
|
||||
return "<StreamWriter %r>" % self.s
|
||||
|
||||
|
||||
def open_connection(host, port, ssl=False):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("open_connection(%s, %s)", host, port)
|
||||
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
|
||||
ai = ai[0]
|
||||
s = _socket.socket(ai[0], ai[1], ai[2])
|
||||
s.setblocking(False)
|
||||
try:
|
||||
s.connect(ai[-1])
|
||||
except OSError as e:
|
||||
if e.args[0] != uerrno.EINPROGRESS:
|
||||
raise
|
||||
if DEBUG and __debug__:
|
||||
log.debug("open_connection: After connect")
|
||||
yield IOWrite(s)
|
||||
# if __debug__:
|
||||
# assert s2.fileno() == s.fileno()
|
||||
if DEBUG and __debug__:
|
||||
log.debug("open_connection: After iowait: %s", s)
|
||||
if ssl:
|
||||
print("Warning: uasyncio SSL support is alpha")
|
||||
import ussl
|
||||
s.setblocking(True)
|
||||
s2 = ussl.wrap_socket(s)
|
||||
s.setblocking(False)
|
||||
return StreamReader(s, s2), StreamWriter(s2, {})
|
||||
return StreamReader(s), StreamWriter(s, {})
|
||||
|
||||
|
||||
def start_server(client_coro, host, port, backlog=10):
|
||||
if DEBUG and __debug__:
|
||||
log.debug("start_server(%s, %s)", host, port)
|
||||
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
|
||||
ai = ai[0]
|
||||
s = _socket.socket(ai[0], ai[1], ai[2])
|
||||
s.setblocking(False)
|
||||
|
||||
s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
|
||||
s.bind(ai[-1])
|
||||
s.listen(backlog)
|
||||
while True:
|
||||
if DEBUG and __debug__:
|
||||
log.debug("start_server: Before accept")
|
||||
yield IORead(s)
|
||||
if DEBUG and __debug__:
|
||||
log.debug("start_server: After iowait")
|
||||
s2, client_addr = s.accept()
|
||||
s2.setblocking(False)
|
||||
if DEBUG and __debug__:
|
||||
log.debug("start_server: After accept: %s", s2)
|
||||
extra = {"peername": client_addr}
|
||||
yield client_coro(StreamReader(s2), StreamWriter(s2, extra))
|
||||
|
||||
|
||||
import uasyncio.core
|
||||
uasyncio.core._event_loop_class = PollEventLoop
|
||||
318
tests/libs/uasyncio/core.py
Normal file
318
tests/libs/uasyncio/core.py
Normal file
@@ -0,0 +1,318 @@
|
||||
import utime as time
|
||||
import utimeq
|
||||
import ucollections
|
||||
|
||||
|
||||
type_gen = type((lambda: (yield))())
|
||||
|
||||
DEBUG = 0
|
||||
log = None
|
||||
|
||||
def set_debug(val):
|
||||
global DEBUG, log
|
||||
DEBUG = val
|
||||
if val:
|
||||
import logging
|
||||
log = logging.getLogger("uasyncio.core")
|
||||
|
||||
|
||||
class CancelledError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TimeoutError(CancelledError):
|
||||
pass
|
||||
|
||||
|
||||
class EventLoop:
|
||||
|
||||
def __init__(self, runq_len=16, waitq_len=16):
|
||||
self.runq = ucollections.deque((), runq_len, True)
|
||||
self.waitq = utimeq.utimeq(waitq_len)
|
||||
# Current task being run. Task is a top-level coroutine scheduled
|
||||
# in the event loop (sub-coroutines executed transparently by
|
||||
# yield from/await, event loop "doesn't see" them).
|
||||
self.cur_task = None
|
||||
|
||||
def time(self):
|
||||
return time.ticks_ms()
|
||||
|
||||
def create_task(self, coro):
|
||||
# CPython 3.4.2
|
||||
self.call_later_ms(0, coro)
|
||||
# CPython asyncio incompatibility: we don't return Task object
|
||||
|
||||
def call_soon(self, callback, *args):
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("Scheduling in runq: %s", (callback, args))
|
||||
self.runq.append(callback)
|
||||
if not isinstance(callback, type_gen):
|
||||
self.runq.append(args)
|
||||
|
||||
def call_later(self, delay, callback, *args):
|
||||
self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
|
||||
|
||||
def call_later_ms(self, delay, callback, *args):
|
||||
if not delay:
|
||||
return self.call_soon(callback, *args)
|
||||
self.call_at_(time.ticks_add(self.time(), delay), callback, args)
|
||||
|
||||
def call_at_(self, time, callback, args=()):
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("Scheduling in waitq: %s", (time, callback, args))
|
||||
self.waitq.push(time, callback, args)
|
||||
|
||||
def wait(self, delay):
|
||||
# Default wait implementation, to be overriden in subclasses
|
||||
# with IO scheduling
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("Sleeping for: %s", delay)
|
||||
time.sleep_ms(delay)
|
||||
|
||||
def run_forever(self):
|
||||
cur_task = [0, 0, 0]
|
||||
while True:
|
||||
# Expire entries in waitq and move them to runq
|
||||
tnow = self.time()
|
||||
while self.waitq:
|
||||
t = self.waitq.peektime()
|
||||
delay = time.ticks_diff(t, tnow)
|
||||
if delay > 0:
|
||||
break
|
||||
self.waitq.pop(cur_task)
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("Moving from waitq to runq: %s", cur_task[1])
|
||||
self.call_soon(cur_task[1], *cur_task[2])
|
||||
|
||||
# Process runq
|
||||
l = len(self.runq)
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("Entries in runq: %d", l)
|
||||
while l:
|
||||
cb = self.runq.popleft()
|
||||
l -= 1
|
||||
args = ()
|
||||
if not isinstance(cb, type_gen):
|
||||
args = self.runq.popleft()
|
||||
l -= 1
|
||||
if __debug__ and DEBUG:
|
||||
log.info("Next callback to run: %s", (cb, args))
|
||||
cb(*args)
|
||||
continue
|
||||
|
||||
if __debug__ and DEBUG:
|
||||
log.info("Next coroutine to run: %s", (cb, args))
|
||||
self.cur_task = cb
|
||||
delay = 0
|
||||
try:
|
||||
if args is ():
|
||||
ret = next(cb)
|
||||
else:
|
||||
ret = cb.send(*args)
|
||||
if __debug__ and DEBUG:
|
||||
log.info("Coroutine %s yield result: %s", cb, ret)
|
||||
if isinstance(ret, SysCall1):
|
||||
arg = ret.arg
|
||||
if isinstance(ret, SleepMs):
|
||||
delay = arg
|
||||
elif isinstance(ret, IORead):
|
||||
cb.pend_throw(False)
|
||||
self.add_reader(arg, cb)
|
||||
continue
|
||||
elif isinstance(ret, IOWrite):
|
||||
cb.pend_throw(False)
|
||||
self.add_writer(arg, cb)
|
||||
continue
|
||||
elif isinstance(ret, IOReadDone):
|
||||
self.remove_reader(arg)
|
||||
elif isinstance(ret, IOWriteDone):
|
||||
self.remove_writer(arg)
|
||||
elif isinstance(ret, StopLoop):
|
||||
return arg
|
||||
else:
|
||||
assert False, "Unknown syscall yielded: %r (of type %r)" % (ret, type(ret))
|
||||
elif isinstance(ret, type_gen):
|
||||
self.call_soon(ret)
|
||||
elif isinstance(ret, int):
|
||||
# Delay
|
||||
delay = ret
|
||||
elif ret is None:
|
||||
# Just reschedule
|
||||
pass
|
||||
elif ret is False:
|
||||
# Don't reschedule
|
||||
continue
|
||||
else:
|
||||
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
|
||||
except StopIteration as e:
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("Coroutine finished: %s", cb)
|
||||
continue
|
||||
except CancelledError as e:
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("Coroutine cancelled: %s", cb)
|
||||
continue
|
||||
# Currently all syscalls don't return anything, so we don't
|
||||
# need to feed anything to the next invocation of coroutine.
|
||||
# If that changes, need to pass that value below.
|
||||
if delay:
|
||||
self.call_later_ms(delay, cb)
|
||||
else:
|
||||
self.call_soon(cb)
|
||||
|
||||
# Wait until next waitq task or I/O availability
|
||||
delay = 0
|
||||
if not self.runq:
|
||||
delay = -1
|
||||
if self.waitq:
|
||||
tnow = self.time()
|
||||
t = self.waitq.peektime()
|
||||
delay = time.ticks_diff(t, tnow)
|
||||
if delay < 0:
|
||||
delay = 0
|
||||
self.wait(delay)
|
||||
|
||||
def run_until_complete(self, coro):
|
||||
ret = None
|
||||
def _run_and_stop():
|
||||
nonlocal ret
|
||||
ret = yield from coro
|
||||
yield StopLoop(0)
|
||||
self.call_soon(_run_and_stop())
|
||||
self.run_forever()
|
||||
return ret
|
||||
|
||||
def stop(self):
|
||||
self.call_soon((lambda: (yield StopLoop(0)))())
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class SysCall:
|
||||
|
||||
def __init__(self, *args):
|
||||
self.args = args
|
||||
|
||||
def handle(self):
|
||||
raise NotImplementedError
|
||||
|
||||
# Optimized syscall with 1 arg
|
||||
class SysCall1(SysCall):
|
||||
|
||||
def __init__(self, arg):
|
||||
self.arg = arg
|
||||
|
||||
class StopLoop(SysCall1):
|
||||
pass
|
||||
|
||||
class IORead(SysCall1):
|
||||
pass
|
||||
|
||||
class IOWrite(SysCall1):
|
||||
pass
|
||||
|
||||
class IOReadDone(SysCall1):
|
||||
pass
|
||||
|
||||
class IOWriteDone(SysCall1):
|
||||
pass
|
||||
|
||||
|
||||
_event_loop = None
|
||||
_event_loop_class = EventLoop
|
||||
def get_event_loop(runq_len=16, waitq_len=16):
|
||||
global _event_loop
|
||||
if _event_loop is None:
|
||||
_event_loop = _event_loop_class(runq_len, waitq_len)
|
||||
return _event_loop
|
||||
|
||||
def sleep(secs):
|
||||
yield int(secs * 1000)
|
||||
|
||||
# Implementation of sleep_ms awaitable with zero heap memory usage
|
||||
class SleepMs(SysCall1):
|
||||
|
||||
def __init__(self):
|
||||
self.v = None
|
||||
self.arg = None
|
||||
|
||||
def __call__(self, arg):
|
||||
self.v = arg
|
||||
#print("__call__")
|
||||
return self
|
||||
|
||||
def __iter__(self):
|
||||
#print("__iter__")
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
if self.v is not None:
|
||||
#print("__next__ syscall enter")
|
||||
self.arg = self.v
|
||||
self.v = None
|
||||
return self
|
||||
#print("__next__ syscall exit")
|
||||
_stop_iter.__traceback__ = None
|
||||
raise _stop_iter
|
||||
|
||||
_stop_iter = StopIteration()
|
||||
sleep_ms = SleepMs()
|
||||
|
||||
|
||||
def cancel(coro):
|
||||
prev = coro.pend_throw(CancelledError())
|
||||
if prev is False:
|
||||
_event_loop.call_soon(coro)
|
||||
|
||||
|
||||
class TimeoutObj:
|
||||
def __init__(self, coro):
|
||||
self.coro = coro
|
||||
|
||||
|
||||
def wait_for_ms(coro, timeout):
|
||||
|
||||
def waiter(coro, timeout_obj):
|
||||
res = yield from coro
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("waiter: cancelling %s", timeout_obj)
|
||||
timeout_obj.coro = None
|
||||
return res
|
||||
|
||||
def timeout_func(timeout_obj):
|
||||
if timeout_obj.coro:
|
||||
if __debug__ and DEBUG:
|
||||
log.debug("timeout_func: cancelling %s", timeout_obj.coro)
|
||||
prev = timeout_obj.coro.pend_throw(TimeoutError())
|
||||
#print("prev pend", prev)
|
||||
if prev is False:
|
||||
_event_loop.call_soon(timeout_obj.coro)
|
||||
|
||||
timeout_obj = TimeoutObj(_event_loop.cur_task)
|
||||
_event_loop.call_later_ms(timeout, timeout_func, timeout_obj)
|
||||
return (yield from waiter(coro, timeout_obj))
|
||||
|
||||
|
||||
def wait_for(coro, timeout):
|
||||
return wait_for_ms(coro, int(timeout * 1000))
|
||||
|
||||
|
||||
def coroutine(f):
|
||||
return f
|
||||
|
||||
#
|
||||
# The functions below are deprecated in uasyncio, and provided only
|
||||
# for compatibility with CPython asyncio
|
||||
#
|
||||
|
||||
def ensure_future(coro, loop=_event_loop):
|
||||
_event_loop.call_soon(coro)
|
||||
# CPython asyncio incompatibility: we don't return Task object
|
||||
return coro
|
||||
|
||||
|
||||
# CPython asyncio incompatibility: Task is a function, not a class (for efficiency)
|
||||
def Task(coro, loop=_event_loop):
|
||||
# Same as async()
|
||||
_event_loop.call_soon(coro)
|
||||
224
tests/libs/unittest.py
Normal file
224
tests/libs/unittest.py
Normal file
@@ -0,0 +1,224 @@
|
||||
import sys
|
||||
|
||||
|
||||
class SkipTest(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AssertRaisesContext:
|
||||
|
||||
def __init__(self, exc):
|
||||
self.expected = exc
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
if exc_type is None:
|
||||
assert False, "%r not raised" % self.expected
|
||||
if issubclass(exc_type, self.expected):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class TestCase:
|
||||
|
||||
def fail(self, msg=''):
|
||||
assert False, msg
|
||||
|
||||
def assertEqual(self, x, y, msg=''):
|
||||
if not msg:
|
||||
msg = "%r vs (expected) %r" % (x, y)
|
||||
assert x == y, msg
|
||||
|
||||
def assertNotEqual(self, x, y, msg=''):
|
||||
if not msg:
|
||||
msg = "%r not expected to be equal %r" % (x, y)
|
||||
assert x != y, msg
|
||||
|
||||
def assertAlmostEqual(self, x, y, places=None, msg='', delta=None):
|
||||
if x == y:
|
||||
return
|
||||
if delta is not None and places is not None:
|
||||
raise TypeError("specify delta or places not both")
|
||||
|
||||
if delta is not None:
|
||||
if abs(x - y) <= delta:
|
||||
return
|
||||
if not msg:
|
||||
msg = '%r != %r within %r delta' % (x, y, delta)
|
||||
else:
|
||||
if places is None:
|
||||
places = 7
|
||||
if round(abs(y-x), places) == 0:
|
||||
return
|
||||
if not msg:
|
||||
msg = '%r != %r within %r places' % (x, y, places)
|
||||
|
||||
assert False, msg
|
||||
|
||||
def assertNotAlmostEqual(self, x, y, places=None, msg='', delta=None):
|
||||
if delta is not None and places is not None:
|
||||
raise TypeError("specify delta or places not both")
|
||||
|
||||
if delta is not None:
|
||||
if not (x == y) and abs(x - y) > delta:
|
||||
return
|
||||
if not msg:
|
||||
msg = '%r == %r within %r delta' % (x, y, delta)
|
||||
else:
|
||||
if places is None:
|
||||
places = 7
|
||||
if not (x == y) and round(abs(y-x), places) != 0:
|
||||
return
|
||||
if not msg:
|
||||
msg = '%r == %r within %r places' % (x, y, places)
|
||||
|
||||
assert False, msg
|
||||
|
||||
def assertIs(self, x, y, msg=''):
|
||||
if not msg:
|
||||
msg = "%r is not %r" % (x, y)
|
||||
assert x is y, msg
|
||||
|
||||
def assertIsNot(self, x, y, msg=''):
|
||||
if not msg:
|
||||
msg = "%r is %r" % (x, y)
|
||||
assert x is not y, msg
|
||||
|
||||
def assertIsNone(self, x, msg=''):
|
||||
if not msg:
|
||||
msg = "%r is not None" % x
|
||||
assert x is None, msg
|
||||
|
||||
def assertIsNotNone(self, x, msg=''):
|
||||
if not msg:
|
||||
msg = "%r is None" % x
|
||||
assert x is not None, msg
|
||||
|
||||
def assertTrue(self, x, msg=''):
|
||||
if not msg:
|
||||
msg = "Expected %r to be True" % x
|
||||
assert x, msg
|
||||
|
||||
def assertFalse(self, x, msg=''):
|
||||
if not msg:
|
||||
msg = "Expected %r to be False" % x
|
||||
assert not x, msg
|
||||
|
||||
def assertIn(self, x, y, msg=''):
|
||||
if not msg:
|
||||
msg = "Expected %r to be in %r" % (x, y)
|
||||
assert x in y, msg
|
||||
|
||||
def assertIsInstance(self, x, y, msg=''):
|
||||
assert isinstance(x, y), msg
|
||||
|
||||
def assertRaises(self, exc, func=None, *args, **kwargs):
|
||||
if func is None:
|
||||
return AssertRaisesContext(exc)
|
||||
|
||||
try:
|
||||
func(*args, **kwargs)
|
||||
assert False, "%r not raised" % exc
|
||||
except Exception as e:
|
||||
if isinstance(e, exc):
|
||||
return
|
||||
raise
|
||||
|
||||
|
||||
|
||||
def skip(msg):
|
||||
def _decor(fun):
|
||||
# We just replace original fun with _inner
|
||||
def _inner(self):
|
||||
raise SkipTest(msg)
|
||||
return _inner
|
||||
return _decor
|
||||
|
||||
def skipIf(cond, msg):
|
||||
if not cond:
|
||||
return lambda x: x
|
||||
return skip(msg)
|
||||
|
||||
def skipUnless(cond, msg):
|
||||
if cond:
|
||||
return lambda x: x
|
||||
return skip(msg)
|
||||
|
||||
|
||||
class TestSuite:
|
||||
def __init__(self):
|
||||
self.tests = []
|
||||
def addTest(self, cls):
|
||||
self.tests.append(cls)
|
||||
|
||||
class TestRunner:
|
||||
def run(self, suite):
|
||||
res = TestResult()
|
||||
for c in suite.tests:
|
||||
run_class(c, res)
|
||||
|
||||
print("Ran %d tests\n" % res.testsRun)
|
||||
if res.failuresNum > 0 or res.errorsNum > 0:
|
||||
print("FAILED (failures=%d, errors=%d)" % (res.failuresNum, res.errorsNum))
|
||||
else:
|
||||
msg = "OK"
|
||||
if res.skippedNum > 0:
|
||||
msg += " (%d skipped)" % res.skippedNum
|
||||
print(msg)
|
||||
|
||||
return res
|
||||
|
||||
class TestResult:
|
||||
def __init__(self):
|
||||
self.errorsNum = 0
|
||||
self.failuresNum = 0
|
||||
self.skippedNum = 0
|
||||
self.testsRun = 0
|
||||
|
||||
def wasSuccessful(self):
|
||||
return self.errorsNum == 0 and self.failuresNum == 0
|
||||
|
||||
# TODO: Uncompliant
|
||||
def run_class(c, test_result):
|
||||
o = c()
|
||||
set_up = getattr(o, "setUp", lambda: None)
|
||||
tear_down = getattr(o, "tearDown", lambda: None)
|
||||
for name in dir(o):
|
||||
if name.startswith("test"):
|
||||
print("%s (%s) ..." % (name, c.__qualname__), end="")
|
||||
m = getattr(o, name)
|
||||
set_up()
|
||||
try:
|
||||
test_result.testsRun += 1
|
||||
m()
|
||||
print(" ok")
|
||||
except SkipTest as e:
|
||||
print(" skipped:", e.args[0])
|
||||
test_result.skippedNum += 1
|
||||
except:
|
||||
print(" FAIL")
|
||||
test_result.failuresNum += 1
|
||||
# Uncomment to investigate failure in detail
|
||||
#raise
|
||||
continue
|
||||
finally:
|
||||
tear_down()
|
||||
|
||||
|
||||
def main(module="__main__"):
|
||||
def test_cases(m):
|
||||
for tn in dir(m):
|
||||
c = getattr(m, tn)
|
||||
if isinstance(c, object) and isinstance(c, type) and issubclass(c, TestCase):
|
||||
yield c
|
||||
|
||||
m = __import__(module)
|
||||
suite = TestSuite()
|
||||
for c in test_cases(m):
|
||||
suite.addTest(c)
|
||||
runner = TestRunner()
|
||||
result = runner.run(suite)
|
||||
# Terminate with non zero return code in case of failures
|
||||
sys.exit(result.failuresNum > 0)
|
||||
0
tests/microdot/__init__.py
Normal file
0
tests/microdot/__init__.py
Normal file
189
tests/microdot/test_microdot.py
Normal file
189
tests/microdot/test_microdot.py
Normal file
@@ -0,0 +1,189 @@
|
||||
import sys
|
||||
import unittest
|
||||
from microdot import Microdot, Response
|
||||
from tests import mock_socket
|
||||
|
||||
|
||||
def mock_create_thread(f, *args, **kwargs):
|
||||
f(*args, **kwargs)
|
||||
|
||||
|
||||
class TestMicrodot(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# mock socket module
|
||||
self.original_socket = sys.modules['microdot'].socket
|
||||
self.original_create_thread = sys.modules['microdot'].create_thread
|
||||
sys.modules['microdot'].socket = mock_socket
|
||||
sys.modules['microdot'].create_thread = mock_create_thread
|
||||
|
||||
def tearDown(self):
|
||||
# restore original socket module
|
||||
sys.modules['microdot'].socket = self.original_socket
|
||||
sys.modules['microdot'].create_thread = self.original_create_thread
|
||||
|
||||
def test_get_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
|
||||
|
||||
def test_post_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.route('/', methods=['POST'])
|
||||
def index_post(req):
|
||||
return Response('bar')
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('POST', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
|
||||
|
||||
def test_before_after_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.before_request
|
||||
def before_request(req):
|
||||
if req.path == '/bar':
|
||||
return 'bar', 202
|
||||
req.g.message = 'baz'
|
||||
|
||||
@app.after_request
|
||||
def after_request_one(req, res):
|
||||
res.headers['X-One'] = '1'
|
||||
|
||||
@app.after_request
|
||||
def after_request_two(req, res):
|
||||
res.set_cookie('foo', 'bar')
|
||||
return res
|
||||
|
||||
@app.route('/bar')
|
||||
def bar(req):
|
||||
return 'foo'
|
||||
|
||||
@app.route('/baz')
|
||||
def baz(req):
|
||||
return req.g.message
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/bar')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 202 N/A\r\n'))
|
||||
self.assertIn(b'X-One: 1\r\n', fd.response)
|
||||
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/baz')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'X-One: 1\r\n', fd.response)
|
||||
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nbaz'))
|
||||
|
||||
def test_404(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/foo')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 404 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 9\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nNot found'))
|
||||
|
||||
def test_404_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.errorhandler(404)
|
||||
def handle_404(req):
|
||||
return '404'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/foo')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
|
||||
|
||||
def test_500(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 500 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 21\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nInternal server error'))
|
||||
|
||||
def test_500_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
@app.errorhandler(500)
|
||||
def handle_500(req):
|
||||
return '501', 501
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
|
||||
|
||||
def test_exception_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
@app.errorhandler(ZeroDivisionError)
|
||||
def handle_div_zero(req, exc):
|
||||
return '501', 501
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
|
||||
77
tests/microdot/test_request.py
Normal file
77
tests/microdot/test_request.py
Normal file
@@ -0,0 +1,77 @@
|
||||
import unittest
|
||||
from microdot import Request
|
||||
from tests.mock_socket import get_request_fd
|
||||
|
||||
|
||||
class TestRequest(unittest.TestCase):
|
||||
def test_create_request(self):
|
||||
fd = get_request_fd('GET', '/foo')
|
||||
req = Request.create(fd, 'addr')
|
||||
self.assertEqual(req.client_addr, 'addr')
|
||||
self.assertEqual(req.method, 'GET')
|
||||
self.assertEqual(req.path, '/foo')
|
||||
self.assertEqual(req.http_version, '1.0')
|
||||
self.assertIsNone(req.query_string)
|
||||
self.assertEqual(req.args, {})
|
||||
self.assertEqual(req.headers, {'Host': 'example.com:1234'})
|
||||
self.assertEqual(req.cookies, {})
|
||||
self.assertEqual(req.content_length, 0)
|
||||
self.assertEqual(req.content_type, None)
|
||||
self.assertEqual(req.body, b'')
|
||||
self.assertEqual(req.json, None)
|
||||
self.assertEqual(req.form, None)
|
||||
|
||||
def test_headers(self):
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json',
|
||||
'Cookie': 'foo=bar;abc=def',
|
||||
'Content-Length': '3'}, body='aaa')
|
||||
req = Request.create(fd, 'addr')
|
||||
self.assertEqual(req.headers, {
|
||||
'Host': 'example.com:1234',
|
||||
'Content-Type': 'application/json',
|
||||
'Cookie': 'foo=bar;abc=def',
|
||||
'Content-Length': '3'})
|
||||
self.assertEqual(req.content_type, 'application/json')
|
||||
self.assertEqual(req.cookies, {'foo': 'bar', 'abc': 'def'})
|
||||
self.assertEqual(req.content_length, 3)
|
||||
self.assertEqual(req.body, b'aaa')
|
||||
|
||||
def test_args(self):
|
||||
fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
|
||||
req = Request.create(fd, 'addr')
|
||||
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
|
||||
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
|
||||
def test_json(self):
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
|
||||
req = Request.create(fd, 'addr')
|
||||
json = req.json
|
||||
self.assertEqual(json, {'foo': 'bar'})
|
||||
self.assertTrue(req.json is json)
|
||||
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'}, body='[1, "2"]')
|
||||
req = Request.create(fd, 'addr')
|
||||
self.assertEqual(req.json, [1, '2'])
|
||||
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/xml'}, body='[1, "2"]')
|
||||
req = Request.create(fd, 'addr')
|
||||
self.assertIsNone(req.json)
|
||||
|
||||
def test_form(self):
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded'},
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = Request.create(fd, 'addr')
|
||||
form = req.form
|
||||
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
self.assertTrue(req.form is form)
|
||||
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'},
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = Request.create(fd, 'addr')
|
||||
self.assertIsNone(req.form)
|
||||
171
tests/microdot/test_response.py
Normal file
171
tests/microdot/test_response.py
Normal file
@@ -0,0 +1,171 @@
|
||||
from datetime import datetime
|
||||
|
||||
try:
|
||||
import uio as io
|
||||
except ImportError:
|
||||
import io
|
||||
|
||||
import unittest
|
||||
from microdot import Response
|
||||
|
||||
|
||||
class TestResponse(unittest.TestCase):
|
||||
def test_create_from_string(self):
|
||||
res = Response('foo')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
fd = io.BytesIO()
|
||||
res.write(fd)
|
||||
response = fd.getvalue()
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', response)
|
||||
self.assertTrue(response.endswith(b'\r\n\r\nfoo'))
|
||||
|
||||
def test_create_from_string_with_content_length(self):
|
||||
res = Response('foo', headers={'Content-Length': '2'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'Content-Length': '2'})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
fd = io.BytesIO()
|
||||
res.write(fd)
|
||||
response = fd.getvalue()
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
|
||||
self.assertIn(b'Content-Length: 2\r\n', response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', response)
|
||||
self.assertTrue(response.endswith(b'\r\n\r\nfoo'))
|
||||
|
||||
def test_create_from_bytes(self):
|
||||
res = Response(b'foo')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
fd = io.BytesIO()
|
||||
res.write(fd)
|
||||
response = fd.getvalue()
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', response)
|
||||
self.assertTrue(response.endswith(b'\r\n\r\nfoo'))
|
||||
|
||||
def test_create_empty(self):
|
||||
res = Response(headers={'X-Foo': 'Bar'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'X-Foo': 'Bar'})
|
||||
self.assertEqual(res.body, b'')
|
||||
fd = io.BytesIO()
|
||||
res.write(fd)
|
||||
response = fd.getvalue()
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
|
||||
self.assertIn(b'X-Foo: Bar\r\n', response)
|
||||
self.assertIn(b'Content-Length: 0\r\n', response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', response)
|
||||
self.assertTrue(response.endswith(b'\r\n\r\n'))
|
||||
|
||||
def test_create_json(self):
|
||||
res = Response({'foo': 'bar'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
|
||||
self.assertEqual(res.body, b'{"foo": "bar"}')
|
||||
fd = io.BytesIO()
|
||||
res.write(fd)
|
||||
response = fd.getvalue()
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
|
||||
self.assertIn(b'Content-Length: 14\r\n', response)
|
||||
self.assertIn(b'Content-Type: application/json\r\n', response)
|
||||
self.assertTrue(response.endswith(b'\r\n\r\n{"foo": "bar"}'))
|
||||
|
||||
res = Response([1, '2'])
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
|
||||
self.assertEqual(res.body, b'[1, "2"]')
|
||||
fd = io.BytesIO()
|
||||
res.write(fd)
|
||||
response = fd.getvalue()
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', response)
|
||||
self.assertIn(b'Content-Length: 8\r\n', response)
|
||||
self.assertIn(b'Content-Type: application/json\r\n', response)
|
||||
self.assertTrue(response.endswith(b'\r\n\r\n[1, "2"]'))
|
||||
|
||||
def test_create_from_other(self):
|
||||
res = Response(123)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {})
|
||||
self.assertEqual(res.body, b'123')
|
||||
|
||||
def test_create_with_status_code(self):
|
||||
res = Response('not found', 404)
|
||||
self.assertEqual(res.status_code, 404)
|
||||
self.assertEqual(res.headers, {})
|
||||
self.assertEqual(res.body, b'not found')
|
||||
|
||||
def test_create_with_headers(self):
|
||||
res = Response('foo', headers={'X-Test': 'Foo'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'X-Test': 'Foo'})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
|
||||
def test_create_with_status_code_and_headers(self):
|
||||
res = Response('foo', 202, {'X-Test': 'Foo'})
|
||||
self.assertEqual(res.status_code, 202)
|
||||
self.assertEqual(res.headers, {'X-Test': 'Foo'})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
|
||||
def test_cookies(self):
|
||||
res = Response('ok')
|
||||
res.set_cookie('foo1', 'bar1')
|
||||
res.set_cookie('foo2', 'bar2', path='/')
|
||||
res.set_cookie('foo3', 'bar3', domain='example.com:1234')
|
||||
res.set_cookie('foo4', 'bar4',
|
||||
expires=datetime(2019, 11, 5, 2, 23, 54))
|
||||
res.set_cookie('foo5', 'bar5', max_age=123)
|
||||
res.set_cookie('foo6', 'bar6', secure=True, http_only=True)
|
||||
res.set_cookie('foo7', 'bar7', path='/foo', domain='example.com:1234',
|
||||
expires=datetime(2019, 11, 5, 2, 23, 54), max_age=123,
|
||||
secure=True, http_only=True)
|
||||
self.assertEqual(res.headers, {'Set-Cookie': [
|
||||
'foo1=bar1',
|
||||
'foo2=bar2; Path=/',
|
||||
'foo3=bar3; Domain=example.com:1234',
|
||||
'foo4=bar4; Expires=Tue, 05 Nov 2019 02:23:54 GMT',
|
||||
'foo5=bar5; Max-Age=123',
|
||||
'foo6=bar6; Secure; HttpOnly',
|
||||
'foo7=bar7; Path=/foo; Domain=example.com:1234; '
|
||||
'Expires=Tue, 05 Nov 2019 02:23:54 GMT; Max-Age=123; Secure; '
|
||||
'HttpOnly'
|
||||
]})
|
||||
|
||||
def test_redirect(self):
|
||||
res = Response.redirect('/foo')
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/foo')
|
||||
|
||||
res = Response.redirect('/foo', status_code=301)
|
||||
self.assertEqual(res.status_code, 301)
|
||||
self.assertEqual(res.headers['Location'], '/foo')
|
||||
|
||||
def test_send_file(self):
|
||||
files = [
|
||||
('test.txt', 'text/plain'),
|
||||
('test.gif', 'image/gif'),
|
||||
('test.jpg', 'image/jpeg'),
|
||||
('test.png', 'image/png'),
|
||||
('test.html', 'text/html'),
|
||||
('test.css', 'text/css'),
|
||||
('test.js', 'application/javascript'),
|
||||
('test.json', 'application/json'),
|
||||
('test.bin', 'application/octet-stream'),
|
||||
]
|
||||
for file, content_type in files:
|
||||
res = Response.send_file('tests/files/' + file)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Content-Type'], content_type)
|
||||
self.assertEqual(res.headers['Content-Length'], '4')
|
||||
self.assertEqual(res.body, b'foo\n')
|
||||
res = Response.send_file('tests/files/test.txt',
|
||||
content_type='text/html')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Content-Type'], 'text/html')
|
||||
self.assertEqual(res.headers['Content-Length'], '4')
|
||||
self.assertEqual(res.body, b'foo\n')
|
||||
97
tests/microdot/test_url_pattern.py
Normal file
97
tests/microdot/test_url_pattern.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import unittest
|
||||
from microdot import URLPattern
|
||||
|
||||
|
||||
class TestURLPattern(unittest.TestCase):
|
||||
def test_static(self):
|
||||
p = URLPattern('/')
|
||||
self.assertEqual(p.match('/'), {})
|
||||
self.assertIsNone(p.match('/foo'))
|
||||
|
||||
p = URLPattern('/foo/bar')
|
||||
self.assertEqual(p.match('/foo/bar'), {})
|
||||
self.assertIsNone(p.match('/foo'))
|
||||
self.assertIsNone(p.match('/foo/bar/'))
|
||||
|
||||
p = URLPattern('/foo//bar/baz/')
|
||||
self.assertEqual(p.match('/foo//bar/baz/'), {})
|
||||
self.assertIsNone(p.match('/foo/bar/baz/'))
|
||||
self.assertIsNone(p.match('/foo'))
|
||||
self.assertIsNone(p.match('/foo/bar/baz'))
|
||||
|
||||
def test_string_argument(self):
|
||||
p = URLPattern('/<arg>')
|
||||
self.assertEqual(p.match('/foo'), {'arg': 'foo'})
|
||||
self.assertIsNone(p.match('/'))
|
||||
self.assertIsNone(p.match('/foo/'))
|
||||
|
||||
p = URLPattern('/<arg>/')
|
||||
self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
|
||||
self.assertIsNone(p.match('/'))
|
||||
self.assertIsNone(p.match('/foo'))
|
||||
|
||||
p = URLPattern('/<string:arg>')
|
||||
self.assertEqual(p.match('/foo'), {'arg': 'foo'})
|
||||
self.assertIsNone(p.match('/'))
|
||||
self.assertIsNone(p.match('/foo/'))
|
||||
|
||||
p = URLPattern('/<string:arg>/')
|
||||
self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
|
||||
self.assertIsNone(p.match('/'))
|
||||
self.assertIsNone(p.match('/foo'))
|
||||
|
||||
p = URLPattern('/foo/<arg1>/bar/<arg2>')
|
||||
self.assertEqual(p.match('/foo/one/bar/two'),
|
||||
{'arg1': 'one', 'arg2': 'two'})
|
||||
self.assertIsNone(p.match('/'))
|
||||
self.assertIsNone(p.match('/foo/'))
|
||||
|
||||
def test_int_argument(self):
|
||||
p = URLPattern('/users/<int:id>')
|
||||
self.assertEqual(p.match('/users/123'), {'id': 123})
|
||||
self.assertIsNone(p.match('/users/'))
|
||||
self.assertIsNone(p.match('/users/abc'))
|
||||
self.assertIsNone(p.match('/users/123abc'))
|
||||
self.assertIsNone(p.match('/users/123/abc'))
|
||||
|
||||
p = URLPattern('/users/<int:id>/<int:id2>/')
|
||||
self.assertEqual(p.match('/users/123/456/'), {'id': 123, 'id2': 456})
|
||||
self.assertIsNone(p.match('/users/'))
|
||||
self.assertIsNone(p.match('/users/123/456'))
|
||||
self.assertIsNone(p.match('/users/123/abc/'))
|
||||
self.assertIsNone(p.match('/users/123/456/abc'))
|
||||
|
||||
def test_path_argument(self):
|
||||
p = URLPattern('/users/<path:path>')
|
||||
self.assertEqual(p.match('/users/123'), {'path': '123'})
|
||||
self.assertEqual(p.match('/users/123/'), {'path': '123/'})
|
||||
self.assertEqual(p.match('/users/abc/def'), {'path': 'abc/def'})
|
||||
self.assertIsNone(p.match('/users/'))
|
||||
|
||||
p = URLPattern('/users/<path:path>/foo')
|
||||
self.assertEqual(p.match('/users/123/foo'), {'path': '123'})
|
||||
self.assertEqual(p.match('/users/foo/foo'), {'path': 'foo'})
|
||||
self.assertEqual(p.match('/users/abc/def/foo'), {'path': 'abc/def'})
|
||||
self.assertIsNone(p.match('/users/'))
|
||||
self.assertIsNone(p.match('/users/foo'))
|
||||
self.assertIsNone(p.match('/users/foo/'))
|
||||
|
||||
def test_regex_argument(self):
|
||||
p = URLPattern('/users/<re:[a-c]+:id>')
|
||||
self.assertEqual(p.match('/users/ab'), {'id': 'ab'})
|
||||
self.assertEqual(p.match('/users/bca'), {'id': 'bca'})
|
||||
self.assertIsNone(p.match('/users/abcd'))
|
||||
|
||||
def test_many_arguments(self):
|
||||
p = URLPattern('/foo/<path:path>/<int:id>/bar/<name>')
|
||||
self.assertEqual(p.match('/foo/abc/def/123/bar/test'),
|
||||
{'path': 'abc/def', 'id': 123, 'name': 'test'})
|
||||
self.assertIsNone(p.match('/foo/123/bar/test'))
|
||||
self.assertIsNone(p.match('/foo/abc/def/ghi/bar/test'))
|
||||
self.assertIsNone(p.match('/foo/abc/def/123/bar'))
|
||||
self.assertIsNone(p.match('/foo/abc/def/123/bar/'))
|
||||
self.assertIsNone(p.match('/foo/abc/def/123/test'))
|
||||
|
||||
def test_invalid_url_patterns(self):
|
||||
self.assertRaises(ValueError, URLPattern, '/users/<foo/bar')
|
||||
self.assertRaises(ValueError, URLPattern, '/users/<badtype:id>')
|
||||
0
tests/microdot_asyncio/__init__.py
Normal file
0
tests/microdot_asyncio/__init__.py
Normal file
200
tests/microdot_asyncio/test_microdot_asyncio.py
Normal file
200
tests/microdot_asyncio/test_microdot_asyncio.py
Normal file
@@ -0,0 +1,200 @@
|
||||
import sys
|
||||
import unittest
|
||||
from microdot_asyncio import Microdot, Response
|
||||
from tests import mock_asyncio, mock_socket
|
||||
|
||||
|
||||
class TestMicrodotAsync(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# mock socket module
|
||||
self.original_asyncio = sys.modules['microdot_asyncio'].asyncio
|
||||
sys.modules['microdot_asyncio'].asyncio = mock_asyncio
|
||||
|
||||
def tearDown(self):
|
||||
# restore original socket module
|
||||
sys.modules['microdot_asyncio'].asyncio = self.original_asyncio
|
||||
|
||||
def test_get_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.route('/async')
|
||||
async def index2(req):
|
||||
return 'foo-async'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
fd2 = mock_socket.add_request('GET', '/async')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
|
||||
self.assertTrue(fd2.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 9\r\n', fd2.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd2.response)
|
||||
self.assertTrue(fd2.response.endswith(b'\r\n\r\nfoo-async'))
|
||||
|
||||
def test_post_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.route('/', methods=['POST'])
|
||||
def index_post(req):
|
||||
return Response('bar')
|
||||
|
||||
@app.route('/async', methods=['POST'])
|
||||
async def index_post2(req):
|
||||
return Response('bar-async')
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('POST', '/')
|
||||
fd2 = mock_socket.add_request('POST', '/async')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
|
||||
self.assertTrue(fd2.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 9\r\n', fd2.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd2.response)
|
||||
self.assertTrue(fd2.response.endswith(b'\r\n\r\nbar-async'))
|
||||
|
||||
def test_before_after_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.before_request
|
||||
def before_request(req):
|
||||
if req.path == '/bar':
|
||||
return 'bar', 202
|
||||
req.g.message = 'baz'
|
||||
|
||||
@app.after_request
|
||||
def after_request_one(req, res):
|
||||
res.headers['X-One'] = '1'
|
||||
|
||||
@app.after_request
|
||||
async def after_request_two(req, res):
|
||||
res.set_cookie('foo', 'bar')
|
||||
return res
|
||||
|
||||
@app.route('/bar')
|
||||
def bar(req):
|
||||
return 'foo'
|
||||
|
||||
@app.route('/baz')
|
||||
def baz(req):
|
||||
return req.g.message
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/bar')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 202 N/A\r\n'))
|
||||
self.assertIn(b'X-One: 1\r\n', fd.response)
|
||||
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/baz')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'X-One: 1\r\n', fd.response)
|
||||
self.assertIn(b'Set-Cookie: foo=bar\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nbaz'))
|
||||
|
||||
def test_404(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/foo')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 404 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 9\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nNot found'))
|
||||
|
||||
def test_404_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.errorhandler(404)
|
||||
async def handle_404(req):
|
||||
return '404'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/foo')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n404'))
|
||||
|
||||
def test_500(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 500 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 21\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nInternal server error'))
|
||||
|
||||
def test_500_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
@app.errorhandler(500)
|
||||
def handle_500(req):
|
||||
return '501', 501
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
|
||||
|
||||
def test_exception_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
@app.errorhandler(ZeroDivisionError)
|
||||
async def handle_div_zero(req, exc):
|
||||
return '501', 501
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 501 N/A\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n501'))
|
||||
86
tests/microdot_asyncio/test_request_asyncio.py
Normal file
86
tests/microdot_asyncio/test_request_asyncio.py
Normal file
@@ -0,0 +1,86 @@
|
||||
try:
|
||||
import uasyncio as asyncio
|
||||
except ImportError:
|
||||
import asyncio
|
||||
|
||||
import unittest
|
||||
from microdot_asyncio import Request
|
||||
from tests.mock_socket import get_async_request_fd
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.get_event_loop().run_until_complete(coro)
|
||||
|
||||
|
||||
class TestRequestAsync(unittest.TestCase):
|
||||
def test_create_request(self):
|
||||
fd = get_async_request_fd('GET', '/foo')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
self.assertEqual(req.client_addr, 'addr')
|
||||
self.assertEqual(req.method, 'GET')
|
||||
self.assertEqual(req.path, '/foo')
|
||||
self.assertEqual(req.http_version, '1.0')
|
||||
self.assertIsNone(req.query_string)
|
||||
self.assertEqual(req.args, {})
|
||||
self.assertEqual(req.headers, {'Host': 'example.com:1234'})
|
||||
self.assertEqual(req.cookies, {})
|
||||
self.assertEqual(req.content_length, 0)
|
||||
self.assertEqual(req.content_type, None)
|
||||
self.assertEqual(req.body, b'')
|
||||
self.assertEqual(req.json, None)
|
||||
self.assertEqual(req.form, None)
|
||||
|
||||
def test_headers(self):
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json',
|
||||
'Cookie': 'foo=bar;abc=def',
|
||||
'Content-Length': '3'}, body='aaa')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
self.assertEqual(req.headers, {
|
||||
'Host': 'example.com:1234',
|
||||
'Content-Type': 'application/json',
|
||||
'Cookie': 'foo=bar;abc=def',
|
||||
'Content-Length': '3'})
|
||||
self.assertEqual(req.content_type, 'application/json')
|
||||
self.assertEqual(req.cookies, {'foo': 'bar', 'abc': 'def'})
|
||||
self.assertEqual(req.content_length, 3)
|
||||
self.assertEqual(req.body, b'aaa')
|
||||
|
||||
def test_args(self):
|
||||
fd = get_async_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
|
||||
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
|
||||
def test_json(self):
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
json = req.json
|
||||
self.assertEqual(json, {'foo': 'bar'})
|
||||
self.assertTrue(req.json is json)
|
||||
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'}, body='[1, "2"]')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
self.assertEqual(req.json, [1, '2'])
|
||||
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/xml'}, body='[1, "2"]')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
self.assertIsNone(req.json)
|
||||
|
||||
def test_form(self):
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded'},
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
form = req.form
|
||||
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
self.assertTrue(req.form is form)
|
||||
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'},
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = _run(Request.create(fd, 'addr'))
|
||||
self.assertIsNone(req.form)
|
||||
86
tests/microdot_asyncio/test_response_asyncio.py
Normal file
86
tests/microdot_asyncio/test_response_asyncio.py
Normal file
@@ -0,0 +1,86 @@
|
||||
try:
|
||||
import uasyncio as asyncio
|
||||
except ImportError:
|
||||
import asyncio
|
||||
|
||||
import unittest
|
||||
from microdot_asyncio import Response
|
||||
from tests.mock_socket import FakeStreamAsync
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.get_event_loop().run_until_complete(coro)
|
||||
|
||||
|
||||
class TestResponseAsync(unittest.TestCase):
|
||||
def test_create_from_string(self):
|
||||
res = Response('foo')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
fd = FakeStreamAsync()
|
||||
_run(res.write(fd))
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
|
||||
|
||||
def test_create_from_string_with_content_length(self):
|
||||
res = Response('foo', headers={'Content-Length': '2'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'Content-Length': '2'})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
fd = FakeStreamAsync()
|
||||
_run(res.write(fd))
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 2\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
|
||||
|
||||
def test_create_from_bytes(self):
|
||||
res = Response(b'foo')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {})
|
||||
self.assertEqual(res.body, b'foo')
|
||||
fd = FakeStreamAsync()
|
||||
_run(res.write(fd))
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nfoo'))
|
||||
|
||||
def test_create_empty(self):
|
||||
res = Response(headers={'X-Foo': 'Bar'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'X-Foo': 'Bar'})
|
||||
self.assertEqual(res.body, b'')
|
||||
fd = FakeStreamAsync()
|
||||
_run(res.write(fd))
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
|
||||
self.assertIn(b'X-Foo: Bar\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 0\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n'))
|
||||
|
||||
def test_create_json(self):
|
||||
res = Response({'foo': 'bar'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
|
||||
self.assertEqual(res.body, b'{"foo": "bar"}')
|
||||
fd = FakeStreamAsync()
|
||||
_run(res.write(fd))
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 14\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: application/json\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n{"foo": "bar"}'))
|
||||
|
||||
res = Response([1, '2'])
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'Content-Type': 'application/json'})
|
||||
self.assertEqual(res.body, b'[1, "2"]')
|
||||
fd = FakeStreamAsync()
|
||||
_run(res.write(fd))
|
||||
self.assertIn(b'HTTP/1.0 200 OK\r\n', fd.response)
|
||||
self.assertIn(b'Content-Length: 8\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: application/json\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n[1, "2"]'))
|
||||
42
tests/mock_asyncio.py
Normal file
42
tests/mock_asyncio.py
Normal file
@@ -0,0 +1,42 @@
|
||||
try:
|
||||
import uasyncio as asyncio
|
||||
except ImportError:
|
||||
import asyncio
|
||||
|
||||
from tests import mock_socket
|
||||
|
||||
_calls = []
|
||||
|
||||
|
||||
class EventLoop:
|
||||
def run_until_complete(self, coro):
|
||||
_calls.append(('run_until_complete', coro))
|
||||
self.coro = coro
|
||||
|
||||
def run_forever(self):
|
||||
_calls.append(('run_forever',))
|
||||
|
||||
async def rf():
|
||||
s = mock_socket.socket()
|
||||
while True:
|
||||
fd, addr = s.accept()
|
||||
fd = mock_socket.FakeStreamAsync(fd)
|
||||
await self.coro(fd, fd)
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(rf())
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
loop = EventLoop()
|
||||
|
||||
|
||||
def get_event_loop():
|
||||
_calls.append(('get_event_loop',))
|
||||
return loop
|
||||
|
||||
|
||||
def start_server(cb, host, port):
|
||||
_calls.append(('start_server', cb, host, port))
|
||||
return cb
|
||||
102
tests/mock_socket.py
Normal file
102
tests/mock_socket.py
Normal file
@@ -0,0 +1,102 @@
|
||||
try:
|
||||
import uio as io
|
||||
except ImportError:
|
||||
import io
|
||||
|
||||
SOL_SOCKET = 'SOL_SOCKET'
|
||||
SO_REUSEADDR = 'SO_REUSEADDR'
|
||||
|
||||
_calls = []
|
||||
_requests = []
|
||||
|
||||
|
||||
def getaddrinfo(host, port):
|
||||
_calls.append(('getaddrinfo', host, port))
|
||||
return (('family', 'addr'), 'socktype', 'proto', 'canonname', 'sockaddr')
|
||||
|
||||
|
||||
class socket:
|
||||
def __init__(self):
|
||||
self.request_index = 0
|
||||
|
||||
def setsockopt(self, level, optname, value):
|
||||
_calls.append(('setsockopt', level, optname, value))
|
||||
|
||||
def bind(self, addr):
|
||||
_calls.append(('bind', addr))
|
||||
|
||||
def listen(self, backlog):
|
||||
_calls.append(('listen', backlog))
|
||||
|
||||
def accept(self):
|
||||
_calls.append(('accept',))
|
||||
self.request_index += 1
|
||||
return _requests[self.request_index - 1], 'addr'
|
||||
|
||||
|
||||
class FakeStream(io.BytesIO):
|
||||
def __init__(self, input_data):
|
||||
super().__init__(input_data)
|
||||
self.response = b''
|
||||
|
||||
def write(self, data):
|
||||
self.response += data
|
||||
|
||||
|
||||
class FakeStreamAsync:
|
||||
def __init__(self, stream=None):
|
||||
if stream is None:
|
||||
stream = FakeStream(b'')
|
||||
self.stream = stream
|
||||
|
||||
@property
|
||||
def response(self):
|
||||
return self.stream.response
|
||||
|
||||
async def readline(self):
|
||||
return self.stream.readline()
|
||||
|
||||
async def read(self, n):
|
||||
return self.stream.read(n)
|
||||
|
||||
async def awrite(self, data):
|
||||
self.stream.write(data)
|
||||
|
||||
async def aclose(self):
|
||||
pass
|
||||
|
||||
def get_extra_info(self, name, default=None):
|
||||
return name
|
||||
|
||||
|
||||
def get_request_fd(method, path, headers=None, body=None):
|
||||
if headers is None:
|
||||
headers = {}
|
||||
if body is None:
|
||||
body = ''
|
||||
elif 'Content-Length' not in headers:
|
||||
headers['Content-Length'] = str(len(body))
|
||||
request_bytes = '{method} {path} HTTP/1.0\n'.format(
|
||||
method=method, path=path)
|
||||
if 'Host' not in headers:
|
||||
headers['Host'] = 'example.com:1234'
|
||||
for header, value in headers.items():
|
||||
request_bytes += '{header}: {value}\n'.format(
|
||||
header=header, value=value)
|
||||
request_bytes += '\n' + body
|
||||
return FakeStream(request_bytes.encode())
|
||||
|
||||
|
||||
def get_async_request_fd(method, path, headers=None, body=None):
|
||||
fd = get_request_fd(method, path, headers=headers, body=body)
|
||||
return FakeStreamAsync(fd)
|
||||
|
||||
|
||||
def clear_requests():
|
||||
_requests.clear()
|
||||
|
||||
|
||||
def add_request(method, path, headers=None, body=None):
|
||||
fd = get_request_fd(method, path, headers=headers, body=body)
|
||||
_requests.append(fd)
|
||||
return fd
|
||||
34
tox.ini
Normal file
34
tox.ini
Normal file
@@ -0,0 +1,34 @@
|
||||
[tox]
|
||||
envlist=flake8,py35,py36,py37,upy
|
||||
skipsdist=True
|
||||
skip_missing_interpreters=True
|
||||
|
||||
[testenv]
|
||||
commands=
|
||||
pip install -e microdot
|
||||
pip install -e microdot-asyncio
|
||||
coverage run --branch --include="microdot*.py" -m unittest tests
|
||||
coverage report --show-missing
|
||||
coverage erase
|
||||
deps=coverage
|
||||
basepython=
|
||||
flake8: python3.7
|
||||
py35: python3.5
|
||||
py36: python3.6
|
||||
py37: python3.7
|
||||
upy: python3.7
|
||||
|
||||
[testenv:flake8]
|
||||
deps=
|
||||
flake8
|
||||
commands=
|
||||
flake8 --exclude tests/libs microdot microdot-asyncio tests
|
||||
|
||||
[testenv:upy]
|
||||
whitelist_externals=sh
|
||||
commands=sh -c "bin/micropython run_tests.py"
|
||||
|
||||
[testenv:upy-mac]
|
||||
whitelist_externals=micropython
|
||||
commands=micropython run_tests.py
|
||||
deps=
|
||||
Reference in New Issue
Block a user