11 Commits

Author SHA1 Message Date
Miguel Grinberg
e69c2dc42f Release 1.2.1 2022-12-06 12:37:51 +00:00
Miguel Grinberg
5a589afd5e Addressed error when deleting a user session in async app (Fixes #86) 2022-12-06 12:01:16 +00:00
Miguel Grinberg
c841cbedda Add asyncio file upload example 2022-11-16 19:10:44 +00:00
Diego Pomares
24d74fb848 Error handling invokes parent exceptions (Fixes #74) 2022-11-08 00:27:11 +00:00
Diego Pomares
4a9b92b800 Fix typos in documentation (#77) 2022-10-21 10:35:22 +01:00
Diego Pomares
c443599089 Add missing exception argument to error handler example in documentation (#73) 2022-10-15 12:44:52 +01:00
Miguel Grinberg
6554f29ddc Remove unused file #nolog 2022-10-08 12:26:18 +01:00
Miguel Grinberg
211ad953ae New Jinja and uTemplate examples with Bootstrap 2022-10-08 12:21:00 +01:00
Miguel Grinberg
63f43e1e7e Version 1.2.1.dev0 2022-09-25 12:19:46 +01:00
Miguel Grinberg
cb2a23285e Release 1.2.0 2022-09-25 12:19:31 +01:00
Miguel Grinberg
b133dcc343 URL encode/decode unit tests 2022-09-24 20:15:22 +01:00
33 changed files with 546 additions and 530 deletions

View File

@@ -1,5 +1,24 @@
# Microdot change log
**Release 1.2.1** - 2022-12-06
- Error handling invokes parent exceptions [#74](https://github.com/miguelgrinberg/microdot/issues/74) ([commit](https://github.com/miguelgrinberg/microdot/commit/24d74fb8483b04e8abe6e303e06f0a310f32700b)) (thanks **Diego Pomares**!)
- Addressed error when deleting a user session in async app [#86](https://github.com/miguelgrinberg/microdot/issues/86) ([commit](https://github.com/miguelgrinberg/microdot/commit/5a589afd5e519e94e84fc1ee69033f2dad51c3ea))
- Add asyncio file upload example ([commit](https://github.com/miguelgrinberg/microdot/commit/c841cbedda40f59a9d87f6895fdf9fd954f854a2))
- New Jinja and uTemplate examples with Bootstrap ([commit](https://github.com/miguelgrinberg/microdot/commit/211ad953aeedb4c7f73fe210424aa173b4dc7fee))
- Fix typos in documentation [#77](https://github.com/miguelgrinberg/microdot/issues/77) ([commit](https://github.com/miguelgrinberg/microdot/commit/4a9b92b800d3fd87110f7bc9f546c10185ee13bc)) (thanks **Diego Pomares**!)
- Add missing exception argument to error handler example in documentation [#73](https://github.com/miguelgrinberg/microdot/issues/73) ([commit](https://github.com/miguelgrinberg/microdot/commit/c443599089f2127d1cb052dfba8a05c1969d65e3)) (thanks **Diego Pomares**!)
**Release 1.2.0** - 2022-09-25
- Use a case insensitive dict for headers ([commit #1](https://github.com/miguelgrinberg/microdot/commit/b0fd6c432371ca5cb10d07ff84c4deed7aa0ce2e) [commit #2](https://github.com/miguelgrinberg/microdot/commit/a8515c97b030f942fa6ca85cbe1772291468fb0d))
- urlencode() helper function ([commit #1](https://github.com/miguelgrinberg/microdot/commit/672512e086384e808489305502e6ebebcc5a888f) [commit #2](https://github.com/miguelgrinberg/microdot/commit/b133dcc34368853ee685396a1bcb50360e807813))
- Added `request.url` attribute with the complete URL of the request ([commit](https://github.com/miguelgrinberg/microdot/commit/1547e861ee28d43d10fe4c4ed1871345d4b81086))
- Do not log HTTPException occurrences ([commit](https://github.com/miguelgrinberg/microdot/commit/cbefb6bf3a3fdcff8b7a8bacad3449be18e46e3b))
- Cache user session for performance ([commit](https://github.com/miguelgrinberg/microdot/commit/01947b101ebe198312c88d73872e3248024918f0))
- File upload example ([commit](https://github.com/miguelgrinberg/microdot/commit/8ebe81c09b604ddc1123e78ad6bc87ceda5f8597))
- Minor documentation styling fixes ([commit](https://github.com/miguelgrinberg/microdot/commit/4f263c63ab7bb1ce0dd48d8e00f3c6891e1bf07e))
**Release 1.1.1** - 2022-09-18
- Make WebSocket internals consistent between TLS and non-TLS [#61](https://github.com/miguelgrinberg/microdot/issues/61) ([commit](https://github.com/miguelgrinberg/microdot/commit/5693b812ceb2c0d51ec3c991adf6894a87e6fcc7))

View File

@@ -244,7 +244,7 @@ Example::
ws.send(message)
.. note::
An unsupported *microsoft_websocket_alt.py* module, with the same
An unsupported *microdot_websocket_alt.py* module, with the same
interface, is also provided. This module uses the native WebSocket support
in MicroPython that powers the WebREPL, and may provide slightly better
performance for MicroPython low-end boards. This module is not compatible
@@ -276,9 +276,9 @@ This extension has the same interface as the synchronous WebSocket extension,
but the ``receive()`` and ``send()`` methods are asynchronous.
.. note::
An unsupported *microsoft_asgi_websocket.py* module, with the same
An unsupported *microdot_asgi_websocket.py* module, with the same
interface, is also provided. This module must be used instead of
*microsoft_asyncio_websocket.py* when the ASGI support is used. The
*microdot_asyncio_websocket.py* when the ASGI support is used. The
`echo_asgi.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/websocket/echo_asgi.py>`_
example shows how to use this module.

View File

@@ -314,7 +314,7 @@ automatically handled by Microdot are:
While the above errors are fully complaint with the HTTP specification, the
application might want to provide custom responses for them. The
:func:`errorhandler() <microdot.Microdot.errorhandler>` decorator registers
a functions to respond to specific error codes. The following example shows a
functions to respond to specific error codes. The following example shows a
custom error handler for 404 errors::
@app.errorhandler(404)
@@ -322,14 +322,18 @@ custom error handler for 404 errors::
return {'error': 'resource not found'}, 404
The ``errorhandler()`` decorator has a second form, in which it takes an
exception class as an argument. Microdot will then invoke the handler when an
exception of that class is raised. The next example provides a custom response
for division by zero errors::
exception class as an argument. Microdot will then invoke the handler when the
exception is an instance of the given class is raised. The next example
provides a custom response for division by zero errors::
@app.errorhandler(ZeroDivisionError)
def division_by_zero(request):
def division_by_zero(request, exception):
return {'error': 'division by zero'}, 500
When the raised exception class does not have an error handler defined, but
one or more of its base classes do, Microdot makes an attempt to invoke the
most specific handler.
Mounting a Sub-Application
^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -481,7 +485,7 @@ Accessing the Raw Request Body
For cases in which neither JSON nor form data is expected, the
:attr:`body <microdot.Request.body>` request attribute returns the entire body
of the request as a byte sequence.
of the request as a byte sequence.
If the expected body is too large to fit in memory, the application can use the
:attr:`stream <microdot.Request.stream>` request attribute to read the body
@@ -645,9 +649,9 @@ File Responses
The :func:`send_file <microdot.Response.send_file>` function builds a response
object for a file::
from microdot import send_file
@app.get('/')
def index(request):
return send_file('/static/index.html')

View File

@@ -1,27 +0,0 @@
from microdot import Microdot
from microdot_auth import BasicAuth
app = Microdot()
basic_auth = BasicAuth()
USERS = {
'susan': 'hello',
'david': 'bye',
}
@basic_auth.callback
def verify_password(request, username, password):
if username in USERS and USERS[username] == password:
request.g.user = username
return True
@app.route('/')
@basic_auth
def index(request):
return f'Hello, {request.g.user}!'
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,60 +0,0 @@
from microdot import Microdot, redirect
from microdot_session import set_session_secret_key
from microdot_login import LoginAuth
app = Microdot()
set_session_secret_key('top-secret')
login_auth = LoginAuth()
USERS = {
'susan': 'hello',
'david': 'bye',
}
@login_auth.callback
def check_user(request, user_id):
request.g.user = user_id
return True
@app.route('/')
@login_auth
def index(request):
return f'''
<h1>Login Auth Example</h1>
<p>Hello, {request.g.user}!</p>
<form method="POST" action="/logout">
<button type="submit">Logout</button>
</form>
''', {'Content-Type': 'text/html'}
@app.route('/login', methods=['GET', 'POST'])
def login(request):
if request.method == 'GET':
return '''
<h1>Login Auth Example</h1>
<form method="POST">
<input name="username" placeholder="username">
<input name="password" type="password" placeholder="password">
<button type="submit">Login</button>
</form>
''', {'Content-Type': 'text/html'}
username = request.form['username']
password = request.form['password']
if USERS.get(username) == password:
login_auth.login_user(request, username)
return login_auth.redirect_to_next(request)
else:
return redirect('/login')
@app.post('/logout')
def logout(request):
login_auth.logout_user(request)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,27 +0,0 @@
from microdot import Microdot
from microdot_auth import TokenAuth
app = Microdot()
token_auth = TokenAuth()
TOKENS = {
'hello': 'susan',
'bye': 'david',
}
@token_auth.callback
def verify_token(request, token):
if token in TOKENS:
request.g.user = TOKENS[token]
return True
@app.route('/')
@token_auth
def index(request):
return f'Hello, {request.g.user}!'
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,14 +0,0 @@
curl -X GET http://localhost:5000/ <-- microdot
{"ram": 8429568}%
curl -X GET http://localhost:5000/ <-- microdot_asyncio
{"ram": 12410880}%
curl -X GET http://localhost:8000/ <-- microdot_wsgi
{"ram": 9101312}%
curl -X GET http://localhost:8000/ <-- microdot_asgi
{"ram": 18620416}%
curl -X GET http://localhost:5000/ <-- flask app.run
{"ram":25460736}
curl -X GET http://localhost:5000/ <-- flask run
{"ram":26210304}
curl -X GET http://localhost:5000/ <-- quart run
{"ram":31748096}%

View File

@@ -0,0 +1,19 @@
from microdot import Microdot, Response
from microdot_jinja import render_template
app = Microdot()
Response.default_content_type = 'text/html'
@app.route('/')
def index(req):
return render_template('page1.html', page='Page 1')
@app.route('/page2')
def page2(req):
return render_template('page2.html', page='Page 2')
if __name__ == '__main__':
app.run()

View File

@@ -10,7 +10,7 @@ def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return render_template('index_jinja.html', name=name)
return render_template('index.html', name=name)
if __name__ == '__main__':

View File

@@ -0,0 +1,48 @@
<!--
This is based on the Bootstrap 5 starter template from the documentation:
https://getbootstrap.com/docs/5.0/getting-started/introduction/#starter-template
-->
<!doctype html>
<html lang="en">
<head>
<!-- Required meta tags -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- Bootstrap CSS -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
<title>Microdot + Jinja + Bootstrap</title>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container">
<a class="navbar-brand" href="/">Microdot + Jinja + Bootstrap</a>
</div>
</nav>
<br>
<div class="container">
<svg xmlns="http://www.w3.org/2000/svg" style="display: none;">
<symbol id="info-fill" fill="currentColor" viewBox="0 0 16 16">
<path d="M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16zm.93-9.412-1 4.705c-.07.34.029.533.304.533.194 0 .487-.07.686-.246l-.088.416c-.287.346-.92.598-1.465.598-.703 0-1.002-.422-.808-1.319l.738-3.468c.064-.293.006-.399-.287-.47l-.451-.081.082-.381 2.29-.287zM8 5.5a1 1 0 1 1 0-2 1 1 0 0 1 0 2z"/>
</symbol>
</svg>
<div class="alert alert-primary d-flex align-items-center" role="alert">
<svg class="bi flex-shrink-0 me-2" width="24" height="24" role="img" aria-label="Info:"><use xlink:href="#info-fill"/></svg>
<div>This example demonstrates how to create an application that uses <a href="https://getbootstrap.com" class="alert-link">Bootstrap</a> styling. The page layout is defined in a base template that is inherited by several pages.</div>
</div>
{% block content %}{% endblock %}
</div>
<!-- Optional JavaScript; choose one of the two! -->
<!-- Option 1: Bootstrap Bundle with Popper -->
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.bundle.min.js" integrity="sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM" crossorigin="anonymous"></script>
<!-- Option 2: Separate Popper and Bootstrap JS -->
<!--
<script src="https://cdn.jsdelivr.net/npm/@popperjs/core@2.9.2/dist/umd/popper.min.js" integrity="sha384-IQsoLXl5PILFhosVNubq5LC7Qb9DXgDA9i+tQ8Zj3iwWAwPtgFTxbJ8NT4GN1R8p" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.min.js" integrity="sha384-cVKIPhGWiC2Al4u+LWgxfKTRIcfu0JTxR+EQDz/bgldoEyl4H0zUF0QKbrJ0EcQF" crossorigin="anonymous"></script>
-->
</body>
</html>

View File

@@ -0,0 +1,6 @@
{% extends 'base.html' %}
{% block content %}
<h2>This is {{ page }}</h2>
<p>Go to <a href="/page2">Page 2</a>.</p>
{% endblock %}

View File

@@ -0,0 +1,6 @@
{% extends 'base.html' %}
{% block content %}
<h2>This is {{ page }}</h2>
<p>Go back <a href="/">Page 1</a>.</p>
{% endblock %}

View File

@@ -0,0 +1,19 @@
from microdot import Microdot, Response
from microdot_utemplate import render_template
app = Microdot()
Response.default_content_type = 'text/html'
@app.route('/')
def index(req):
return render_template('page1.html', page='Page 1')
@app.route('/page2')
def page2(req):
return render_template('page2.html', page='Page 2')
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -10,7 +10,7 @@ def index(req):
name = None
if req.method == 'POST':
name = req.form.get('name')
return render_template('index_utemplate.html', name=name)
return render_template('index.html', name=name)
if __name__ == '__main__':

View File

@@ -0,0 +1,14 @@
</div>
<!-- Optional JavaScript; choose one of the two! -->
<!-- Option 1: Bootstrap Bundle with Popper -->
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.bundle.min.js" integrity="sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM" crossorigin="anonymous"></script>
<!-- Option 2: Separate Popper and Bootstrap JS -->
<!--
<script src="https://cdn.jsdelivr.net/npm/@popperjs/core@2.9.2/dist/umd/popper.min.js" integrity="sha384-IQsoLXl5PILFhosVNubq5LC7Qb9DXgDA9i+tQ8Zj3iwWAwPtgFTxbJ8NT4GN1R8p" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.min.js" integrity="sha384-cVKIPhGWiC2Al4u+LWgxfKTRIcfu0JTxR+EQDz/bgldoEyl4H0zUF0QKbrJ0EcQF" crossorigin="anonymous"></script>
-->
</body>
</html>

View File

@@ -0,0 +1,33 @@
<!--
This is based on the Bootstrap 5 starter template from the documentation:
https://getbootstrap.com/docs/5.0/getting-started/introduction/#starter-template
-->
<!doctype html>
<html lang="en">
<head>
<!-- Required meta tags -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- Bootstrap CSS -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
<title>Microdot + uTemplate + Bootstrap</title>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container">
<a class="navbar-brand" href="/">Microdot + uTemplate + Bootstrap</a>
</div>
</nav>
<br>
<div class="container">
<svg xmlns="http://www.w3.org/2000/svg" style="display: none;">
<symbol id="info-fill" fill="currentColor" viewBox="0 0 16 16">
<path d="M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16zm.93-9.412-1 4.705c-.07.34.029.533.304.533.194 0 .487-.07.686-.246l-.088.416c-.287.346-.92.598-1.465.598-.703 0-1.002-.422-.808-1.319l.738-3.468c.064-.293.006-.399-.287-.47l-.451-.081.082-.381 2.29-.287zM8 5.5a1 1 0 1 1 0-2 1 1 0 0 1 0 2z"/>
</symbol>
</svg>
<div class="alert alert-primary d-flex align-items-center" role="alert">
<svg class="bi flex-shrink-0 me-2" width="24" height="24" role="img" aria-label="Info:"><use xlink:href="#info-fill"/></svg>
<div>This example demonstrates how to create an application that uses <a href="https://getbootstrap.com" class="alert-link">Bootstrap</a> styling. The page layout is defined in a base template that is inherited by several pages.</div>
</div>

View File

@@ -0,0 +1,7 @@
{% args page %}
{% include 'base_header.html' %}
<h2>This is {{ page }}</h2>
<p>Go to <a href="/page2">Page 2</a>.</p>
{% include 'base_footer.html' %}

View File

@@ -0,0 +1,7 @@
{% args page %}
{% include 'base_header.html' %}
<h2>This is {{ page }}</h2>
<p>Go back <a href="/">Page 1</a>.</p>
{% include 'base_footer.html' %}

View File

@@ -1,6 +1,7 @@
from microdot import Microdot, send_file
from microdot import Microdot, send_file, Request
app = Microdot()
Request.max_content_length = 1024 * 1024 # 1MB (change as needed)
@app.get('/')
@@ -30,4 +31,4 @@ def upload(request):
if __name__ == '__main__':
app.run()
app.run(debug=True)

View File

@@ -0,0 +1,34 @@
from microdot_asyncio import Microdot, send_file, Request
app = Microdot()
Request.max_content_length = 1024 * 1024 # 1MB (change as needed)
@app.get('/')
async def index(request):
return send_file('index.html')
@app.post('/upload')
async def upload(request):
# obtain the filename and size from request headers
filename = request.headers['Content-Disposition'].split(
'filename=')[1].strip('"')
size = int(request.headers['Content-Length'])
# sanitize the filename
filename = filename.replace('/', '_')
# write the file to the files directory in 1K chunks
with open('files/' + filename, 'wb') as f:
while size > 0:
chunk = await request.stream.read(min(size, 1024))
f.write(chunk)
size -= len(chunk)
print('Successfully saved file: ' + filename)
return ''
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,6 +1,6 @@
[metadata]
name = microdot
version = 1.1.2.dev0
version = 1.2.1
author = Miguel Grinberg
author_email = miguel.grinberg@gmail.com
description = The impossibly small web framework for MicroPython
@@ -28,8 +28,6 @@ py_modules =
microdot_utemplate
microdot_jinja
microdot_session
microdot_auth
microdot_login
microdot_websocket
microdot_websocket_alt
microdot_asyncio_websocket

View File

@@ -92,8 +92,9 @@ def urldecode_bytes(s):
def urlencode(s):
return s.replace(' ', '+').replace('%', '%25').replace('?', '%3F').replace(
'#', '%23').replace('&', '%26').replace('+', '%2B')
return s.replace('+', '%2B').replace(' ', '+').replace(
'%', '%25').replace('?', '%3F').replace('#', '%23').replace(
'&', '%26').replace('=', '%3D')
class NoCaseDict(dict):
@@ -144,6 +145,39 @@ class NoCaseDict(dict):
return super().get(self.keymap.get(kl, kl), default)
def mro(cls): # pragma: no cover
"""Return the method resolution order of a class.
This is a helper function that returns the method resolution order of a
class. It is used by Microdot to find the best error handler to invoke for
the raised exception.
In CPython, this function returns the ``__mro__`` attribute of the class.
In MicroPython, this function implements a recursive depth-first scanning
of the class hierarchy.
"""
if hasattr(cls, 'mro'):
return cls.__mro__
def _mro(cls):
m = [cls]
for base in cls.__bases__:
m += _mro(base)
return m
mro_list = _mro(cls)
# If a class appears multiple times (due to multiple inheritance) remove
# all but the last occurence. This matches the method resolution order
# of MicroPython, but not CPython.
mro_pruned = []
for i in range(len(mro_list)):
base = mro_list.pop(0)
if base not in mro_list:
mro_pruned.append(base)
return mro_pruned
class MultiDict(dict):
"""A subclass of dictionary that can hold multiple values for the same
key. It is used to hold key/value pairs decoded from query strings and
@@ -1103,10 +1137,18 @@ class Microdot():
res = exc.reason, exc.status_code
except Exception as exc:
print_exception(exc)
exc_class = None
res = None
if exc.__class__ in self.error_handlers:
exc_class = exc.__class__
else:
for c in mro(exc.__class__)[1:]:
if c in self.error_handlers:
exc_class = c
break
if exc_class:
try:
res = self.error_handlers[exc.__class__](req, exc)
res = self.error_handlers[exc_class](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:

View File

@@ -17,6 +17,7 @@ except ImportError:
import io
from microdot import Microdot as BaseMicrodot
from microdot import mro
from microdot import NoCaseDict
from microdot import Request as BaseRequest
from microdot import Response as BaseResponse
@@ -380,7 +381,8 @@ class Microdot(BaseMicrodot):
res = await self._invoke_handler(
handler, req, res) or res
for handler in req.after_request_handlers:
res = await handler(req, res) or res
res = await self._invoke_handler(
handler, req, res) or res
elif f in self.error_handlers:
res = await self._invoke_handler(
self.error_handlers[f], req)
@@ -393,11 +395,19 @@ class Microdot(BaseMicrodot):
res = exc.reason, exc.status_code
except Exception as exc:
print_exception(exc)
exc_class = None
res = None
if exc.__class__ in self.error_handlers:
exc_class = exc.__class__
else:
for c in mro(exc.__class__)[1:]:
if c in self.error_handlers:
exc_class = c
break
if exc_class:
try:
res = await self._invoke_handler(
self.error_handlers[exc.__class__], req, exc)
self.error_handlers[exc_class], req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:

View File

@@ -1,65 +0,0 @@
from microdot import abort
class BaseAuth:
def __init__(self, header='Authorization', scheme=None):
self.auth_callback = None
self.error_callback = self.auth_failed
self.header = header
self.scheme = scheme.lower()
def callback(self, f):
"""Decorator to configure the authentication callback.
Microdot calls the authentication callback to allow the application to
check user credentials.
"""
self.auth_callback = f
def errorhandler(self, f):
"""Decorator to configure the error callback.
Microdot calls the error callback to allow the application to generate
a custom error response. The default error response is to call
``abort(401)``.
"""
self.error_callback = f
def auth_failed(self):
abort(401)
def __call__(self, func):
def wrapper(request, *args, **kwargs):
auth = request.headers.get(self.header)
if not auth:
return self.error_callback()
if self.header == 'Authorization':
if ' ' not in auth:
return self.error_callback()
scheme, auth = auth.split(' ', 1)
if scheme.lower() != self.scheme:
return self.error_callback()
if not self.auth_callback(request, *self._get_auth_args(auth)):
return self.error_callback()
return func(request, *args, **kwargs)
return wrapper
class BasicAuth(BaseAuth):
def __init__(self):
super().__init__(scheme='Basic')
def _get_auth_args(self, auth):
import binascii
username, password = binascii.a2b_base64(auth).decode('utf-8').split(
':', 1)
return (username, password)
class TokenAuth(BaseAuth):
def __init__(self, header='Authorization', scheme='Bearer'):
super().__init__(header=header, scheme=scheme)
def _get_auth_args(self, token):
return (token,)

View File

@@ -1,46 +0,0 @@
from microdot import redirect, urlencode
from microdot_session import get_session, update_session
class LoginAuth:
def __init__(self, login_url='/login'):
super().__init__()
self.login_url = login_url
self.user_callback = self._accept_user
def callback(self, f):
self.user_callback = f
def login_user(self, request, user_id):
session = get_session(request)
session['user_id'] = user_id
update_session(request, session)
return session
def logout_user(self, request):
session = get_session(request)
session.pop('user_id', None)
update_session(request, session)
return session
def redirect_to_next(self, request, default_url='/'):
next_url = request.args.get('next', default_url)
if not next_url.startswith('/'):
next_url = default_url
return redirect(next_url)
def __call__(self, func):
def wrapper(request, *args, **kwargs):
session = get_session(request)
if 'user_id' not in session:
return redirect(self.login_url + '?next=' + urlencode(
request.url))
if not self.user_callback(request, session['user_id']):
return redirect(self.login_url + '?next=' + urlencode(
request.url))
return func(request, *args, **kwargs)
return wrapper
def _accept_user(self, request, user_id):
return True

View File

@@ -465,6 +465,90 @@ class TestMicrodot(unittest.TestCase):
'text/plain; charset=UTF-8')
self.assertEqual(res.text, '501')
def test_exception_handler_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
def handle_lookup_error(req, exc):
return exc.__class__.__name__, 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_redundant_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
def handle_lookup_error(req, exc):
return 'LookupError', 501
@app.errorhandler(IndexError)
def handle_index_error(req, exc):
return 'IndexError', 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_multiple_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(Exception)
def handle_generic_exception(req, exc):
return 'Exception', 501
@app.errorhandler(LookupError)
def handle_lookup_error(req, exc):
return 'LookupError', 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'LookupError')
def test_exception_handler_no_viable_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(RuntimeError)
def handle_runtime_error(req, exc):
return 'RuntimeError', 501
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 500)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'Internal server error')
def test_abort(self):
app = Microdot()

View File

@@ -500,6 +500,90 @@ class TestMicrodotAsync(unittest.TestCase):
'text/plain; charset=UTF-8')
self.assertEqual(res.text, '501')
def test_exception_handler_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
async def handle_lookup_error(req, exc):
return exc.__class__.__name__, 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_redundant_parent(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(LookupError)
async def handle_lookup_error(req, exc):
return 'LookupError', 501
@app.errorhandler(IndexError)
async def handle_index_error(req, exc):
return 'IndexError', 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'IndexError')
def test_exception_handler_multiple_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(Exception)
async def handle_generic_exception(req, exc):
return 'Exception', 501
@app.errorhandler(LookupError)
async def handle_lookup_error(req, exc):
return 'LookupError', 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 501)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'LookupError')
def test_exception_handler_no_viable_parents(self):
app = Microdot()
@app.route('/')
def index(req):
foo = []
return foo[1]
@app.errorhandler(RuntimeError)
async def handle_runtime_error(req, exc):
return 'RuntimeError', 501
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 500)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'Internal server error')
def test_abort(self):
app = Microdot()

View File

@@ -1,113 +0,0 @@
import binascii
import unittest
from microdot import Microdot
from microdot_auth import BasicAuth, TokenAuth
from microdot_test_client import TestClient
class TestAuth(unittest.TestCase):
def test_basic_auth(self):
app = Microdot()
basic_auth = BasicAuth()
@basic_auth.callback
def authenticate(request, username, password):
if username == 'foo' and password == 'bar':
request.g.user = {'username': username}
return True
@app.route('/')
@basic_auth
def index(request):
return request.g.user['username']
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:bar').decode()})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'foo')
res = client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:baz').decode()})
self.assertEqual(res.status_code, 401)
def test_token_auth(self):
app = Microdot()
token_auth = TokenAuth()
@token_auth.callback
def authenticate(request, token):
if token == 'foo':
request.g.user = 'user'
return True
@app.route('/')
@token_auth
def index(request):
return request.g.user
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Basic foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Bearer foo'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_token_auth_custom_header(self):
app = Microdot()
token_auth = TokenAuth(header='X-Auth-Token')
@token_auth.callback
def authenticate(request, token):
if token == 'foo':
request.g.user = 'user'
return True
@app.route('/')
@token_auth
def index(request):
return request.g.user
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Basic foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'Authorization': 'Bearer foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'X-Token-Auth': 'Bearer foo'})
self.assertEqual(res.status_code, 401)
res = client.get('/', headers={'X-Auth-Token': 'foo'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = client.get('/', headers={'x-auth-token': 'foo'})
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
@token_auth.errorhandler
def error_handler():
return {'status_code': 403}, 403
res = client.get('/')
self.assertEqual(res.status_code, 403)
self.assertEqual(res.json, {'status_code': 403})

View File

@@ -1,134 +0,0 @@
import unittest
from microdot import Microdot
from microdot_login import LoginAuth
from microdot_session import set_session_secret_key, with_session
from microdot_test_client import TestClient
set_session_secret_key('top-secret!')
class TestLogin(unittest.TestCase):
def test_login_auth(self):
app = Microdot()
login_auth = LoginAuth()
@app.get('/')
@login_auth
def index(request):
return 'ok'
@app.post('/login')
def login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
@app.post('/logout')
def logout(request):
login_auth.logout_user(request)
return 'ok'
client = TestClient(app)
res = client.get('/?foo=bar')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
res = client.post('/login?next=/%3Ffoo=bar')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'ok')
res = client.post('/logout')
self.assertEqual(res.status_code, 200)
res = client.get('/')
self.assertEqual(res.status_code, 302)
def test_login_auth_with_session(self):
app = Microdot()
login_auth = LoginAuth(login_url='/foo')
@app.get('/')
@login_auth
@with_session
def index(request, session):
return session['user_id']
@app.post('/foo')
def login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
client = TestClient(app)
res = client.get('/')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/foo?next=/')
res = client.post('/foo')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_login_auth_user_callback(self):
app = Microdot()
login_auth = LoginAuth()
@login_auth.callback
def check_user(request, user_id):
request.g.user_id = user_id
return user_id == 'user'
@app.get('/')
@login_auth
def index(request):
return request.g.user_id
@app.post('/good-login')
def good_login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
@app.post('/bad-login')
def bad_login(request):
login_auth.login_user(request, 'foo')
return login_auth.redirect_to_next(request)
client = TestClient(app)
res = client.post('/good-login')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = client.get('/')
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = client.post('/bad-login')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = client.get('/')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/')
def test_login_auth_bad_redirect(self):
app = Microdot()
login_auth = LoginAuth()
@app.get('/')
@login_auth
def index(request):
return 'ok'
@app.post('/login')
def login(request):
login_auth.login_user(request, 'user')
return login_auth.redirect_to_next(request)
client = TestClient(app)
res = client.post('/login?next=http://example.com')
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')

View File

@@ -1,22 +1,24 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
import unittest
from microdot import Microdot
from microdot_asyncio import Microdot as MicrodotAsync
from microdot_session import set_session_secret_key, get_session, \
update_session, delete_session, with_session
from microdot_test_client import TestClient
from microdot_asyncio_test_client import TestClient as TestClientAsync
set_session_secret_key('top-secret!')
class TestSession(unittest.TestCase):
def setUp(self):
self.app = Microdot()
self.client = TestClient(self.app)
def tearDown(self):
pass
def test_session(self):
@self.app.get('/')
app = Microdot()
client = TestClient(app)
@app.get('/')
def index(req):
session = get_session(req)
session2 = get_session(req)
@@ -24,52 +26,106 @@ class TestSession(unittest.TestCase):
self.assertEqual(session['foo'], 'bar')
return str(session.get('name'))
@self.app.get('/with')
@app.get('/with')
@with_session
def session_context_manager(req, session):
return str(session.get('name'))
@self.app.post('/set')
@app.post('/set')
def set_session(req):
update_session(req, {'name': 'joe'})
return 'OK'
@self.app.post('/del')
@app.post('/del')
def del_session(req):
delete_session(req)
return 'OK'
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.text, 'None')
res = self.client.get('/with')
res = client.get('/with')
self.assertEqual(res.text, 'None')
res = self.client.post('/set')
res = client.post('/set')
self.assertEqual(res.text, 'OK')
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.text, 'joe')
res = self.client.get('/with')
res = client.get('/with')
self.assertEqual(res.text, 'joe')
res = self.client.post('/del')
res = client.post('/del')
self.assertEqual(res.text, 'OK')
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.text, 'None')
res = self.client.get('/with')
res = client.get('/with')
self.assertEqual(res.text, 'None')
def _run(self, coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
def test_session_async(self):
app = MicrodotAsync()
client = TestClientAsync(app)
@app.get('/')
async def index(req):
session = get_session(req)
session2 = get_session(req)
session2['foo'] = 'bar'
self.assertEqual(session['foo'], 'bar')
return str(session.get('name'))
@app.get('/with')
@with_session
async def session_context_manager(req, session):
return str(session.get('name'))
@app.post('/set')
async def set_session(req):
update_session(req, {'name': 'joe'})
return 'OK'
@app.post('/del')
async def del_session(req):
delete_session(req)
return 'OK'
res = self._run(client.get('/'))
self.assertEqual(res.text, 'None')
res = self._run(client.get('/with'))
self.assertEqual(res.text, 'None')
res = self._run(client.post('/set'))
self.assertEqual(res.text, 'OK')
res = self._run(client.get('/'))
self.assertEqual(res.text, 'joe')
res = self._run(client.get('/with'))
self.assertEqual(res.text, 'joe')
res = self._run(client.post('/del'))
self.assertEqual(res.text, 'OK')
res = self._run(client.get('/'))
self.assertEqual(res.text, 'None')
res = self._run(client.get('/with'))
self.assertEqual(res.text, 'None')
def test_session_no_secret_key(self):
set_session_secret_key(None)
app = Microdot()
client = TestClient(app)
@self.app.get('/')
@app.get('/')
def index(req):
self.assertRaises(ValueError, get_session, req)
self.assertRaises(ValueError, update_session, req, {})
return ''
res = self.client.get('/')
res = client.get('/')
self.assertEqual(res.status_code, 200)
set_session_secret_key('top-secret!')

11
tests/test_urlencode.py Normal file
View File

@@ -0,0 +1,11 @@
import unittest
from microdot import urlencode, urldecode_str, urldecode_bytes
class TestURLEncode(unittest.TestCase):
def test_urlencode(self):
self.assertEqual(urlencode('?foo=bar&x'), '%3Ffoo%3Dbar%26x')
def test_urldecode(self):
self.assertEqual(urldecode_str('%3Ffoo%3Dbar%26x'), '?foo=bar&x')
self.assertEqual(urldecode_bytes(b'%3Ffoo%3Dbar%26x'), '?foo=bar&x')