12 Commits

Author SHA1 Message Date
Miguel Grinberg
79e11262d1 Release 1.3.0 2023-04-08 17:21:30 +01:00
Miguel Grinberg
a1b061656f Tolerate slightly invalid formats in query strings (Fixes #126) 2023-04-08 17:15:54 +01:00
Miguel Grinberg
67798f7dbf Cross-Origin Resource Sharing (CORS) support (Fixes #45) 2023-03-23 00:03:21 +00:00
Miguel Grinberg
ea6766cea9 Add update() method to NoCaseDict class 2023-03-22 20:22:29 +00:00
Miguel Grinberg
6a31f89673 Respond to HEAD and OPTIONS requests 2023-03-22 12:25:21 +00:00
Miguel Grinberg
eaf2ef62d1 Documentation typo #nolog 2023-03-21 00:32:22 +00:00
Miguel Grinberg
a350e8fd1e Set exit code to 1 for failed MicroPython test runs 2023-03-21 00:28:23 +00:00
Miguel Grinberg
daf1001ec5 Support compressed files in send_file() (Fixes #93) 2023-03-21 00:24:57 +00:00
Miguel Grinberg
e684ee32d9 Add max_age argument to send_file() 2023-03-20 12:11:01 +00:00
Miguel Grinberg
573e303a98 Issue templates #nolog 2023-03-03 14:49:10 +00:00
Miguel Grinberg
3592f53999 Update gitignore #nolog 2023-03-03 11:06:50 +00:00
Miguel Grinberg
ea3722ca5c Version 1.2.5.dev0 2023-03-03 08:46:27 +00:00
26 changed files with 651 additions and 29 deletions

26
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@@ -0,0 +1,26 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
**IMPORTANT**: If you have a question, or you are not sure if you have found a bug in this package, then you are in the wrong place. Hit back in your web browser, and then open a GitHub Discussion instead. Likewise, if you are unable to provide the information requested below, open a discussion to troubleshoot your issue.
**Describe the bug**
A clear and concise description of what the bug is. If you are getting errors, please include the complete error message, including the stack trace.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Additional context**
Add any other context about the problem here.

5
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View File

@@ -0,0 +1,5 @@
blank_issues_enabled: false
contact_links:
- name: GitHub Discussions
url: https://github.com/miguelgrinberg/microdot/discussions
about: Ask questions here.

View File

@@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

5
.gitignore vendored
View File

@@ -103,3 +103,8 @@ venv.bak/
# mypy
.mypy_cache/
# other
*.der
*.pem
*_txt.py

View File

@@ -1,5 +1,15 @@
# Microdot change log
**Release 1.3.0** - 2023-04-08
- Cross-Origin Resource Sharing (CORS) extension [#45](https://github.com/miguelgrinberg/microdot/issues/45) ([commit](https://github.com/miguelgrinberg/microdot/commit/67798f7dbffb30018ab4b62a9aaa297f63bc9e64))
- Respond to `HEAD` and `OPTIONS` requests ([commit](https://github.com/miguelgrinberg/microdot/commit/6a31f89673518e79fef5659c04e609b7976a5e34))
- Tolerate slightly invalid formats in query strings [#126](https://github.com/miguelgrinberg/microdot/issues/126) ([commit](https://github.com/miguelgrinberg/microdot/commit/a1b061656fa19dae583951596b0f1f0603652a56))
- Support compressed files in `send_file()` [#93](https://github.com/miguelgrinberg/microdot/issues/93) ([commit](https://github.com/miguelgrinberg/microdot/commit/daf1001ec55ab38e6cdfee4931729a3b7506858b))
- Add `max_age` argument to `send_file()` ([commit](https://github.com/miguelgrinberg/microdot/commit/e684ee32d91d3e2ab9569bb5fd342986c010ffeb))
- Add `update()` method to `NoCaseDict` class ([commit](https://github.com/miguelgrinberg/microdot/commit/ea6766cea96b756b36ed777f9c1b6a6680db09ba))
- Set exit code to 1 for failed MicroPython test runs ([commit](https://github.com/miguelgrinberg/microdot/commit/a350e8fd1e55fac12c9e5b909cfa82d880b177ef))
**Release 1.2.4** - 2023-03-03
- One more attempt to correct build issues ([commit](https://github.com/miguelgrinberg/microdot/commit/cb39898829f4edc233ab4e7ba3f7ef3c5c50f196))

View File

@@ -52,6 +52,12 @@ API Reference
.. automodule:: microdot_session
:members:
``microdot_cors`` module
------------------------
.. automodule:: microdot_cors
:members:
``microdot_websocket`` module
------------------------------

View File

@@ -208,6 +208,42 @@ Example::
delete_session(req)
return redirect('/')
Cross-Origin Resource Sharing (CORS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `microdot.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot.py>`_
| `microdot_cors.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot_cors.py>`_
* - Required external dependencies
- | None
* - Examples
- | `cors.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/cors/cors.py>`_
The CORS extension provides support for `Cross-Origin Resource Sharing
(CORS) <https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS>`_. CORS is a
mechanism that allows web applications running on different origins to access
resources from each other. For example, a web application running on
``https://example.com`` can access resources from ``https://api.example.com``.
To enable CORS support, create an instance of the
:class:`CORS <microdot_cors.CORS>` class and configure the desired options.
Example::
from microdot import Microdot
from microdot_cors import CORS
app = Microdot()
cors = CORS(app, allowed_origins=['https://example.com'],
allow_credentials=True)
WebSocket Support
~~~~~~~~~~~~~~~~~

View File

@@ -662,6 +662,15 @@ object for a file::
def index(request):
return send_file('/static/index.html')
A suggested caching duration can be returned to the client in the ``max_age``
argument::
from microdot import send_file
@app.get('/')
def image(request):
return send_file('/static/image.jpg', max_age=3600) # in seconds
.. note::
Unlike other web frameworks, Microdot does not automatically configure a
route to serve static files. The following is an example route that can be
@@ -673,7 +682,7 @@ object for a file::
if '..' in path:
# directory traversal is not allowed
return 'Not found', 404
return send_file('static/' + path)
return send_file('static/' + path, max_age=86400)
Streaming Responses
^^^^^^^^^^^^^^^^^^^

1
examples/cors/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains Cross-Origin Resource Sharing (CORS) examples.

14
examples/cors/app.py Normal file
View File

@@ -0,0 +1,14 @@
from microdot import Microdot
from microdot_cors import CORS
app = Microdot()
CORS(app, allowed_origins=['https://example.org'], allow_credentials=True)
@app.route('/')
def index(request):
return 'Hello World!'
if __name__ == '__main__':
app.run()

View File

@@ -0,0 +1,20 @@
from microdot import Microdot, send_file
app = Microdot()
@app.route('/')
def index(request):
return send_file('gzstatic/index.html', compressed=True,
file_extension='.gz')
@app.route('/static/<path:path>')
def static(request, path):
if '..' in path:
# directory traversal is not allowed
return 'Not found', 404
return send_file('gzstatic/' + path, compressed=True, file_extension='.gz')
app.run(debug=True)

Binary file not shown.

Binary file not shown.

View File

@@ -6,4 +6,5 @@ sys.path.insert(3, 'libs/micropython')
import unittest
unittest.main('tests')
if not unittest.main('tests').wasSuccessful():
sys.exit(1)

View File

@@ -1,6 +1,6 @@
[metadata]
name = microdot
version = 1.2.4
version = 1.3.0
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = The impossibly small web framework for MicroPython

View File

@@ -146,6 +146,10 @@ class NoCaseDict(dict):
kl = key.lower()
return super().get(self.keymap.get(kl, kl), default)
def update(self, other_dict):
for key, value in other_dict.items():
self[key] = value
def mro(cls): # pragma: no cover
"""Return the method resolution order of a class.
@@ -400,11 +404,11 @@ class Request():
if len(urlencoded) > 0:
if isinstance(urlencoded, str):
for k, v in [pair.split('=', 1)
for pair in urlencoded.split('&')]:
for pair in urlencoded.split('&') if pair]:
data[urldecode_str(k)] = urldecode_str(v)
elif isinstance(urlencoded, bytes): # pragma: no branch
for k, v in [pair.split(b'=', 1)
for pair in urlencoded.split(b'&')]:
for pair in urlencoded.split(b'&') if pair]:
data[urldecode_bytes(k)] = urldecode_bytes(v)
return data
@@ -525,6 +529,10 @@ class Response():
#: ``Content-Type`` header.
default_content_type = 'text/plain'
#: The default cache control max age used by :meth:`send_file`. A value
#: of ``None`` means that no ``Cache-Control`` header is added.
default_send_file_max_age = None
#: Special response used to signal that a response does not need to be
#: written to the client. Used to exit WebSocket connections cleanly.
already_handled = None
@@ -544,6 +552,7 @@ class Response():
else:
# this applies to bytes, file-like objects or generators
self.body = body
self.is_head = False
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
max_age=None, secure=False, http_only=False):
@@ -608,6 +617,7 @@ class Response():
stream.write(b'\r\n')
# body
if not self.is_head:
can_flush = hasattr(stream, 'flush')
try:
for body in self.body_iter():
@@ -651,7 +661,9 @@ class Response():
return cls(status_code=status_code, headers={'Location': location})
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
def send_file(cls, filename, status_code=200, content_type=None,
stream=None, max_age=None, compressed=False,
file_extension=''):
"""Send file contents in a response.
:param filename: The filename of the file.
@@ -659,7 +671,25 @@ class Response():
default is 302.
:param content_type: The ``Content-Type`` header to use in the
response. If omitted, it is generated
automatically from the file extension.
automatically from the file extension of the
``filename`` parameter.
:param stream: A file-like object to read the file contents from. If
a stream is given, the ``filename`` parameter is only
used when generating the ``Content-Type`` header.
:param max_age: The ``Cache-Control`` header's ``max-age`` value in
seconds. If omitted, the value of the
:attr:`Response.default_send_file_max_age` attribute is
used.
:param compressed: Whether the file is compressed. If ``True``, the
``Content-Encoding`` header is set to ``gzip``. A
string with the header value can also be passed.
Note that when using this option the file must have
been compressed beforehand. This option only sets
the header.
:param file_extension: A file extension to append to the ``filename``
parameter when opening the file, including the
dot. The extension given here is not considered
when generating the ``Content-Type`` header.
Security note: The filename is assumed to be trusted. Never pass
filenames provided by the user without validating and sanitizing them
@@ -671,9 +701,19 @@ class Response():
content_type = Response.types_map[ext]
else:
content_type = 'application/octet-stream'
f = open(filename, 'rb')
return cls(body=f, status_code=status_code,
headers={'Content-Type': content_type})
headers = {'Content-Type': content_type}
if max_age is None:
max_age = cls.default_send_file_max_age
if max_age is not None:
headers['Cache-Control'] = 'max-age={}'.format(max_age)
if compressed:
headers['Content-Encoding'] = compressed \
if isinstance(compressed, str) else 'gzip'
f = stream or open(filename + file_extension, 'rb')
return cls(body=f, status_code=status_code, headers=headers)
class URLPattern():
@@ -759,6 +799,7 @@ class Microdot():
self.after_error_request_handlers = []
self.error_handlers = {}
self.shutdown_requested = False
self.options_handler = self.default_options_handler
self.debug = False
self.server = None
@@ -794,7 +835,8 @@ class Microdot():
"""
def decorated(f):
self.url_map.append(
(methods or ['GET'], URLPattern(url_pattern), f))
([m.upper() for m in (methods or ['GET'])],
URLPattern(url_pattern), f))
return f
return decorated
@@ -1080,17 +1122,32 @@ class Microdot():
self.shutdown_requested = True
def find_route(self, req):
method = req.method.upper()
if method == 'OPTIONS' and self.options_handler:
return self.options_handler(req)
if method == 'HEAD':
method = 'GET'
f = 404
for route_methods, route_pattern, route_handler in self.url_map:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
if req.method in route_methods:
if method in route_methods:
f = route_handler
break
else:
f = 405
return f
def default_options_handler(self, req):
allow = []
for route_methods, route_pattern, route_handler in self.url_map:
if route_pattern.match(req.path) is not None:
allow.extend(route_methods)
if 'GET' in allow:
allow.append('HEAD')
allow.append('OPTIONS')
return {'Allow': ', '.join(allow)}
def handle_request(self, sock, addr):
if Request.socket_read_timeout and \
hasattr(sock, 'settimeout'): # pragma: no cover
@@ -1165,6 +1222,8 @@ class Microdot():
for handler in req.after_request_handlers:
res = handler(req, res) or res
after_request_handled = True
elif isinstance(f, dict):
res = Response(headers=f)
elif f in self.error_handlers:
res = self.error_handlers[f](req)
else:
@@ -1208,6 +1267,7 @@ class Microdot():
if not after_request_handled:
for handler in self.after_error_request_handlers:
res = handler(req, res) or res
res.is_head = (req and req.method == 'HEAD')
return res

View File

@@ -151,6 +151,7 @@ class Response(BaseResponse):
await stream.awrite(b'\r\n')
# body
if not self.is_head:
async for body in self.body_iter():
if isinstance(body, str): # pragma: no cover
body = body.encode()
@@ -385,6 +386,8 @@ class Microdot(BaseMicrodot):
res = await self._invoke_handler(
handler, req, res) or res
after_request_handled = True
elif isinstance(f, dict):
res = Response(headers=f)
elif f in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[f], req)
@@ -431,6 +434,7 @@ class Microdot(BaseMicrodot):
for handler in self.after_error_request_handlers:
res = await self._invoke_handler(
handler, req, res) or res
res.is_head = (req and req.method == 'HEAD')
return res
async def _invoke_handler(self, f_or_coro, *args, **kwargs):

110
src/microdot_cors.py Normal file
View File

@@ -0,0 +1,110 @@
class CORS:
"""Add CORS headers to HTTP responses.
:param app: The application to add CORS headers to.
:param allowed_origins: A list of origins that are allowed to make
cross-site requests. If set to '*', all origins are
allowed.
:param allow_credentials: If set to True, the
``Access-Control-Allow-Credentials`` header will
be set to ``true`` to indicate to the browser
that it can expose cookies and authentication
headers.
:param allowed_methods: A list of methods that are allowed to be used when
making cross-site requests. If not set, all methods
are allowed.
:param expose_headers: A list of headers that the browser is allowed to
exposed.
:param allowed_headers: A list of headers that are allowed to be used when
making cross-site requests. If not set, all headers
are allowed.
:param max_age: The maximum amount of time in seconds that the browser
should cache the results of a preflight request.
:param handle_cors: If set to False, CORS headers will not be added to
responses. This can be useful if you want to add CORS
headers manually.
"""
def __init__(self, app=None, allowed_origins=None, allow_credentials=False,
allowed_methods=None, expose_headers=None,
allowed_headers=None, max_age=None, handle_cors=True):
self.allowed_origins = allowed_origins
self.allow_credentials = allow_credentials
self.allowed_methods = allowed_methods
self.expose_headers = expose_headers
self.allowed_headers = None if allowed_headers is None \
else [h.lower() for h in allowed_headers]
self.max_age = max_age
if app is not None:
self.initialize(app, handle_cors=handle_cors)
def initialize(self, app, handle_cors=True):
"""Initialize the CORS object for the given application.
:param app: The application to add CORS headers to.
:param handle_cors: If set to False, CORS headers will not be added to
responses. This can be useful if you want to add
CORS headers manually.
"""
self.default_options_handler = app.options_handler
if handle_cors:
app.options_handler = self.options_handler
app.after_request(self.after_request)
app.after_error_request(self.after_request)
def options_handler(self, request):
headers = self.default_options_handler(request)
headers.update(self.get_cors_headers(request))
return headers
def get_cors_headers(self, request):
"""Return a dictionary of CORS headers to add to a given request.
:param request: The request to add CORS headers to.
"""
cors_headers = {}
origin = request.headers.get('Origin')
if self.allowed_origins == '*':
cors_headers['Access-Control-Allow-Origin'] = origin or '*'
if origin:
cors_headers['Vary'] = 'Origin'
elif origin in (self.allowed_origins or []):
cors_headers['Access-Control-Allow-Origin'] = origin
cors_headers['Vary'] = 'Origin'
if self.allow_credentials and \
'Access-Control-Allow-Origin' in cors_headers:
cors_headers['Access-Control-Allow-Credentials'] = 'true'
if self.expose_headers:
cors_headers['Access-Control-Expose-Headers'] = \
', '.join(self.expose_headers)
if request.method == 'OPTIONS':
# handle preflight request
if self.max_age:
cors_headers['Access-Control-Max-Age'] = str(self.max_age)
method = request.headers.get('Access-Control-Request-Method')
if method:
method = method.upper()
if self.allowed_methods is None or \
method in self.allowed_methods:
cors_headers['Access-Control-Allow-Methods'] = method
headers = request.headers.get('Access-Control-Request-Headers')
if headers:
if self.allowed_headers is None:
cors_headers['Access-Control-Allow-Headers'] = headers
else:
headers = [h.strip() for h in headers.split(',')]
headers = [h for h in headers
if h.lower() in self.allowed_headers]
cors_headers['Access-Control-Allow-Headers'] = \
', '.join(headers)
return cors_headers
def after_request(self, request, response):
saved_vary = response.headers.get('Vary')
response.headers.update(self.get_cors_headers(request))
if saved_vary and saved_vary != response.headers.get('Vary'):
response.headers['Vary'] = (
saved_vary + ', ' + response.headers['Vary'])

View File

@@ -58,6 +58,7 @@ class TestResponse:
test_res._initialize_body(res)
test_res._process_text_body()
test_res._process_json_body()
test_res.is_head = res.is_head
return test_res

BIN
tests/files/test.gz Normal file

Binary file not shown.

158
tests/test_cors.py Normal file
View File

@@ -0,0 +1,158 @@
import unittest
from microdot import Microdot
from microdot_test_client import TestClient
from microdot_cors import CORS
class TestCORS(unittest.TestCase):
def test_origin(self):
app = Microdot()
cors = CORS(allowed_origins=['https://example.com'],
allow_credentials=True)
cors.initialize(app)
@app.get('/')
def index(req):
return 'foo'
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertFalse('Access-Control-Allow-Origin' in res.headers)
self.assertFalse('Access-Control-Allow-Credentials' in res.headers)
self.assertFalse('Vary' in res.headers)
res = client.get('/', headers={'Origin': 'https://example.com'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
'https://example.com')
self.assertEqual(res.headers['Access-Control-Allow-Credentials'],
'true')
self.assertEqual(res.headers['Vary'], 'Origin')
cors.allow_credentials = False
res = client.get('/foo', headers={'Origin': 'https://example.com'})
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
'https://example.com')
self.assertFalse('Access-Control-Allow-Credentials' in res.headers)
self.assertEqual(res.headers['Vary'], 'Origin')
res = client.get('/', headers={'Origin': 'https://bad.com'})
self.assertEqual(res.status_code, 200)
self.assertFalse('Access-Control-Allow-Origin' in res.headers)
self.assertFalse('Access-Control-Allow-Credentials' in res.headers)
self.assertFalse('Vary' in res.headers)
def test_all_origins(self):
app = Microdot()
CORS(app, allowed_origins='*', expose_headers=['X-Test', 'X-Test2'])
@app.get('/')
def index(req):
return 'foo'
@app.get('/foo')
def foo(req):
return 'foo', {'Vary': 'X-Foo, X-Bar'}
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Access-Control-Allow-Origin'], '*')
self.assertFalse('Vary' in res.headers)
self.assertEqual(res.headers['Access-Control-Expose-Headers'],
'X-Test, X-Test2')
res = client.get('/', headers={'Origin': 'https://example.com'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
'https://example.com')
self.assertEqual(res.headers['Vary'], 'Origin')
self.assertEqual(res.headers['Access-Control-Expose-Headers'],
'X-Test, X-Test2')
res = client.get('/bad', headers={'Origin': 'https://example.com'})
self.assertEqual(res.status_code, 404)
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
'https://example.com')
self.assertEqual(res.headers['Vary'], 'Origin')
self.assertEqual(res.headers['Access-Control-Expose-Headers'],
'X-Test, X-Test2')
res = client.get('/foo', headers={'Origin': 'https://example.com'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Vary'], 'X-Foo, X-Bar, Origin')
def test_cors_preflight(self):
app = Microdot()
CORS(app, allowed_origins='*')
@app.route('/', methods=['GET', 'POST'])
def index(req):
return 'foo'
client = TestClient(app)
res = client.request('OPTIONS', '/', headers={
'Origin': 'https://example.com',
'Access-Control-Request-Method': 'POST',
'Access-Control-Request-Headers': 'X-Test, X-Test2'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
'https://example.com')
self.assertFalse('Access-Control-Max-Age' in res.headers)
self.assertEqual(res.headers['Access-Control-Allow-Methods'], 'POST')
self.assertEqual(res.headers['Access-Control-Allow-Headers'],
'X-Test, X-Test2')
res = client.request('OPTIONS', '/', headers={
'Origin': 'https://example.com'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
'https://example.com')
self.assertFalse('Access-Control-Max-Age' in res.headers)
self.assertFalse('Access-Control-Allow-Methods' in res.headers)
self.assertFalse('Access-Control-Allow-Headers' in res.headers)
def test_cors_preflight_with_options(self):
app = Microdot()
CORS(app, allowed_origins='*', max_age=3600, allowed_methods=['POST'],
allowed_headers=['X-Test'])
@app.route('/', methods=['GET', 'POST'])
def index(req):
return 'foo'
client = TestClient(app)
res = client.request('OPTIONS', '/', headers={
'Origin': 'https://example.com',
'Access-Control-Request-Method': 'POST',
'Access-Control-Request-Headers': 'X-Test, X-Test2'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
'https://example.com')
self.assertEqual(res.headers['Access-Control-Max-Age'], '3600')
self.assertEqual(res.headers['Access-Control-Allow-Methods'], 'POST')
self.assertEqual(res.headers['Access-Control-Allow-Headers'], 'X-Test')
res = client.request('OPTIONS', '/', headers={
'Origin': 'https://example.com',
'Access-Control-Request-Method': 'GET'})
self.assertEqual(res.status_code, 200)
self.assertFalse('Access-Control-Allow-Methods' in res.headers)
self.assertFalse('Access-Control-Allow-Headers' in res.headers)
def test_cors_disabled(self):
app = Microdot()
CORS(app, allowed_origins='*', handle_cors=False)
@app.get('/')
def index(req):
return 'foo'
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertFalse('Access-Control-Allow-Origin' in res.headers)
self.assertFalse('Vary' in res.headers)

View File

@@ -63,6 +63,52 @@ class TestMicrodot(unittest.TestCase):
self.assertEqual(res.headers['Content-Length'], '3')
self.assertEqual(res.text, 'bar')
def test_head_request(self):
self._mock()
app = Microdot()
@app.route('/foo')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('HEAD', '/foo')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain; charset=UTF-8\r\n',
fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n'))
self._unmock()
def test_options_request(self):
app = Microdot()
@app.route('/', methods=['GET', 'DELETE'])
def index(req):
return 'foo'
@app.post('/')
def index_post(req):
return 'bar'
@app.route('/foo', methods=['POST', 'PUT'])
def foo(req):
return 'baz'
client = TestClient(app)
res = client.request('OPTIONS', '/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Allow'],
'GET, DELETE, POST, HEAD, OPTIONS')
res = client.request('OPTIONS', '/foo')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Allow'], 'POST, PUT, OPTIONS')
def test_empty_request(self):
self._mock()

View File

@@ -101,6 +101,48 @@ class TestMicrodotAsync(unittest.TestCase):
self.assertEqual(res.body, b'bar-async')
self.assertEqual(res.json, None)
def test_head_request(self):
app = Microdot()
@app.route('/foo')
def index(req):
return 'foo'
mock_socket.clear_requests()
fd = mock_socket.add_request('HEAD', '/foo')
self._add_shutdown(app)
app.run()
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
self.assertIn(b'Content-Length: 3\r\n', fd.response)
self.assertIn(b'Content-Type: text/plain; charset=UTF-8\r\n',
fd.response)
self.assertTrue(fd.response.endswith(b'\r\n\r\n'))
def test_options_request(self):
app = Microdot()
@app.route('/', methods=['GET', 'DELETE'])
async def index(req):
return 'foo'
@app.post('/')
async def index_post(req):
return 'bar'
@app.route('/foo', methods=['POST', 'PUT'])
async def foo(req):
return 'baz'
client = TestClient(app)
res = self._run(client.request('OPTIONS', '/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Allow'],
'GET, DELETE, POST, HEAD, OPTIONS')
res = self._run(client.request('OPTIONS', '/foo'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Allow'], 'POST, PUT, OPTIONS')
def test_empty_request(self):
app = Microdot()

View File

@@ -58,3 +58,11 @@ class TestMultiDict(unittest.TestCase):
del d['oNE']
self.assertEqual(list(d.items()), [('two', 5)])
self.assertEqual(list(d.values()), [5])
d.update({'oNe': 1, 'two': 2, 'three': 3})
self.assertEqual(d['one'], 1)
self.assertEqual(d['ONE'], 1)
self.assertEqual(d['two'], 2)
self.assertEqual(d['TWO'], 2)
self.assertEqual(d['three'], 3)
self.assertEqual(d['THREE'], 3)

View File

@@ -45,6 +45,13 @@ class TestRequest(unittest.TestCase):
self.assertEqual(req.args, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
def test_badly_formatted_args(self):
fd = get_request_fd('GET', '/?&foo=bar&abc=def&&&x=%2f%%')
req = Request.create('app', fd, 'addr')
self.assertEqual(req.query_string, '&foo=bar&abc=def&&&x=%2f%%')
self.assertEqual(req.args, MultiDict(
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
def test_json(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='{"foo":"bar"}')

View File

@@ -235,6 +235,39 @@ class TestResponse(unittest.TestCase):
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
Response.send_file_buffer_size = original_buffer_size
def test_send_file_max_age(self):
res = Response.send_file('tests/files/test.txt', max_age=123)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Cache-Control'], 'max-age=123')
Response.default_send_file_max_age = 456
res = Response.send_file('tests/files/test.txt')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Cache-Control'], 'max-age=456')
res = Response.send_file('tests/files/test.txt', max_age=123)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Cache-Control'], 'max-age=123')
Response.default_send_file_max_age = None
def test_send_file_compressed(self):
res = Response.send_file('tests/files/test.txt', compressed=True)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/plain')
self.assertEqual(res.headers['Content-Encoding'], 'gzip')
res = Response.send_file('tests/files/test.txt', compressed='foo')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'], 'text/plain')
self.assertEqual(res.headers['Content-Encoding'], 'foo')
res = Response.send_file('tests/files/test', compressed=True,
file_extension='.gz')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'application/octet-stream')
self.assertEqual(res.headers['Content-Encoding'], 'gzip')
def test_default_content_type(self):
original_content_type = Response.default_content_type
res = Response('foo')