Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79e11262d1 | ||
|
|
a1b061656f | ||
|
|
67798f7dbf | ||
|
|
ea6766cea9 | ||
|
|
6a31f89673 | ||
|
|
eaf2ef62d1 | ||
|
|
a350e8fd1e | ||
|
|
daf1001ec5 | ||
|
|
e684ee32d9 | ||
|
|
573e303a98 | ||
|
|
3592f53999 | ||
|
|
ea3722ca5c | ||
|
|
358fe6d2cc | ||
|
|
cb39898829 | ||
|
|
db908fe7c3 | ||
|
|
cb856e1bc7 | ||
|
|
110d7de6a9 |
26
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
26
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
@@ -0,0 +1,26 @@
|
||||
---
|
||||
name: Bug report
|
||||
about: Create a report to help us improve
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**IMPORTANT**: If you have a question, or you are not sure if you have found a bug in this package, then you are in the wrong place. Hit back in your web browser, and then open a GitHub Discussion instead. Likewise, if you are unable to provide the information requested below, open a discussion to troubleshoot your issue.
|
||||
|
||||
**Describe the bug**
|
||||
A clear and concise description of what the bug is. If you are getting errors, please include the complete error message, including the stack trace.
|
||||
|
||||
**To Reproduce**
|
||||
Steps to reproduce the behavior:
|
||||
1. Go to '...'
|
||||
2. Click on '....'
|
||||
3. Scroll down to '....'
|
||||
4. See error
|
||||
|
||||
**Expected behavior**
|
||||
A clear and concise description of what you expected to happen.
|
||||
|
||||
**Additional context**
|
||||
Add any other context about the problem here.
|
||||
5
.github/ISSUE_TEMPLATE/config.yml
vendored
Normal file
5
.github/ISSUE_TEMPLATE/config.yml
vendored
Normal file
@@ -0,0 +1,5 @@
|
||||
blank_issues_enabled: false
|
||||
contact_links:
|
||||
- name: GitHub Discussions
|
||||
url: https://github.com/miguelgrinberg/microdot/discussions
|
||||
about: Ask questions here.
|
||||
20
.github/ISSUE_TEMPLATE/feature_request.md
vendored
Normal file
20
.github/ISSUE_TEMPLATE/feature_request.md
vendored
Normal file
@@ -0,0 +1,20 @@
|
||||
---
|
||||
name: Feature request
|
||||
about: Suggest an idea for this project
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Is your feature request related to a problem? Please describe.**
|
||||
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||
|
||||
**Describe the solution you'd like**
|
||||
A clear and concise description of what you want to happen.
|
||||
|
||||
**Describe alternatives you've considered**
|
||||
A clear and concise description of any alternative solutions or features you've considered.
|
||||
|
||||
**Additional context**
|
||||
Add any other context or screenshots about the feature request here.
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@@ -103,3 +103,8 @@ venv.bak/
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
|
||||
# other
|
||||
*.der
|
||||
*.pem
|
||||
*_txt.py
|
||||
|
||||
18
CHANGES.md
18
CHANGES.md
@@ -1,5 +1,23 @@
|
||||
# Microdot change log
|
||||
|
||||
**Release 1.3.0** - 2023-04-08
|
||||
|
||||
- Cross-Origin Resource Sharing (CORS) extension [#45](https://github.com/miguelgrinberg/microdot/issues/45) ([commit](https://github.com/miguelgrinberg/microdot/commit/67798f7dbffb30018ab4b62a9aaa297f63bc9e64))
|
||||
- Respond to `HEAD` and `OPTIONS` requests ([commit](https://github.com/miguelgrinberg/microdot/commit/6a31f89673518e79fef5659c04e609b7976a5e34))
|
||||
- Tolerate slightly invalid formats in query strings [#126](https://github.com/miguelgrinberg/microdot/issues/126) ([commit](https://github.com/miguelgrinberg/microdot/commit/a1b061656fa19dae583951596b0f1f0603652a56))
|
||||
- Support compressed files in `send_file()` [#93](https://github.com/miguelgrinberg/microdot/issues/93) ([commit](https://github.com/miguelgrinberg/microdot/commit/daf1001ec55ab38e6cdfee4931729a3b7506858b))
|
||||
- Add `max_age` argument to `send_file()` ([commit](https://github.com/miguelgrinberg/microdot/commit/e684ee32d91d3e2ab9569bb5fd342986c010ffeb))
|
||||
- Add `update()` method to `NoCaseDict` class ([commit](https://github.com/miguelgrinberg/microdot/commit/ea6766cea96b756b36ed777f9c1b6a6680db09ba))
|
||||
- Set exit code to 1 for failed MicroPython test runs ([commit](https://github.com/miguelgrinberg/microdot/commit/a350e8fd1e55fac12c9e5b909cfa82d880b177ef))
|
||||
|
||||
**Release 1.2.4** - 2023-03-03
|
||||
|
||||
- One more attempt to correct build issues ([commit](https://github.com/miguelgrinberg/microdot/commit/cb39898829f4edc233ab4e7ba3f7ef3c5c50f196))
|
||||
|
||||
**Release 1.2.3** - 2023-03-03
|
||||
|
||||
- Corrected a problem with previous build.
|
||||
|
||||
**Release 1.2.2** - 2023-03-03
|
||||
|
||||
- Add a socket read timeout to abort incomplete requests [#99](https://github.com/miguelgrinberg/microdot/issues/99) ([commit](https://github.com/miguelgrinberg/microdot/commit/d0d358f94a63f8565d6406feff0c6e7418cc7f81))
|
||||
|
||||
@@ -52,6 +52,12 @@ API Reference
|
||||
.. automodule:: microdot_session
|
||||
:members:
|
||||
|
||||
``microdot_cors`` module
|
||||
------------------------
|
||||
|
||||
.. automodule:: microdot_cors
|
||||
:members:
|
||||
|
||||
``microdot_websocket`` module
|
||||
------------------------------
|
||||
|
||||
|
||||
@@ -208,6 +208,42 @@ Example::
|
||||
delete_session(req)
|
||||
return redirect('/')
|
||||
|
||||
Cross-Origin Resource Sharing (CORS)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
|
||||
* - Compatibility
|
||||
- | CPython & MicroPython
|
||||
|
||||
* - Required Microdot source files
|
||||
- | `microdot.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot.py>`_
|
||||
| `microdot_cors.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot_cors.py>`_
|
||||
|
||||
* - Required external dependencies
|
||||
- | None
|
||||
|
||||
* - Examples
|
||||
- | `cors.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/cors/cors.py>`_
|
||||
|
||||
The CORS extension provides support for `Cross-Origin Resource Sharing
|
||||
(CORS) <https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS>`_. CORS is a
|
||||
mechanism that allows web applications running on different origins to access
|
||||
resources from each other. For example, a web application running on
|
||||
``https://example.com`` can access resources from ``https://api.example.com``.
|
||||
|
||||
To enable CORS support, create an instance of the
|
||||
:class:`CORS <microdot_cors.CORS>` class and configure the desired options.
|
||||
Example::
|
||||
|
||||
from microdot import Microdot
|
||||
from microdot_cors import CORS
|
||||
|
||||
app = Microdot()
|
||||
cors = CORS(app, allowed_origins=['https://example.com'],
|
||||
allow_credentials=True)
|
||||
|
||||
WebSocket Support
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
||||
@@ -662,6 +662,15 @@ object for a file::
|
||||
def index(request):
|
||||
return send_file('/static/index.html')
|
||||
|
||||
A suggested caching duration can be returned to the client in the ``max_age``
|
||||
argument::
|
||||
|
||||
from microdot import send_file
|
||||
|
||||
@app.get('/')
|
||||
def image(request):
|
||||
return send_file('/static/image.jpg', max_age=3600) # in seconds
|
||||
|
||||
.. note::
|
||||
Unlike other web frameworks, Microdot does not automatically configure a
|
||||
route to serve static files. The following is an example route that can be
|
||||
@@ -673,7 +682,7 @@ object for a file::
|
||||
if '..' in path:
|
||||
# directory traversal is not allowed
|
||||
return 'Not found', 404
|
||||
return send_file('static/' + path)
|
||||
return send_file('static/' + path, max_age=86400)
|
||||
|
||||
Streaming Responses
|
||||
^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
1
examples/cors/README.md
Normal file
1
examples/cors/README.md
Normal file
@@ -0,0 +1 @@
|
||||
This directory contains Cross-Origin Resource Sharing (CORS) examples.
|
||||
14
examples/cors/app.py
Normal file
14
examples/cors/app.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from microdot import Microdot
|
||||
from microdot_cors import CORS
|
||||
|
||||
app = Microdot()
|
||||
CORS(app, allowed_origins=['https://example.org'], allow_credentials=True)
|
||||
|
||||
|
||||
@app.route('/')
|
||||
def index(request):
|
||||
return 'Hello World!'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run()
|
||||
20
examples/static/gzstatic.py
Normal file
20
examples/static/gzstatic.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from microdot import Microdot, send_file
|
||||
|
||||
app = Microdot()
|
||||
|
||||
|
||||
@app.route('/')
|
||||
def index(request):
|
||||
return send_file('gzstatic/index.html', compressed=True,
|
||||
file_extension='.gz')
|
||||
|
||||
|
||||
@app.route('/static/<path:path>')
|
||||
def static(request, path):
|
||||
if '..' in path:
|
||||
# directory traversal is not allowed
|
||||
return 'Not found', 404
|
||||
return send_file('gzstatic/' + path, compressed=True, file_extension='.gz')
|
||||
|
||||
|
||||
app.run(debug=True)
|
||||
BIN
examples/static/gzstatic/index.html.gz
Normal file
BIN
examples/static/gzstatic/index.html.gz
Normal file
Binary file not shown.
BIN
examples/static/gzstatic/logo.png.gz
Normal file
BIN
examples/static/gzstatic/logo.png.gz
Normal file
Binary file not shown.
@@ -6,4 +6,5 @@ sys.path.insert(3, 'libs/micropython')
|
||||
|
||||
import unittest
|
||||
|
||||
unittest.main('tests')
|
||||
if not unittest.main('tests').wasSuccessful():
|
||||
sys.exit(1)
|
||||
|
||||
35
setup.cfg
35
setup.cfg
@@ -1,6 +1,6 @@
|
||||
[metadata]
|
||||
name = microdot
|
||||
version = 1.2.2
|
||||
version = 1.3.0
|
||||
author = Miguel Grinberg
|
||||
author_email = miguel.grinberg@gmail.com
|
||||
description = The impossibly small web framework for MicroPython
|
||||
@@ -22,22 +22,17 @@ zip_safe = False
|
||||
include_package_data = True
|
||||
package_dir =
|
||||
= src
|
||||
packages = find:
|
||||
|
||||
[options.packages.find]
|
||||
where = src
|
||||
|
||||
#py_modules =
|
||||
# microdot
|
||||
# microdot_asyncio
|
||||
# microdot_utemplate
|
||||
# microdot_jinja
|
||||
# microdot_session
|
||||
# microdot_websocket
|
||||
# microdot_websocket_alt
|
||||
# microdot_asyncio_websocket
|
||||
# microdot_test_client
|
||||
# microdot_asyncio_test_client
|
||||
# microdot_wsgi
|
||||
# microdot_asgi
|
||||
# microdot_asgi_websocket
|
||||
py_modules =
|
||||
microdot
|
||||
microdot_asyncio
|
||||
microdot_utemplate
|
||||
microdot_jinja
|
||||
microdot_session
|
||||
microdot_websocket
|
||||
microdot_websocket_alt
|
||||
microdot_asyncio_websocket
|
||||
microdot_test_client
|
||||
microdot_asyncio_test_client
|
||||
microdot_wsgi
|
||||
microdot_asgi
|
||||
microdot_asgi_websocket
|
||||
|
||||
104
src/microdot.py
104
src/microdot.py
@@ -146,6 +146,10 @@ class NoCaseDict(dict):
|
||||
kl = key.lower()
|
||||
return super().get(self.keymap.get(kl, kl), default)
|
||||
|
||||
def update(self, other_dict):
|
||||
for key, value in other_dict.items():
|
||||
self[key] = value
|
||||
|
||||
|
||||
def mro(cls): # pragma: no cover
|
||||
"""Return the method resolution order of a class.
|
||||
@@ -400,11 +404,11 @@ class Request():
|
||||
if len(urlencoded) > 0:
|
||||
if isinstance(urlencoded, str):
|
||||
for k, v in [pair.split('=', 1)
|
||||
for pair in urlencoded.split('&')]:
|
||||
for pair in urlencoded.split('&') if pair]:
|
||||
data[urldecode_str(k)] = urldecode_str(v)
|
||||
elif isinstance(urlencoded, bytes): # pragma: no branch
|
||||
for k, v in [pair.split(b'=', 1)
|
||||
for pair in urlencoded.split(b'&')]:
|
||||
for pair in urlencoded.split(b'&') if pair]:
|
||||
data[urldecode_bytes(k)] = urldecode_bytes(v)
|
||||
return data
|
||||
|
||||
@@ -525,6 +529,10 @@ class Response():
|
||||
#: ``Content-Type`` header.
|
||||
default_content_type = 'text/plain'
|
||||
|
||||
#: The default cache control max age used by :meth:`send_file`. A value
|
||||
#: of ``None`` means that no ``Cache-Control`` header is added.
|
||||
default_send_file_max_age = None
|
||||
|
||||
#: Special response used to signal that a response does not need to be
|
||||
#: written to the client. Used to exit WebSocket connections cleanly.
|
||||
already_handled = None
|
||||
@@ -544,6 +552,7 @@ class Response():
|
||||
else:
|
||||
# this applies to bytes, file-like objects or generators
|
||||
self.body = body
|
||||
self.is_head = False
|
||||
|
||||
def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
|
||||
max_age=None, secure=False, http_only=False):
|
||||
@@ -608,19 +617,20 @@ class Response():
|
||||
stream.write(b'\r\n')
|
||||
|
||||
# body
|
||||
can_flush = hasattr(stream, 'flush')
|
||||
try:
|
||||
for body in self.body_iter():
|
||||
if isinstance(body, str): # pragma: no cover
|
||||
body = body.encode()
|
||||
stream.write(body)
|
||||
if can_flush: # pragma: no cover
|
||||
stream.flush()
|
||||
except OSError as exc: # pragma: no cover
|
||||
if exc.errno in MUTED_SOCKET_ERRORS:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
if not self.is_head:
|
||||
can_flush = hasattr(stream, 'flush')
|
||||
try:
|
||||
for body in self.body_iter():
|
||||
if isinstance(body, str): # pragma: no cover
|
||||
body = body.encode()
|
||||
stream.write(body)
|
||||
if can_flush: # pragma: no cover
|
||||
stream.flush()
|
||||
except OSError as exc: # pragma: no cover
|
||||
if exc.errno in MUTED_SOCKET_ERRORS:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
def body_iter(self):
|
||||
if self.body:
|
||||
@@ -651,7 +661,9 @@ class Response():
|
||||
return cls(status_code=status_code, headers={'Location': location})
|
||||
|
||||
@classmethod
|
||||
def send_file(cls, filename, status_code=200, content_type=None):
|
||||
def send_file(cls, filename, status_code=200, content_type=None,
|
||||
stream=None, max_age=None, compressed=False,
|
||||
file_extension=''):
|
||||
"""Send file contents in a response.
|
||||
|
||||
:param filename: The filename of the file.
|
||||
@@ -659,7 +671,25 @@ class Response():
|
||||
default is 302.
|
||||
:param content_type: The ``Content-Type`` header to use in the
|
||||
response. If omitted, it is generated
|
||||
automatically from the file extension.
|
||||
automatically from the file extension of the
|
||||
``filename`` parameter.
|
||||
:param stream: A file-like object to read the file contents from. If
|
||||
a stream is given, the ``filename`` parameter is only
|
||||
used when generating the ``Content-Type`` header.
|
||||
:param max_age: The ``Cache-Control`` header's ``max-age`` value in
|
||||
seconds. If omitted, the value of the
|
||||
:attr:`Response.default_send_file_max_age` attribute is
|
||||
used.
|
||||
:param compressed: Whether the file is compressed. If ``True``, the
|
||||
``Content-Encoding`` header is set to ``gzip``. A
|
||||
string with the header value can also be passed.
|
||||
Note that when using this option the file must have
|
||||
been compressed beforehand. This option only sets
|
||||
the header.
|
||||
:param file_extension: A file extension to append to the ``filename``
|
||||
parameter when opening the file, including the
|
||||
dot. The extension given here is not considered
|
||||
when generating the ``Content-Type`` header.
|
||||
|
||||
Security note: The filename is assumed to be trusted. Never pass
|
||||
filenames provided by the user without validating and sanitizing them
|
||||
@@ -671,9 +701,19 @@ class Response():
|
||||
content_type = Response.types_map[ext]
|
||||
else:
|
||||
content_type = 'application/octet-stream'
|
||||
f = open(filename, 'rb')
|
||||
return cls(body=f, status_code=status_code,
|
||||
headers={'Content-Type': content_type})
|
||||
headers = {'Content-Type': content_type}
|
||||
|
||||
if max_age is None:
|
||||
max_age = cls.default_send_file_max_age
|
||||
if max_age is not None:
|
||||
headers['Cache-Control'] = 'max-age={}'.format(max_age)
|
||||
|
||||
if compressed:
|
||||
headers['Content-Encoding'] = compressed \
|
||||
if isinstance(compressed, str) else 'gzip'
|
||||
|
||||
f = stream or open(filename + file_extension, 'rb')
|
||||
return cls(body=f, status_code=status_code, headers=headers)
|
||||
|
||||
|
||||
class URLPattern():
|
||||
@@ -759,6 +799,7 @@ class Microdot():
|
||||
self.after_error_request_handlers = []
|
||||
self.error_handlers = {}
|
||||
self.shutdown_requested = False
|
||||
self.options_handler = self.default_options_handler
|
||||
self.debug = False
|
||||
self.server = None
|
||||
|
||||
@@ -794,7 +835,8 @@ class Microdot():
|
||||
"""
|
||||
def decorated(f):
|
||||
self.url_map.append(
|
||||
(methods or ['GET'], URLPattern(url_pattern), f))
|
||||
([m.upper() for m in (methods or ['GET'])],
|
||||
URLPattern(url_pattern), f))
|
||||
return f
|
||||
return decorated
|
||||
|
||||
@@ -1080,17 +1122,32 @@ class Microdot():
|
||||
self.shutdown_requested = True
|
||||
|
||||
def find_route(self, req):
|
||||
method = req.method.upper()
|
||||
if method == 'OPTIONS' and self.options_handler:
|
||||
return self.options_handler(req)
|
||||
if method == 'HEAD':
|
||||
method = 'GET'
|
||||
f = 404
|
||||
for route_methods, route_pattern, route_handler in self.url_map:
|
||||
req.url_args = route_pattern.match(req.path)
|
||||
if req.url_args is not None:
|
||||
if req.method in route_methods:
|
||||
if method in route_methods:
|
||||
f = route_handler
|
||||
break
|
||||
else:
|
||||
f = 405
|
||||
return f
|
||||
|
||||
def default_options_handler(self, req):
|
||||
allow = []
|
||||
for route_methods, route_pattern, route_handler in self.url_map:
|
||||
if route_pattern.match(req.path) is not None:
|
||||
allow.extend(route_methods)
|
||||
if 'GET' in allow:
|
||||
allow.append('HEAD')
|
||||
allow.append('OPTIONS')
|
||||
return {'Allow': ', '.join(allow)}
|
||||
|
||||
def handle_request(self, sock, addr):
|
||||
if Request.socket_read_timeout and \
|
||||
hasattr(sock, 'settimeout'): # pragma: no cover
|
||||
@@ -1165,6 +1222,8 @@ class Microdot():
|
||||
for handler in req.after_request_handlers:
|
||||
res = handler(req, res) or res
|
||||
after_request_handled = True
|
||||
elif isinstance(f, dict):
|
||||
res = Response(headers=f)
|
||||
elif f in self.error_handlers:
|
||||
res = self.error_handlers[f](req)
|
||||
else:
|
||||
@@ -1208,6 +1267,7 @@ class Microdot():
|
||||
if not after_request_handled:
|
||||
for handler in self.after_error_request_handlers:
|
||||
res = handler(req, res) or res
|
||||
res.is_head = (req and req.method == 'HEAD')
|
||||
return res
|
||||
|
||||
|
||||
|
||||
@@ -151,10 +151,11 @@ class Response(BaseResponse):
|
||||
await stream.awrite(b'\r\n')
|
||||
|
||||
# body
|
||||
async for body in self.body_iter():
|
||||
if isinstance(body, str): # pragma: no cover
|
||||
body = body.encode()
|
||||
await stream.awrite(body)
|
||||
if not self.is_head:
|
||||
async for body in self.body_iter():
|
||||
if isinstance(body, str): # pragma: no cover
|
||||
body = body.encode()
|
||||
await stream.awrite(body)
|
||||
except OSError as exc: # pragma: no cover
|
||||
if exc.errno in MUTED_SOCKET_ERRORS or \
|
||||
exc.args[0] == 'Connection lost':
|
||||
@@ -385,6 +386,8 @@ class Microdot(BaseMicrodot):
|
||||
res = await self._invoke_handler(
|
||||
handler, req, res) or res
|
||||
after_request_handled = True
|
||||
elif isinstance(f, dict):
|
||||
res = Response(headers=f)
|
||||
elif f in self.error_handlers:
|
||||
res = await self._invoke_handler(
|
||||
self.error_handlers[f], req)
|
||||
@@ -431,6 +434,7 @@ class Microdot(BaseMicrodot):
|
||||
for handler in self.after_error_request_handlers:
|
||||
res = await self._invoke_handler(
|
||||
handler, req, res) or res
|
||||
res.is_head = (req and req.method == 'HEAD')
|
||||
return res
|
||||
|
||||
async def _invoke_handler(self, f_or_coro, *args, **kwargs):
|
||||
|
||||
110
src/microdot_cors.py
Normal file
110
src/microdot_cors.py
Normal file
@@ -0,0 +1,110 @@
|
||||
class CORS:
|
||||
"""Add CORS headers to HTTP responses.
|
||||
|
||||
:param app: The application to add CORS headers to.
|
||||
:param allowed_origins: A list of origins that are allowed to make
|
||||
cross-site requests. If set to '*', all origins are
|
||||
allowed.
|
||||
:param allow_credentials: If set to True, the
|
||||
``Access-Control-Allow-Credentials`` header will
|
||||
be set to ``true`` to indicate to the browser
|
||||
that it can expose cookies and authentication
|
||||
headers.
|
||||
:param allowed_methods: A list of methods that are allowed to be used when
|
||||
making cross-site requests. If not set, all methods
|
||||
are allowed.
|
||||
:param expose_headers: A list of headers that the browser is allowed to
|
||||
exposed.
|
||||
:param allowed_headers: A list of headers that are allowed to be used when
|
||||
making cross-site requests. If not set, all headers
|
||||
are allowed.
|
||||
:param max_age: The maximum amount of time in seconds that the browser
|
||||
should cache the results of a preflight request.
|
||||
:param handle_cors: If set to False, CORS headers will not be added to
|
||||
responses. This can be useful if you want to add CORS
|
||||
headers manually.
|
||||
"""
|
||||
def __init__(self, app=None, allowed_origins=None, allow_credentials=False,
|
||||
allowed_methods=None, expose_headers=None,
|
||||
allowed_headers=None, max_age=None, handle_cors=True):
|
||||
self.allowed_origins = allowed_origins
|
||||
self.allow_credentials = allow_credentials
|
||||
self.allowed_methods = allowed_methods
|
||||
self.expose_headers = expose_headers
|
||||
self.allowed_headers = None if allowed_headers is None \
|
||||
else [h.lower() for h in allowed_headers]
|
||||
self.max_age = max_age
|
||||
if app is not None:
|
||||
self.initialize(app, handle_cors=handle_cors)
|
||||
|
||||
def initialize(self, app, handle_cors=True):
|
||||
"""Initialize the CORS object for the given application.
|
||||
|
||||
:param app: The application to add CORS headers to.
|
||||
:param handle_cors: If set to False, CORS headers will not be added to
|
||||
responses. This can be useful if you want to add
|
||||
CORS headers manually.
|
||||
"""
|
||||
self.default_options_handler = app.options_handler
|
||||
if handle_cors:
|
||||
app.options_handler = self.options_handler
|
||||
app.after_request(self.after_request)
|
||||
app.after_error_request(self.after_request)
|
||||
|
||||
def options_handler(self, request):
|
||||
headers = self.default_options_handler(request)
|
||||
headers.update(self.get_cors_headers(request))
|
||||
return headers
|
||||
|
||||
def get_cors_headers(self, request):
|
||||
"""Return a dictionary of CORS headers to add to a given request.
|
||||
|
||||
:param request: The request to add CORS headers to.
|
||||
"""
|
||||
cors_headers = {}
|
||||
origin = request.headers.get('Origin')
|
||||
if self.allowed_origins == '*':
|
||||
cors_headers['Access-Control-Allow-Origin'] = origin or '*'
|
||||
if origin:
|
||||
cors_headers['Vary'] = 'Origin'
|
||||
elif origin in (self.allowed_origins or []):
|
||||
cors_headers['Access-Control-Allow-Origin'] = origin
|
||||
cors_headers['Vary'] = 'Origin'
|
||||
if self.allow_credentials and \
|
||||
'Access-Control-Allow-Origin' in cors_headers:
|
||||
cors_headers['Access-Control-Allow-Credentials'] = 'true'
|
||||
if self.expose_headers:
|
||||
cors_headers['Access-Control-Expose-Headers'] = \
|
||||
', '.join(self.expose_headers)
|
||||
|
||||
if request.method == 'OPTIONS':
|
||||
# handle preflight request
|
||||
if self.max_age:
|
||||
cors_headers['Access-Control-Max-Age'] = str(self.max_age)
|
||||
|
||||
method = request.headers.get('Access-Control-Request-Method')
|
||||
if method:
|
||||
method = method.upper()
|
||||
if self.allowed_methods is None or \
|
||||
method in self.allowed_methods:
|
||||
cors_headers['Access-Control-Allow-Methods'] = method
|
||||
|
||||
headers = request.headers.get('Access-Control-Request-Headers')
|
||||
if headers:
|
||||
if self.allowed_headers is None:
|
||||
cors_headers['Access-Control-Allow-Headers'] = headers
|
||||
else:
|
||||
headers = [h.strip() for h in headers.split(',')]
|
||||
headers = [h for h in headers
|
||||
if h.lower() in self.allowed_headers]
|
||||
cors_headers['Access-Control-Allow-Headers'] = \
|
||||
', '.join(headers)
|
||||
|
||||
return cors_headers
|
||||
|
||||
def after_request(self, request, response):
|
||||
saved_vary = response.headers.get('Vary')
|
||||
response.headers.update(self.get_cors_headers(request))
|
||||
if saved_vary and saved_vary != response.headers.get('Vary'):
|
||||
response.headers['Vary'] = (
|
||||
saved_vary + ', ' + response.headers['Vary'])
|
||||
@@ -58,6 +58,7 @@ class TestResponse:
|
||||
test_res._initialize_body(res)
|
||||
test_res._process_text_body()
|
||||
test_res._process_json_body()
|
||||
test_res.is_head = res.is_head
|
||||
return test_res
|
||||
|
||||
|
||||
|
||||
BIN
tests/files/test.gz
Normal file
BIN
tests/files/test.gz
Normal file
Binary file not shown.
158
tests/test_cors.py
Normal file
158
tests/test_cors.py
Normal file
@@ -0,0 +1,158 @@
|
||||
import unittest
|
||||
from microdot import Microdot
|
||||
from microdot_test_client import TestClient
|
||||
from microdot_cors import CORS
|
||||
|
||||
|
||||
class TestCORS(unittest.TestCase):
|
||||
def test_origin(self):
|
||||
app = Microdot()
|
||||
cors = CORS(allowed_origins=['https://example.com'],
|
||||
allow_credentials=True)
|
||||
cors.initialize(app)
|
||||
|
||||
@app.get('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
client = TestClient(app)
|
||||
res = client.get('/')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertFalse('Access-Control-Allow-Origin' in res.headers)
|
||||
self.assertFalse('Access-Control-Allow-Credentials' in res.headers)
|
||||
self.assertFalse('Vary' in res.headers)
|
||||
|
||||
res = client.get('/', headers={'Origin': 'https://example.com'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
|
||||
'https://example.com')
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Credentials'],
|
||||
'true')
|
||||
self.assertEqual(res.headers['Vary'], 'Origin')
|
||||
|
||||
cors.allow_credentials = False
|
||||
|
||||
res = client.get('/foo', headers={'Origin': 'https://example.com'})
|
||||
self.assertEqual(res.status_code, 404)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
|
||||
'https://example.com')
|
||||
self.assertFalse('Access-Control-Allow-Credentials' in res.headers)
|
||||
self.assertEqual(res.headers['Vary'], 'Origin')
|
||||
|
||||
res = client.get('/', headers={'Origin': 'https://bad.com'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertFalse('Access-Control-Allow-Origin' in res.headers)
|
||||
self.assertFalse('Access-Control-Allow-Credentials' in res.headers)
|
||||
self.assertFalse('Vary' in res.headers)
|
||||
|
||||
def test_all_origins(self):
|
||||
app = Microdot()
|
||||
CORS(app, allowed_origins='*', expose_headers=['X-Test', 'X-Test2'])
|
||||
|
||||
@app.get('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.get('/foo')
|
||||
def foo(req):
|
||||
return 'foo', {'Vary': 'X-Foo, X-Bar'}
|
||||
|
||||
client = TestClient(app)
|
||||
res = client.get('/')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'], '*')
|
||||
self.assertFalse('Vary' in res.headers)
|
||||
self.assertEqual(res.headers['Access-Control-Expose-Headers'],
|
||||
'X-Test, X-Test2')
|
||||
|
||||
res = client.get('/', headers={'Origin': 'https://example.com'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
|
||||
'https://example.com')
|
||||
self.assertEqual(res.headers['Vary'], 'Origin')
|
||||
self.assertEqual(res.headers['Access-Control-Expose-Headers'],
|
||||
'X-Test, X-Test2')
|
||||
|
||||
res = client.get('/bad', headers={'Origin': 'https://example.com'})
|
||||
self.assertEqual(res.status_code, 404)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
|
||||
'https://example.com')
|
||||
self.assertEqual(res.headers['Vary'], 'Origin')
|
||||
self.assertEqual(res.headers['Access-Control-Expose-Headers'],
|
||||
'X-Test, X-Test2')
|
||||
|
||||
res = client.get('/foo', headers={'Origin': 'https://example.com'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Vary'], 'X-Foo, X-Bar, Origin')
|
||||
|
||||
def test_cors_preflight(self):
|
||||
app = Microdot()
|
||||
CORS(app, allowed_origins='*')
|
||||
|
||||
@app.route('/', methods=['GET', 'POST'])
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
client = TestClient(app)
|
||||
res = client.request('OPTIONS', '/', headers={
|
||||
'Origin': 'https://example.com',
|
||||
'Access-Control-Request-Method': 'POST',
|
||||
'Access-Control-Request-Headers': 'X-Test, X-Test2'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
|
||||
'https://example.com')
|
||||
self.assertFalse('Access-Control-Max-Age' in res.headers)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Methods'], 'POST')
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Headers'],
|
||||
'X-Test, X-Test2')
|
||||
|
||||
res = client.request('OPTIONS', '/', headers={
|
||||
'Origin': 'https://example.com'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
|
||||
'https://example.com')
|
||||
self.assertFalse('Access-Control-Max-Age' in res.headers)
|
||||
self.assertFalse('Access-Control-Allow-Methods' in res.headers)
|
||||
self.assertFalse('Access-Control-Allow-Headers' in res.headers)
|
||||
|
||||
def test_cors_preflight_with_options(self):
|
||||
app = Microdot()
|
||||
CORS(app, allowed_origins='*', max_age=3600, allowed_methods=['POST'],
|
||||
allowed_headers=['X-Test'])
|
||||
|
||||
@app.route('/', methods=['GET', 'POST'])
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
client = TestClient(app)
|
||||
res = client.request('OPTIONS', '/', headers={
|
||||
'Origin': 'https://example.com',
|
||||
'Access-Control-Request-Method': 'POST',
|
||||
'Access-Control-Request-Headers': 'X-Test, X-Test2'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Origin'],
|
||||
'https://example.com')
|
||||
self.assertEqual(res.headers['Access-Control-Max-Age'], '3600')
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Methods'], 'POST')
|
||||
self.assertEqual(res.headers['Access-Control-Allow-Headers'], 'X-Test')
|
||||
|
||||
res = client.request('OPTIONS', '/', headers={
|
||||
'Origin': 'https://example.com',
|
||||
'Access-Control-Request-Method': 'GET'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertFalse('Access-Control-Allow-Methods' in res.headers)
|
||||
self.assertFalse('Access-Control-Allow-Headers' in res.headers)
|
||||
|
||||
def test_cors_disabled(self):
|
||||
app = Microdot()
|
||||
CORS(app, allowed_origins='*', handle_cors=False)
|
||||
|
||||
@app.get('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
client = TestClient(app)
|
||||
res = client.get('/')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertFalse('Access-Control-Allow-Origin' in res.headers)
|
||||
self.assertFalse('Vary' in res.headers)
|
||||
@@ -63,6 +63,52 @@ class TestMicrodot(unittest.TestCase):
|
||||
self.assertEqual(res.headers['Content-Length'], '3')
|
||||
self.assertEqual(res.text, 'bar')
|
||||
|
||||
def test_head_request(self):
|
||||
self._mock()
|
||||
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/foo')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('HEAD', '/foo')
|
||||
self._add_shutdown(app)
|
||||
app.run()
|
||||
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain; charset=UTF-8\r\n',
|
||||
fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n'))
|
||||
|
||||
self._unmock()
|
||||
|
||||
def test_options_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/', methods=['GET', 'DELETE'])
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.post('/')
|
||||
def index_post(req):
|
||||
return 'bar'
|
||||
|
||||
@app.route('/foo', methods=['POST', 'PUT'])
|
||||
def foo(req):
|
||||
return 'baz'
|
||||
|
||||
client = TestClient(app)
|
||||
res = client.request('OPTIONS', '/')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Allow'],
|
||||
'GET, DELETE, POST, HEAD, OPTIONS')
|
||||
res = client.request('OPTIONS', '/foo')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Allow'], 'POST, PUT, OPTIONS')
|
||||
|
||||
def test_empty_request(self):
|
||||
self._mock()
|
||||
|
||||
|
||||
@@ -101,6 +101,48 @@ class TestMicrodotAsync(unittest.TestCase):
|
||||
self.assertEqual(res.body, b'bar-async')
|
||||
self.assertEqual(res.json, None)
|
||||
|
||||
def test_head_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/foo')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('HEAD', '/foo')
|
||||
self._add_shutdown(app)
|
||||
app.run()
|
||||
|
||||
self.assertTrue(fd.response.startswith(b'HTTP/1.0 200 OK\r\n'))
|
||||
self.assertIn(b'Content-Length: 3\r\n', fd.response)
|
||||
self.assertIn(b'Content-Type: text/plain; charset=UTF-8\r\n',
|
||||
fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\n'))
|
||||
|
||||
def test_options_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/', methods=['GET', 'DELETE'])
|
||||
async def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.post('/')
|
||||
async def index_post(req):
|
||||
return 'bar'
|
||||
|
||||
@app.route('/foo', methods=['POST', 'PUT'])
|
||||
async def foo(req):
|
||||
return 'baz'
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.request('OPTIONS', '/'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Allow'],
|
||||
'GET, DELETE, POST, HEAD, OPTIONS')
|
||||
res = self._run(client.request('OPTIONS', '/foo'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Allow'], 'POST, PUT, OPTIONS')
|
||||
|
||||
def test_empty_request(self):
|
||||
app = Microdot()
|
||||
|
||||
|
||||
@@ -58,3 +58,11 @@ class TestMultiDict(unittest.TestCase):
|
||||
del d['oNE']
|
||||
self.assertEqual(list(d.items()), [('two', 5)])
|
||||
self.assertEqual(list(d.values()), [5])
|
||||
|
||||
d.update({'oNe': 1, 'two': 2, 'three': 3})
|
||||
self.assertEqual(d['one'], 1)
|
||||
self.assertEqual(d['ONE'], 1)
|
||||
self.assertEqual(d['two'], 2)
|
||||
self.assertEqual(d['TWO'], 2)
|
||||
self.assertEqual(d['three'], 3)
|
||||
self.assertEqual(d['THREE'], 3)
|
||||
|
||||
@@ -45,6 +45,13 @@ class TestRequest(unittest.TestCase):
|
||||
self.assertEqual(req.args, MultiDict(
|
||||
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
|
||||
|
||||
def test_badly_formatted_args(self):
|
||||
fd = get_request_fd('GET', '/?&foo=bar&abc=def&&&x=%2f%%')
|
||||
req = Request.create('app', fd, 'addr')
|
||||
self.assertEqual(req.query_string, '&foo=bar&abc=def&&&x=%2f%%')
|
||||
self.assertEqual(req.args, MultiDict(
|
||||
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
|
||||
|
||||
def test_json(self):
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
|
||||
|
||||
@@ -235,6 +235,39 @@ class TestResponse(unittest.TestCase):
|
||||
b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\nfoo\n')
|
||||
Response.send_file_buffer_size = original_buffer_size
|
||||
|
||||
def test_send_file_max_age(self):
|
||||
res = Response.send_file('tests/files/test.txt', max_age=123)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Cache-Control'], 'max-age=123')
|
||||
|
||||
Response.default_send_file_max_age = 456
|
||||
res = Response.send_file('tests/files/test.txt')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Cache-Control'], 'max-age=456')
|
||||
res = Response.send_file('tests/files/test.txt', max_age=123)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Cache-Control'], 'max-age=123')
|
||||
|
||||
Response.default_send_file_max_age = None
|
||||
|
||||
def test_send_file_compressed(self):
|
||||
res = Response.send_file('tests/files/test.txt', compressed=True)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Content-Type'], 'text/plain')
|
||||
self.assertEqual(res.headers['Content-Encoding'], 'gzip')
|
||||
|
||||
res = Response.send_file('tests/files/test.txt', compressed='foo')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Content-Type'], 'text/plain')
|
||||
self.assertEqual(res.headers['Content-Encoding'], 'foo')
|
||||
|
||||
res = Response.send_file('tests/files/test', compressed=True,
|
||||
file_extension='.gz')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Content-Type'],
|
||||
'application/octet-stream')
|
||||
self.assertEqual(res.headers['Content-Encoding'], 'gzip')
|
||||
|
||||
def test_default_content_type(self):
|
||||
original_content_type = Response.default_content_type
|
||||
res = Response('foo')
|
||||
|
||||
Reference in New Issue
Block a user