11 Commits

Author SHA1 Message Date
Miguel Grinberg
cb2a23285e Release 1.2.0 2022-09-25 12:19:31 +01:00
Miguel Grinberg
b133dcc343 URL encode/decode unit tests 2022-09-24 20:15:22 +01:00
Miguel Grinberg
01947b101e Cache user session 2022-09-24 19:40:28 +01:00
Miguel Grinberg
1547e861ee request.url attribute with the complete URL of the request 2022-09-24 19:33:46 +01:00
Miguel Grinberg
672512e086 urlencode() function 2022-09-24 19:33:10 +01:00
Miguel Grinberg
a8515c97b0 Small performance improvement for NoCaseDict 2022-09-24 15:37:52 +01:00
Miguel Grinberg
8ebe81c09b File upload example 2022-09-22 17:52:48 +01:00
Miguel Grinberg
4f263c63ab Minor documentation styling fixes 2022-09-21 23:38:51 +01:00
Miguel Grinberg
b0fd6c4323 Use a case insensitive dict for headers 2022-09-21 23:29:01 +01:00
Miguel Grinberg
cbefb6bf3a Do not log HTTPException occurrences 2022-09-19 23:50:04 +01:00
Miguel Grinberg
c81a2649c5 Version 1.1.2.dev0 2022-09-18 11:28:48 +01:00
17 changed files with 254 additions and 76 deletions

View File

@@ -1,5 +1,15 @@
# Microdot change log
**Release 1.2.0** - 2022-09-25
- Use a case insensitive dict for headers ([commit #1](https://github.com/miguelgrinberg/microdot/commit/b0fd6c432371ca5cb10d07ff84c4deed7aa0ce2e) [commit #2](https://github.com/miguelgrinberg/microdot/commit/a8515c97b030f942fa6ca85cbe1772291468fb0d))
- urlencode() helper function ([commit #1](https://github.com/miguelgrinberg/microdot/commit/672512e086384e808489305502e6ebebcc5a888f) [commit #2](https://github.com/miguelgrinberg/microdot/commit/b133dcc34368853ee685396a1bcb50360e807813))
- Added `request.url` attribute with the complete URL of the request ([commit](https://github.com/miguelgrinberg/microdot/commit/1547e861ee28d43d10fe4c4ed1871345d4b81086))
- Do not log HTTPException occurrences ([commit](https://github.com/miguelgrinberg/microdot/commit/cbefb6bf3a3fdcff8b7a8bacad3449be18e46e3b))
- Cache user session for performance ([commit](https://github.com/miguelgrinberg/microdot/commit/01947b101ebe198312c88d73872e3248024918f0))
- File upload example ([commit](https://github.com/miguelgrinberg/microdot/commit/8ebe81c09b604ddc1123e78ad6bc87ceda5f8597))
- Minor documentation styling fixes ([commit](https://github.com/miguelgrinberg/microdot/commit/4f263c63ab7bb1ce0dd48d8e00f3c6891e1bf07e))
**Release 1.1.1** - 2022-09-18
- Make WebSocket internals consistent between TLS and non-TLS [#61](https://github.com/miguelgrinberg/microdot/issues/61) ([commit](https://github.com/miguelgrinberg/microdot/commit/5693b812ceb2c0d51ec3c991adf6894a87e6fcc7))

View File

@@ -1,3 +1,3 @@
.py .class, .py .method, .py .property {
.py.class, .py.function, .py.method, .py.property {
margin-top: 20px;
}

View File

@@ -13,6 +13,9 @@ API Reference
.. autoclass:: microdot.Response
:members:
.. autoclass:: microdot.NoCaseDict
:members:
.. autoclass:: microdot.MultiDict
:members:

View File

@@ -0,0 +1 @@
This directory contains file upload examples.

View File

@@ -0,0 +1 @@
Uploaded files are saved to this directory.

View File

@@ -0,0 +1,34 @@
<!doctype html>
<html>
<head>
<title>Microdot Upload Example</title>
</head>
<body>
<h1>Microdot Upload Example</h1>
<form id="form">
<input type="file" id="file" name="file" />
<input type="submit" value="Upload" />
</form>
<script>
async function upload(ev) {
ev.preventDefault();
const file = document.getElementById('file').files[0];
if (!file) {
return;
}
await fetch('/upload', {
method: 'POST',
body: file,
headers: {
'Content-Type': 'application/octet-stream',
'Content-Disposition': `attachment; filename="${file.name}"`,
},
}).then(res => {
console.log('Upload accepted');
window.location.href = '/';
});
}
document.getElementById('form').addEventListener('submit', upload);
</script>
</body>
</html>

View File

@@ -0,0 +1,33 @@
from microdot import Microdot, send_file
app = Microdot()
@app.get('/')
def index(request):
return send_file('index.html')
@app.post('/upload')
def upload(request):
# obtain the filename and size from request headers
filename = request.headers['Content-Disposition'].split(
'filename=')[1].strip('"')
size = int(request.headers['Content-Length'])
# sanitize the filename
filename = filename.replace('/', '_')
# write the file to the files directory in 1K chunks
with open('files/' + filename, 'wb') as f:
while size > 0:
chunk = request.stream.read(min(size, 1024))
f.write(chunk)
size -= len(chunk)
print('Successfully saved file: ' + filename)
return ''
if __name__ == '__main__':
app.run()

View File

@@ -1,6 +1,6 @@
[metadata]
name = microdot
version = 1.1.1
version = 1.2.0
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = The impossibly small web framework for MicroPython

View File

@@ -91,6 +91,60 @@ def urldecode_bytes(s):
return b''.join(result).decode()
def urlencode(s):
return s.replace('+', '%2B').replace(' ', '+').replace(
'%', '%25').replace('?', '%3F').replace('#', '%23').replace(
'&', '%26').replace('=', '%3D')
class NoCaseDict(dict):
"""A subclass of dictionary that holds case-insensitive keys.
:param initial_dict: an initial dictionary of key/value pairs to
initialize this object with.
Example::
>>> d = NoCaseDict()
>>> d['Content-Type'] = 'text/html'
>>> print(d['Content-Type'])
text/html
>>> print(d['content-type'])
text/html
>>> print(d['CONTENT-TYPE'])
text/html
>>> del d['cOnTeNt-TyPe']
>>> print(d)
{}
"""
def __init__(self, initial_dict=None):
super().__init__(initial_dict or {})
self.keymap = {k.lower(): k for k in self.keys() if k.lower() != k}
def __setitem__(self, key, value):
kl = key.lower()
key = self.keymap.get(kl, key)
if kl != key:
self.keymap[kl] = key
super().__setitem__(key, value)
def __getitem__(self, key):
kl = key.lower()
return super().__getitem__(self.keymap.get(kl, kl))
def __delitem__(self, key):
kl = key.lower()
super().__delitem__(self.keymap.get(kl, kl))
def __contains__(self, key):
kl = key.lower()
return self.keymap.get(kl, kl) in self.keys()
def get(self, key, default=None):
kl = key.lower()
return super().get(self.keymap.get(kl, kl), default)
class MultiDict(dict):
"""A subclass of dictionary that can hold multiple values for the same
key. It is used to hold key/value pairs decoded from query strings and
@@ -224,6 +278,8 @@ class Request():
self.client_addr = client_addr
#: The HTTP method of the request.
self.method = method
#: The request URL, including the path and query string.
self.url = url
#: The path portion of the URL.
self.path = url
#: The query string portion of the URL.
@@ -248,16 +304,14 @@ class Request():
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
for header, value in self.headers.items():
header = header.lower()
if header == 'content-length':
self.content_length = int(value)
elif header == 'content-type':
self.content_type = value
elif header == 'cookie':
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
if 'Content-Length' in self.headers:
self.content_length = int(self.headers['Content-Length'])
if 'Content-Type' in self.headers:
self.content_type = self.headers['Content-Type']
if 'Cookie' in self.headers:
for cookie in self.headers['Cookie'].split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
self._body = body
self.body_used = False
@@ -289,7 +343,7 @@ class Request():
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
headers = NoCaseDict()
while True:
line = Request._safe_readline(client_stream).strip().decode()
if line == '':
@@ -437,7 +491,7 @@ class Response():
body = ''
status_code = 204
self.status_code = status_code
self.headers = headers.copy() if headers else {}
self.headers = NoCaseDict(headers or {})
self.reason = reason
if isinstance(body, (dict, list)):
self.body = json.dumps(body).encode()
@@ -1044,7 +1098,6 @@ class Microdot():
else:
res = 'Not found', f
except HTTPException as exc:
print_exception(exc)
if exc.status_code in self.error_handlers:
res = self.error_handlers[exc.status_code](req)
else:

View File

@@ -4,6 +4,7 @@ import signal
from microdot_asyncio import * # noqa: F401, F403
from microdot_asyncio import Microdot as BaseMicrodot
from microdot_asyncio import Request
from microdot import NoCaseDict
class _BodyStream: # pragma: no cover
@@ -55,7 +56,7 @@ class Microdot(BaseMicrodot):
path = scope['path']
if 'query_string' in scope and scope['query_string']:
path += '?' + scope['query_string'].decode()
headers = {}
headers = NoCaseDict()
content_length = 0
for key, value in scope.get('headers', []):
headers[key] = value

View File

@@ -17,9 +17,10 @@ except ImportError:
import io
from microdot import Microdot as BaseMicrodot
from microdot import print_exception
from microdot import NoCaseDict
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
from microdot import print_exception
from microdot import HTTPException
from microdot import MUTED_SOCKET_ERRORS
@@ -74,7 +75,7 @@ class Request(BaseRequest):
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
headers = NoCaseDict()
content_length = 0
while True:
line = (await Request._safe_readline(
@@ -386,7 +387,6 @@ class Microdot(BaseMicrodot):
else:
res = 'Not found', f
except HTTPException as exc:
print_exception(exc)
if exc.status_code in self.error_handlers:
res = self.error_handlers[exc.status_code](req)
else:

View File

@@ -23,15 +23,19 @@ def get_session(request):
global secret_key
if not secret_key:
raise ValueError('The session secret key is not configured')
if hasattr(request.g, '_session'):
return request.g._session
session = request.cookies.get('session')
if session is None:
return {}
request.g._session = {}
return request.g._session
try:
session = jwt.decode(session, secret_key, algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
raise
return {}
return session
request.g._session = {}
else:
request.g._session = session
return request.g._session
def update_session(request, session):

View File

@@ -1,6 +1,6 @@
from io import BytesIO
import json
from microdot import Request, Response
from microdot import Request, Response, NoCaseDict
try:
from microdot_websocket import WebSocket
except: # pragma: no cover # noqa: E722
@@ -46,11 +46,10 @@ class TestResponse:
pass
def _process_json_body(self):
for name, value in self.headers.items(): # pragma: no branch
if name.lower() == 'content-type':
if value.lower().split(';')[0] == 'application/json':
self.json = json.loads(self.text)
break
if 'Content-Type' in self.headers: # pragma: no branch
content_type = self.headers['Content-Type']
if content_type.split(';')[0] == 'application/json':
self.json = json.loads(self.text)
@classmethod
def create(cls, res):
@@ -97,13 +96,11 @@ class TestClient:
body = b''
elif isinstance(body, (dict, list)):
body = json.dumps(body).encode()
if 'Content-Type' not in headers and \
'content-type' not in headers: # pragma: no cover
if 'Content-Type' not in headers: # pragma: no cover
headers['Content-Type'] = 'application/json'
elif isinstance(body, str):
body = body.encode()
if body and 'Content-Length' not in headers and \
'content-length' not in headers:
if body and 'Content-Length' not in headers:
headers['Content-Length'] = str(len(body))
if 'Host' not in headers: # pragma: no branch
headers['Host'] = 'example.com:1234'
@@ -132,29 +129,28 @@ class TestClient:
return request_bytes
def _update_cookies(self, res):
for name, value in res.headers.items():
if name.lower() == 'set-cookie':
for cookie in value:
cookie_name, cookie_value = cookie.split('=', 1)
cookie_options = cookie_value.split(';')
delete = False
for option in cookie_options[1:]:
if option.strip().lower().startswith('expires='):
_, e = option.strip().split('=', 1)
# this is a very limited parser for cookie expiry
# that only detects a cookie deletion request when
# the date is 1/1/1970
if '1 jan 1970' in e.lower(): # pragma: no branch
delete = True
break
if delete:
if cookie_name in self.cookies: # pragma: no branch
del self.cookies[cookie_name]
else:
self.cookies[cookie_name] = cookie_options[0]
cookies = res.headers.get('Set-Cookie', [])
for cookie in cookies:
cookie_name, cookie_value = cookie.split('=', 1)
cookie_options = cookie_value.split(';')
delete = False
for option in cookie_options[1:]:
if option.strip().lower().startswith('expires='):
_, e = option.strip().split('=', 1)
# this is a very limited parser for cookie expiry
# that only detects a cookie deletion request when
# the date is 1/1/1970
if '1 jan 1970' in e.lower(): # pragma: no branch
delete = True
break
if delete:
if cookie_name in self.cookies: # pragma: no branch
del self.cookies[cookie_name]
else:
self.cookies[cookie_name] = cookie_options[0]
def request(self, method, path, headers=None, body=None, sock=None):
headers = headers or {}
headers = NoCaseDict(headers or {})
body, headers = self._process_body(body, headers)
cookies, headers = self._process_cookies(headers)
request_bytes = self._render_request(method, path, headers, body)

View File

@@ -1,8 +1,7 @@
import os
import signal
from microdot import * # noqa: F401, F403
from microdot import Microdot as BaseMicrodot
from microdot import Request
from microdot import Microdot as BaseMicrodot, Request, NoCaseDict
class Microdot(BaseMicrodot):
@@ -15,7 +14,7 @@ class Microdot(BaseMicrodot):
path = environ.get('SCRIPT_NAME', '') + environ.get('PATH_INFO', '')
if 'QUERY_STRING' in environ and environ['QUERY_STRING']:
path += '?' + environ['QUERY_STRING']
headers = {}
headers = NoCaseDict()
for k, v in environ.items():
if k.startswith('HTTP_'):
h = '-'.join([p.title() for p in k[5:].split('_')])

View File

@@ -1,31 +1,60 @@
import unittest
from microdot import MultiDict
from microdot import MultiDict, NoCaseDict
class TestMultiDict(unittest.TestCase):
def test_multidict(self):
d = MultiDict()
assert dict(d) == {}
assert d.get('zero') is None
assert d.get('zero', default=0) == 0
assert d.getlist('zero') == []
assert d.getlist('zero', type=int) == []
self.assertEqual(dict(d), {})
self.assertIsNone(d.get('zero'))
self.assertEqual(d.get('zero', default=0), 0)
self.assertEqual(d.getlist('zero'), [])
self.assertEqual(d.getlist('zero', type=int), [])
d['one'] = 1
assert d['one'] == 1
assert d.get('one') == 1
assert d.get('one', default=2) == 1
assert d.get('one', type=int) == 1
assert d.get('one', type=str) == '1'
self.assertEqual(d['one'], 1)
self.assertEqual(d.get('one'), 1)
self.assertEqual(d.get('one', default=2), 1)
self.assertEqual(d.get('one', type=int), 1)
self.assertEqual(d.get('one', type=str), '1')
d['two'] = 1
d['two'] = 2
assert d['two'] == 1
assert d.get('two') == 1
assert d.get('two', default=2) == 1
assert d.get('two', type=int) == 1
assert d.get('two', type=str) == '1'
assert d.getlist('two') == [1, 2]
assert d.getlist('two', type=int) == [1, 2]
assert d.getlist('two', type=str) == ['1', '2']
self.assertEqual(d['two'], 1)
self.assertEqual(d.get('two'), 1)
self.assertEqual(d.get('two', default=2), 1)
self.assertEqual(d.get('two', type=int), 1)
self.assertEqual(d.get('two', type=str), '1')
self.assertEqual(d.getlist('two'), [1, 2])
self.assertEqual(d.getlist('two', type=int), [1, 2])
self.assertEqual(d.getlist('two', type=str), ['1', '2'])
def test_case_insensitive_dict(self):
d = NoCaseDict()
d['One'] = 1
d['one'] = 2
d['ONE'] = 3
d['One'] = 4
d['two'] = 5
self.assertEqual(d['one'], 4)
self.assertEqual(d['One'], 4)
self.assertEqual(d['ONE'], 4)
self.assertEqual(d['onE'], 4)
self.assertEqual(d['two'], 5)
self.assertEqual(d['tWO'], 5)
self.assertEqual(d.get('one'), 4)
self.assertEqual(d.get('One'), 4)
self.assertEqual(d.get('ONE'), 4)
self.assertEqual(d.get('onE'), 4)
self.assertEqual(d.get('two'), 5)
self.assertEqual(d.get('tWO'), 5)
self.assertIn(('One', 4), list(d.items()))
self.assertIn(('two', 5), list(d.items()))
self.assertIn(4, list(d.values()))
self.assertIn(5, list(d.values()))
del d['oNE']
self.assertEqual(list(d.items()), [('two', 5)])
self.assertEqual(list(d.values()), [5])

View File

@@ -19,6 +19,9 @@ class TestSession(unittest.TestCase):
@self.app.get('/')
def index(req):
session = get_session(req)
session2 = get_session(req)
session2['foo'] = 'bar'
self.assertEqual(session['foo'], 'bar')
return str(session.get('name'))
@self.app.get('/with')

11
tests/test_urlencode.py Normal file
View File

@@ -0,0 +1,11 @@
import unittest
from microdot import urlencode, urldecode_str, urldecode_bytes
class TestURLEncode(unittest.TestCase):
def test_urlencode(self):
self.assertEqual(urlencode('?foo=bar&x'), '%3Ffoo%3Dbar%26x')
def test_urldecode(self):
self.assertEqual(urldecode_str('%3Ffoo%3Dbar%26x'), '?foo=bar&x')
self.assertEqual(urldecode_bytes(b'%3Ffoo%3Dbar%26x'), '?foo=bar&x')