Compare commits

..

6 Commits

Author SHA1 Message Date
Miguel Grinberg
2efbd67878 Release 2.1.0 2025-02-04 00:31:06 +00:00
Miguel Grinberg
d807011ad0 user logins 2025-02-04 00:04:55 +00:00
Miguel Grinberg
675c978797 Basic and token authentication support 2025-02-03 20:00:36 +00:00
Miguel Grinberg
cd87abba30 Mount unit tests 2025-02-03 11:06:26 +00:00
Miguel Grinberg
fd7931e1ae Added Request.url_prefix, Reques.subapp and local mounts 2025-02-03 00:33:59 +00:00
Maxi
d487a73c1e add js to sse example (#281) 2025-01-22 23:42:51 +00:00
23 changed files with 1500 additions and 78 deletions

View File

@@ -1,5 +1,14 @@
# Microdot change log
**Release 2.1.0** - 2025-02-04
- User login support ([commit](https://github.com/miguelgrinberg/microdot/commit/d807011ad006e53e70c4594d7eac04d03bb08681))
- Basic and token authentication support ([commit](https://github.com/miguelgrinberg/microdot/commit/675c9787974da926af446974cd96ef224e0ee27f))
- Added `local` argument to the `app.mount()` method, to define sub-application specific before and after request handlers ([commit](https://github.com/miguelgrinberg/microdot/commit/fd7931e1aec173c60f81dad18c1a102ed8f0e081))
- Added `Request.url_prefix`, `Request.subapp` and local mounts ([commit](https://github.com/miguelgrinberg/microdot/commit/fd7931e1aec173c60f81dad18c1a102ed8f0e081))
- Added a front end to the SSE example [#281](https://github.com/miguelgrinberg/microdot/issues/281) ([commit](https://github.com/miguelgrinberg/microdot/commit/d487a73c1ea5b3467e23907618b348ca52e0235c)) (thanks **Maxi**!)
- Additional ``app.mount()`` unit tests ([commit](https://github.com/miguelgrinberg/microdot/commit/cd87abba30206ec6d3928e0aabacb2fccf7baf70))
**Release 2.0.7** - 2024-11-10
- Accept responses with just a status code [#263](https://github.com/miguelgrinberg/microdot/issues/263) ([commit #1](https://github.com/miguelgrinberg/microdot/commit/4eac013087f807cafa244b8a6b7b0ed4c82ff150) [commit #2](https://github.com/miguelgrinberg/microdot/commit/c46e4291061046f1be13f300dd08645b71c16635))

View File

@@ -44,6 +44,22 @@ User Sessions
.. automodule:: microdot.session
:members:
Authentication
--------------
.. automodule:: microdot.auth
:inherited-members:
:special-members: __call__
:members:
User Logins
-----------
.. automodule:: microdot.login
:inherited-members:
:special-members: __call__
:members:
Cross-Origin Resource Sharing (CORS)
------------------------------------

View File

@@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions
described in this section are maintained as part of the Microdot project in
the same source code repository.
WebSocket Support
~~~~~~~~~~~~~~~~~
WebSocket
~~~~~~~~-
.. list-table::
:align: left
@@ -39,8 +39,8 @@ Example::
message = await ws.receive()
await ws.send(message)
Server-Sent Events Support
~~~~~~~~~~~~~~~~~~~~~~~~~~
Server-Sent Events
~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -78,8 +78,8 @@ Example::
the SSE object. For bidirectional communication with the client, use the
WebSocket extension.
Rendering Templates
~~~~~~~~~~~~~~~~~~~
Templates
~~~~~~~~~
Many web applications use HTML templates for rendering content to clients.
Microdot includes extensions to render templates with the
@@ -202,8 +202,8 @@ must be used.
.. note::
The Jinja extension is not compatible with MicroPython.
Maintaining Secure User Sessions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Secure User Sessions
~~~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -270,6 +270,178 @@ The :func:`save() <microdot.session.SessionDict.save>` and
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
and destroy the user session respectively.
Authentication
~~~~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `auth.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
* - Required external dependencies
- | None
* - Examples
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
The authentication extension provides helper classes for two commonly used
authentication patterns, described below.
Basic Authentication
^^^^^^^^^^^^^^^^^^^^
`Basic Authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_
is a method of authentication that is part of the HTTP specification. It allows
clients to authenticate to a server using a username and a password. Web
browsers have native support for Basic Authentication and will automatically
prompt the user for a username and a password when a protected resource is
accessed.
To use Basic Authentication, create an instance of the :class:`BasicAuth <microdot.auth.BasicAuth>`
class::
from microdot.auth import BasicAuth
auth = BasicAuth(app)
Next, create an authentication function. The function must accept a request
object and a username and password pair provided by the user. If the
credentials are valid, the function must return an object that represents the
user. If the authentication function cannot validate the user provided
credentials it must return ``None``. Decorate the function with
``@auth.authenticate``::
@auth.authenticate
async def verify_user(request, username, password):
user = await load_user_from_database(username)
if user and user.verify_password(password):
return user
To protect a route with authentication, add the ``auth`` instance as a
decorator::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
While running an authenticated request, the user object returned by the
authenticaction function is accessible as ``request.g.current_user``.
Token Authentication
^^^^^^^^^^^^^^^^^^^^
To set up token authentication, create an instance of :class:`TokenAuth <microdot.auth.TokenAuth>`::
from microdot.auth import TokenAuth
auth = TokenAuth()
Then add a function that verifies the token and returns the user it belongs to,
or ``None`` if the token is invalid or expired::
@auth.authenticate
async def verify_token(request, token):
return load_user_from_token(token)
As with Basic authentication, the ``auth`` instance is used as a decorator to
protect your routes::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
User Logins
~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `login.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
| `session.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/session.py>`_
* - Required external dependencies
- | CPython: `PyJWT <https://pyjwt.readthedocs.io/>`_
| MicroPython: `jwt.py <https://github.com/micropython/micropython-lib/blob/master/python-ecosys/pyjwt/jwt.py>`_,
`hmac.py <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
* - Examples
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login/login.py>`_
The login extension provides user login functionality. The logged in state of
the user is stored in the user session cookie, and an optional "remember me"
cookie can also be added to keep the user logged in across browser sessions.
To use this extension, create instances of the
:class:`Session <microdot.session.Session>` and :class:`Login <microdot.login.Login>`
class::
Session(app, secret_key='top-secret!')
login = Login()
The ``Login`` class accept an optional argument with the URL of the login page.
The default for this URL is */login*.
The application must represent users as objects with an ``id`` attribute. A
function decorated with ``@login.user_loader`` is used to load a user object::
@login.user_loader
async def get_user(user_id):
return database.get_user(user_id)
The application must implement the login form. At the point in which the user
credentials have been received and verified, a call to the
:func:`login_user() <microdot.login.Login.login_user>` function must be made to
record the user in the user session::
@app.route('/login', methods=['GET', 'POST'])
async def login(request):
# ...
if user.check_password(password):
return await login.login_user(request, user, remember=remember_me)
return redirect('/login')
The optional ``remember`` argument is used to add a remember me cookie that
will log the user in automatically in future sessions. A value of ``True`` will
keep the log in active for 30 days. Alternatively, an integer number of days
can be passed in this argument.
Any routes that require the user to be logged in must be decorated with
:func:`@login <microdot.login.Login.__call__>`::
@app.route('/')
@login
async def index(request):
# ...
Routes that are of a sensitive nature can be decorated with
:func:`@login.fresh <microdot.login.Login.fresh>`
instead. This decorator requires that the user has logged in during the current
session, and will ask the user to logged in again if the session was
authenticated through a remember me cookie::
@app.get('/fresh')
@login.fresh
async def fresh(request):
# ...
To log out a user, the :func:`logout_user() <microdot.auth.Login.logout_user>`
is used::
@app.post('/logout')
@login
async def logout(request):
await login.logout_user(request)
return redirect('/')
Cross-Origin Resource Sharing (CORS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -305,8 +477,8 @@ Example::
cors = CORS(app, allowed_origins=['https://example.com'],
allow_credentials=True)
Testing with the Test Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Client
~~~~~~~~~~~
.. list-table::
:align: left
@@ -342,8 +514,8 @@ Example::
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
class for more details.
Deploying on a Production Web Server
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Production Deployments
~~~~~~~~~~~~~~~~~~~~~~
The ``Microdot`` class creates its own simple web server. This is enough for an
application deployed with MicroPython, but when using CPython it may be useful

View File

@@ -445,7 +445,7 @@ Mounting a Sub-Application
^^^^^^^^^^^^^^^^^^^^^^^^^^
Small Microdot applications can be written as a single source file, but this
is not the best option for applications that past a certain size. To make it
is not the best option for applications that pass a certain size. To make it
simpler to write large applications, Microdot supports the concept of
sub-applications that can be "mounted" on a larger application, possibly with
a common URL prefix applied to all of its routes. For developers familiar with
@@ -501,11 +501,25 @@ The resulting application will have the customer endpoints available at
*/customers/* and the order endpoints available at */orders/*.
.. note::
Before-request, after-request and error handlers defined in the
sub-application are also copied over to the main application at mount time.
Once installed in the main application, these handlers will apply to the
whole application and not just the sub-application in which they were
created.
During the handling of a request, the
:attr:`Request.url_prefix <microdot.Microdot.url_prefix>` attribute is
set to the URL prefix under which the sub-application was mounted, or an
empty string if the endpoint did not come from a sub-application or the
sub-application was mounted without a URL prefix. It is possible to issue a
redirect that is relative to the sub-application as follows::
return redirect(request.url_prefix + '/relative-url')
When mounting an application as shown above, before-request, after-request and
error handlers defined in the sub-application are copied over to the main
application at mount time. Once installed in the main application, these
handlers will apply to the whole application and not just the sub-application
in which they were created.
The :func:`mount() <microdot.Microdot.mount>` method has a ``local`` argument
that defaults to ``False``. When this argument is set to ``True``, the
before-request, after-request and error handlers defined in the sub-application
will only apply to the sub-application.
Shutting Down the Server
^^^^^^^^^^^^^^^^^^^^^^^^

1
examples/auth/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate basic and token authentication.

View File

@@ -0,0 +1,31 @@
from microdot import Microdot
from microdot.auth import BasicAuth
from pbkdf2 import generate_password_hash, check_password_hash
# this example provides an implementation of the generate_password_hash and
# check_password_hash functions that can be used in MicroPython. On CPython
# there are many other options for password hashisng so there is no need to use
# this custom solution.
USERS = {
'susan': generate_password_hash('hello'),
'david': generate_password_hash('bye'),
}
app = Microdot()
auth = BasicAuth()
@auth.authenticate
async def check_credentials(request, username, password):
if username in USERS and check_password_hash(USERS[username], password):
return username
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

47
examples/auth/pbkdf2.py Normal file
View File

@@ -0,0 +1,47 @@
import os
import hashlib
# PBKDF2 secure password hashing algorithm obtained from:
# https://codeandlife.com/2023/01/06/how-to-calculate-pbkdf2-hmac-sha256-with-
# python,-example-code/
def sha256(b):
return hashlib.sha256(b).digest()
def ljust(b, n, f):
return b + f * (n - len(b))
def gethmac(key, content):
okeypad = bytes(v ^ 0x5c for v in ljust(key, 64, b'\0'))
ikeypad = bytes(v ^ 0x36 for v in ljust(key, 64, b'\0'))
return sha256(okeypad + sha256(ikeypad + content))
def pbkdf2(pwd, salt, iterations=1000):
U = salt + b'\x00\x00\x00\x01'
T = bytes(64)
for _ in range(iterations):
U = gethmac(pwd, U)
T = bytes(a ^ b for a, b in zip(U, T))
return T
# The number of iterations may need to be adjusted depending on the hardware.
# Lower numbers make the password hashing algorithm faster but less secure, so
# the largest number that can be tolerated should be used.
def generate_password_hash(password, salt=None, iterations=100000):
salt = salt or os.urandom(16)
dk = pbkdf2(password.encode(), salt, iterations)
return f'pbkdf2-hmac-sha256:{salt.hex()}:{iterations}:{dk.hex()}'
def check_password_hash(password_hash, password):
algorithm, salt, iterations, dk = password_hash.split(':')
iterations = int(iterations)
if algorithm != 'pbkdf2-hmac-sha256':
return False
return pbkdf2(password.encode(), salt=bytes.fromhex(salt),
iterations=iterations) == bytes.fromhex(dk)

View File

@@ -0,0 +1,26 @@
from microdot import Microdot
from microdot.auth import TokenAuth
app = Microdot()
auth = TokenAuth()
TOKENS = {
'susan-token': 'susan',
'david-token': 'david',
}
@auth.authenticate
async def check_token(request, token):
if token in TOKENS:
return TOKENS[token]
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

1
examples/login/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate user logins.

123
examples/login/login.py Normal file
View File

@@ -0,0 +1,123 @@
from microdot import Microdot, redirect
from microdot.session import Session
from microdot.login import Login
from pbkdf2 import generate_password_hash, check_password_hash
# this example provides an implementation of the generate_password_hash and
# check_password_hash functions that can be used in MicroPython. On CPython
# there are many other options for password hashisng so there is no need to use
# this custom solution.
class User:
def __init__(self, id, username, password):
self.id = id
self.username = username
self.password_hash = self.create_hash(password)
def create_hash(self, password):
return generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
USERS = {
'user001': User('user001', 'susan', 'hello'),
'user002': User('user002', 'david', 'bye'),
}
app = Microdot()
Session(app, secret_key='top-secret!')
login = Login()
@login.user_loader
async def get_user(user_id):
return USERS.get(user_id)
@app.route('/login', methods=['GET', 'POST'])
async def login_page(request):
if request.method == 'GET':
return '''
<!doctype html>
<html>
<body>
<h1>Please Login</h1>
<form method="POST">
<p>
Username<br>
<input name="username" autofocus>
</p>
<p>
Password:<br>
<input name="password" type="password">
<br>
</p>
<p>
<input name="remember_me" type="checkbox"> Remember me
<br>
</p>
<p>
<button type="submit">Login</button>
</p>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
username = request.form['username']
password = request.form['password']
remember_me = bool(request.form.get('remember_me'))
for user in USERS.values():
if user.username == username:
if user.check_password(password):
return await login.login_user(request, user,
remember=remember_me)
return redirect('/login')
@app.route('/')
@login
async def index(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>
<a href="/fresh">Click here</a> to access the fresh login page.
</p>
<form method="POST" action="/logout">
<button type="submit">Logout</button>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.get('/fresh')
@login.fresh
async def fresh(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>This page requires a fresh login session.</p>
<p><a href="/">Go back</a> to the main page.</p>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.post('/logout')
@login
async def logout(request):
await login.logout_user(request)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)

47
examples/login/pbkdf2.py Normal file
View File

@@ -0,0 +1,47 @@
import os
import hashlib
# PBKDF2 secure password hashing algorithm obtained from:
# https://codeandlife.com/2023/01/06/how-to-calculate-pbkdf2-hmac-sha256-with-
# python,-example-code/
def sha256(b):
return hashlib.sha256(b).digest()
def ljust(b, n, f):
return b + f * (n - len(b))
def gethmac(key, content):
okeypad = bytes(v ^ 0x5c for v in ljust(key, 64, b'\0'))
ikeypad = bytes(v ^ 0x36 for v in ljust(key, 64, b'\0'))
return sha256(okeypad + sha256(ikeypad + content))
def pbkdf2(pwd, salt, iterations=1000):
U = salt + b'\x00\x00\x00\x01'
T = bytes(64)
for _ in range(iterations):
U = gethmac(pwd, U)
T = bytes(a ^ b for a, b in zip(U, T))
return T
# The number of iterations may need to be adjusted depending on the hardware.
# Lower numbers make the password hashing algorithm faster but less secure, so
# the largest number that can be tolerated should be used.
def generate_password_hash(password, salt=None, iterations=100000):
salt = salt or os.urandom(16)
dk = pbkdf2(password.encode(), salt, iterations)
return f'pbkdf2-hmac-sha256:{salt.hex()}:{iterations}:{dk.hex()}'
def check_password_hash(password_hash, password):
algorithm, salt, iterations, dk = password_hash.split(':')
iterations = int(iterations)
if algorithm != 'pbkdf2-hmac-sha256':
return False
return pbkdf2(password.encode(), salt=bytes.fromhex(salt),
iterations=iterations) == bytes.fromhex(dk)

View File

@@ -1,3 +1,6 @@
# This is a simple example that demonstrates how to use the user session, but
# is not intended as a complete login solution. See the login subdirectory for
# a more complete example.
from microdot import Microdot, Response, redirect
from microdot.session import Session, with_session

View File

@@ -1,16 +1,28 @@
import asyncio
from microdot import Microdot
from microdot import Microdot, send_file
from microdot.sse import with_sse
app = Microdot()
@app.route("/")
async def main(request):
return send_file('index.html')
@app.route('/events')
@with_sse
async def events(request, sse):
for i in range(10):
await asyncio.sleep(1)
await sse.send({'counter': i})
print('Client connected')
try:
i = 0
while True:
await asyncio.sleep(1)
i += 1
await sse.send({'counter': i})
except asyncio.CancelledError:
pass
print('Client disconnected')
app.run(debug=True)
app.run()

30
examples/sse/index.html Normal file
View File

@@ -0,0 +1,30 @@
<!DOCTYPE html>
<html>
<head>
<title>Microdot SSE Example</title>
<meta charset="UTF-8">
</head>
<body>
<h1>Microdot SSE Example</h1>
<div id="log"></div>
<script>
const log = (text, color) => {
document.getElementById('log').innerHTML += `<span style="color: ${color}">${text}</span><br>`;
};
const eventSource = new EventSource('/events');
eventSource.onopen = () => {
log('Connection to server opened.', 'black');
};
eventSource.onmessage = (event) => {
log(`Received message: ${event.data}`, 'blue');
};
eventSource.onerror = (event) => {
log(`EventSource failed: ${event.type}`, 'red');
};
</script>
</body>
</html>

View File

@@ -1,6 +1,6 @@
[project]
name = "microdot"
version = "2.0.8.dev0"
version = "2.1.0"
authors = [
{ name = "Miguel Grinberg", email = "miguel.grinberg@gmail.com" },
]

144
src/microdot/auth.py Normal file
View File

@@ -0,0 +1,144 @@
from microdot import abort
from microdot.microdot import invoke_handler
class BaseAuth:
def __init__(self):
self.auth_callback = None
self.error_callback = None
def __call__(self, f):
"""Decorator to protect a route with authentication.
An instance of this class must be used as a decorator on the routes
that need to be protected. Example::
auth = BasicAuth() # or TokenAuth()
@app.route('/protected')
@auth
def protected(request):
# ...
Routes that are decorated in this way will only be invoked if the
authentication callback returned a valid user object, otherwise the
error callback will be executed.
"""
async def wrapper(request, *args, **kwargs):
auth = self._get_auth(request)
if not auth:
return await invoke_handler(self.error_callback, request)
request.g.current_user = await invoke_handler(
self.auth_callback, request, *auth)
if not request.g.current_user:
return await invoke_handler(self.error_callback, request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
class BasicAuth(BaseAuth):
"""Basic Authentication.
:param realm: The realm that is displayed when the user is prompted to
authenticate in the browser.
:param charset: The charset that is used to encode the realm.
:param scheme: The authentication scheme. Defaults to 'Basic'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic',
error_status=401):
super().__init__()
self.realm = realm
self.charset = charset
self.scheme = scheme
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get('Authorization')
if auth and auth.startswith('Basic '):
import binascii
try:
username, password = binascii.a2b_base64(
auth[6:]).decode().split(':', 1)
except Exception: # pragma: no cover
return None
return username, password
def authentication_error(self, request):
return '', self.error_status, {
'WWW-Authenticate': '{} realm="{}", charset="{}"'.format(
self.scheme, self.realm, self.charset)}
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, username, password):
user = get_user(username)
if user and user.check_password(password):
return get_user(username)
"""
self.auth_callback = f
class TokenAuth(BaseAuth):
"""Token based authentication.
:param header: The name of the header that will contain the token. Defaults
to 'Authorization'.
:param scheme: The authentication scheme. Defaults to 'Bearer'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, header='Authorization', scheme='Bearer',
error_status=401):
super().__init__()
self.header = header
self.scheme = scheme.lower()
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get(self.header)
if auth:
if self.header == 'Authorization':
try:
scheme, token = auth.split(' ', 1)
except Exception:
return None
if scheme.lower() == self.scheme:
return (token.strip(),)
else:
return (auth,)
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, token):
return get_user(token)
"""
self.auth_callback = f
def errorhandler(self, f):
"""Decorator to configure the error callback.
Microdot calls the error callback to allow the application to generate
a custom error response. The default error response is to call
``abort(401)``.
"""
self.error_callback = f
def authentication_error(self, request):
abort(self.error_status)

163
src/microdot/login.py Normal file
View File

@@ -0,0 +1,163 @@
from time import time
from microdot import redirect
from microdot.microdot import urlencode, invoke_handler
class Login:
"""User login support for Microdot.
:param login_url: the URL to redirect to when a login is required. The
default is '/login'.
"""
def __init__(self, login_url='/login'):
self.login_url = login_url
self.user_loader_callback = None
def user_loader(self, f):
"""Decorator to configure the user callback.
The decorated function receives the user ID as an argument and must
return the corresponding user object, or ``None`` if the user ID is
invalid.
"""
self.user_loader_callback = f
def _get_session(self, request):
return request.app._session.get(request)
def _update_remember_cookie(self, request, days, user_id=None):
remember_payload = request.app._session.encode({
'user_id': user_id,
'days': days,
'exp': time() + days * 24 * 60 * 60
})
@request.after_request
async def _set_remember_cookie(request, response):
response.set_cookie('_remember', remember_payload,
max_age=days * 24 * 60 * 60)
return response
def _get_user_id_from_session(self, request):
session = self._get_session(request)
if session and '_user_id' in session:
return session['_user_id']
if '_remember' in request.cookies:
remember_payload = request.app._session.decode(
request.cookies['_remember'])
user_id = remember_payload.get('user_id')
if user_id: # pragma: no branch
self._update_remember_cookie(
request, remember_payload.get('_days', 30), user_id)
session['_user_id'] = user_id
session['_fresh'] = False
session.save()
return user_id
async def _redirect_to_login(self, request):
return '', 302, {'Location': self.login_url + '?next=' + urlencode(
request.url)}
async def login_user(self, request, user, remember=False,
redirect_url='/'):
"""Log a user in.
:param request: the request object
:param user: the user object
:param remember: if the user's logged in state should be remembered
with a cookie after the session ends. Set to the
number of days the remember cookie should last, or to
``True`` to use a default duration of 30 days.
:param redirect_url: the URL to redirect to after login
This call marks the user as logged in by storing their user ID in the
user session. The application must call this method to log a user in
after their credentials have been validated.
The method returns a redirect response, either to the URL the user
originally intended to visit, or if there is no original URL to the URL
specified by the `redirect_url`.
"""
session = self._get_session(request)
session['_user_id'] = user.id
session['_fresh'] = True
session.save()
if remember:
days = 30 if remember is True else int(remember)
self._update_remember_cookie(request, days, session['_user_id'])
next_url = request.args.get('next', redirect_url)
if not next_url.startswith('/'):
next_url = redirect_url
return redirect(next_url)
async def logout_user(self, request):
"""Log a user out.
:param request: the request object
This call removes information about the user's log in from the user
session. If a remember cookie exists, it is removed as well.
"""
session = self._get_session(request)
session.pop('_user_id', None)
session.pop('_fresh', None)
session.save()
if '_remember' in request.cookies:
self._update_remember_cookie(request, 0)
def __call__(self, f):
"""Decorator to protect a route with authentication.
If the user is not logged in, Microdot will redirect to the login page
first. The decorated route will only run after successful login by the
user. If the user is already logged in, the route will run immediately.
Example::
login = Login()
@app.route('/secret')
@login
async def secret(request):
# only accessible to authenticated users
"""
async def wrapper(request, *args, **kwargs):
user_id = self._get_user_id_from_session(request)
if not user_id:
return await self._redirect_to_login(request)
request.g.current_user = await invoke_handler(
self.user_loader_callback, user_id)
if not request.g.current_user:
return await self._redirect_to_login(request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
def fresh(self, f):
"""Decorator to protect a route with "fresh" authentication.
This decorator prevents the route from running when the login session
is not fresh. A fresh session is a session that has been created from
direct user interaction with the login page, while a non-fresh session
occurs when a login is restored from a "remember me" cookie. Example::
login = Login()
@app.route('/secret')
@auth.fresh
async def secret(request):
# only accessible to authenticated users
# users logged in via remember me cookie will need to
# re-authenticate
"""
base_wrapper = self.__call__(f)
async def wrapper(request, *args, **kwargs):
session = self._get_session(request)
if session.get('_fresh'):
return await base_wrapper(request, *args, **kwargs)
return await self._redirect_to_login(request)
return wrapper

View File

@@ -329,7 +329,8 @@ class Request:
pass
def __init__(self, app, client_addr, method, url, http_version, headers,
body=None, stream=None, sock=None):
body=None, stream=None, sock=None, url_prefix='',
subapp=None):
#: The application instance to which this request belongs.
self.app = app
#: The address of the client, as a tuple (host, port).
@@ -338,6 +339,12 @@ class Request:
self.method = method
#: The request URL, including the path and query string.
self.url = url
#: The URL prefix, if the endpoint comes from a mounted
#: sub-application, or else ''.
self.url_prefix = url_prefix
#: The sub-application instance, or `None` if this isn't a mounted
#: endpoint.
self.subapp = subapp
#: The path portion of the URL.
self.path = url
#: The query string portion of the URL.
@@ -891,6 +898,9 @@ class URLPattern():
except ValueError:
return None, None
def __repr__(self): # pragma: no cover
return 'URLPattern: {}'.format(self.url_pattern)
class HTTPException(Exception):
def __init__(self, status_code, reason=None):
@@ -959,7 +969,7 @@ class Microdot:
def decorated(f):
self.url_map.append(
([m.upper() for m in (methods or ['GET'])],
URLPattern(url_pattern), f))
URLPattern(url_pattern), f, '', None))
return f
return decorated
@@ -1127,24 +1137,33 @@ class Microdot:
return f
return decorated
def mount(self, subapp, url_prefix=''):
def mount(self, subapp, url_prefix='', local=False):
"""Mount a sub-application, optionally under the given URL prefix.
:param subapp: The sub-application to mount.
:param url_prefix: The URL prefix to mount the application under.
:param local: When set to ``True``, the before, after and error request
handlers only apply to endpoints defined in the
sub-application. When ``False``, they apply to the entire
application. The default is ``False``.
"""
for methods, pattern, handler in subapp.url_map:
for methods, pattern, handler, _prefix, _subapp in subapp.url_map:
self.url_map.append(
(methods, URLPattern(url_prefix + pattern.url_pattern),
handler))
for handler in subapp.before_request_handlers:
self.before_request_handlers.append(handler)
for handler in subapp.after_request_handlers:
self.after_request_handlers.append(handler)
for handler in subapp.after_error_request_handlers:
self.after_error_request_handlers.append(handler)
for status_code, handler in subapp.error_handlers.items():
self.error_handlers[status_code] = handler
handler, url_prefix + _prefix, _subapp or subapp))
if not local:
for handler in subapp.before_request_handlers:
self.before_request_handlers.append(handler)
subapp.before_request_handlers = []
for handler in subapp.after_request_handlers:
self.after_request_handlers.append(handler)
subapp.after_request_handlers = []
for handler in subapp.after_error_request_handlers:
self.after_error_request_handlers.append(handler)
subapp.after_error_request_handlers = []
for status_code, handler in subapp.error_handlers.items():
self.error_handlers[status_code] = handler
subapp.error_handlers = {}
@staticmethod
def abort(status_code, reason=None):
@@ -1302,23 +1321,28 @@ class Microdot:
def find_route(self, req):
method = req.method.upper()
if method == 'OPTIONS' and self.options_handler:
return self.options_handler(req)
return self.options_handler(req), '', None
if method == 'HEAD':
method = 'GET'
f = 404
for route_methods, route_pattern, route_handler in self.url_map:
p = ''
s = None
for route_methods, route_pattern, route_handler, url_prefix, subapp \
in self.url_map:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
p = url_prefix
s = subapp
if method in route_methods:
f = route_handler
break
else:
f = 405
return f
return f, p, s
def default_options_handler(self, req):
allow = []
for route_methods, route_pattern, route_handler in self.url_map:
for route_methods, route_pattern, _, _, _ in self.url_map:
if route_pattern.match(req.path) is not None:
allow.extend(route_methods)
if 'GET' in allow:
@@ -1349,43 +1373,76 @@ class Microdot:
method=req.method, path=req.path,
status_code=res.status_code))
def get_request_handlers(self, req, attr, local_first=True):
handlers = getattr(self, attr + '_handlers')
local_handlers = getattr(req.subapp, attr + '_handlers') \
if req and req.subapp else []
return local_handlers + handlers if local_first \
else handlers + local_handlers
async def error_response(self, req, status_code, reason=None):
if req and req.subapp and status_code in req.subapp.error_handlers:
return await invoke_handler(
req.subapp.error_handlers[status_code], req)
elif status_code in self.error_handlers:
return await invoke_handler(self.error_handlers[status_code], req)
return reason or 'N/A', status_code
async def dispatch_request(self, req):
after_request_handled = False
if req:
if req.content_length > req.max_content_length:
if 413 in self.error_handlers:
res = await invoke_handler(self.error_handlers[413], req)
else:
res = 'Payload too large', 413
# the request body is larger than allowed
res = await self.error_response(req, 413, 'Payload too large')
else:
f = self.find_route(req)
# find the route in the app's URL map
f, req.url_prefix, req.subapp = self.find_route(req)
try:
res = None
if callable(f):
for handler in self.before_request_handlers:
# invoke the before request handlers
for handler in self.get_request_handlers(
req, 'before_request', False):
res = await invoke_handler(handler, req)
if res:
break
# invoke the endpoint handler
if res is None:
res = await invoke_handler(
f, req, **req.url_args)
res = await invoke_handler(f, req, **req.url_args)
# process the response
if isinstance(res, int):
# an integer response is taken as a status code
# with an empty body
res = '', res
if isinstance(res, tuple):
# handle a tuple response
if isinstance(res[0], int):
# a tuple that starts with an int has an empty
# body
res = ('', res[0],
res[1] if len(res) > 1 else {})
body = res[0]
if isinstance(res[1], int):
# extract the status code and headers (if
# available)
status_code = res[1]
headers = res[2] if len(res) > 2 else {}
else:
# if the status code is missing, assume 200
status_code = 200
headers = res[1]
res = Response(body, status_code, headers)
elif not isinstance(res, Response):
# any other response types are wrapped in a
# Response object
res = Response(res)
for handler in self.after_request_handlers:
# invoke the after request handlers
for handler in self.get_request_handlers(
req, 'after_request', True):
res = await invoke_handler(
handler, req, res) or res
for handler in req.after_request_handlers:
@@ -1393,50 +1450,62 @@ class Microdot:
handler, req, res) or res
after_request_handled = True
elif isinstance(f, dict):
# the response from an OPTIONS request is a dict with
# headers
res = Response(headers=f)
elif f in self.error_handlers:
res = await invoke_handler(self.error_handlers[f], req)
else:
res = 'Not found', f
# if the route is not found, return a 404 or 405
# response as appropriate
res = await self.error_response(req, f, 'Not found')
except HTTPException as exc:
if exc.status_code in self.error_handlers:
res = self.error_handlers[exc.status_code](req)
else:
res = exc.reason, exc.status_code
# an HTTP exception was raised while handling this request
res = await self.error_response(req, exc.status_code,
exc.reason)
except Exception as exc:
# an unexpected exception was raised while handling this
# request
print_exception(exc)
exc_class = None
# invoke the error handler for the exception class if one
# exists
handler = None
res = None
if exc.__class__ in self.error_handlers:
exc_class = exc.__class__
if req.subapp and exc.__class__ in \
req.subapp.error_handlers:
handler = req.subapp.error_handlers[exc.__class__]
elif exc.__class__ in self.error_handlers:
handler = self.error_handlers[exc.__class__]
else:
# walk up the exception class hierarchy to try to find
# a handler
for c in mro(exc.__class__)[1:]:
if c in self.error_handlers:
exc_class = c
if req.subapp and c in req.subapp.error_handlers:
handler = req.subapp.error_handlers[c]
break
if exc_class:
elif c in self.error_handlers:
handler = self.error_handlers[c]
break
if handler:
try:
res = await invoke_handler(
self.error_handlers[exc_class], req, exc)
res = await invoke_handler(handler, req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = await invoke_handler(
self.error_handlers[500], req)
else:
res = 'Internal server error', 500
# if there is still no response, issue a 500 error
res = await self.error_response(
req, 500, 'Internal server error')
else:
if 400 in self.error_handlers:
res = await invoke_handler(self.error_handlers[400], req)
else:
res = 'Bad request', 400
# if the request could not be parsed, issue a 400 error
res = await self.error_response(req, 400, 'Bad request')
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
if not after_request_handled:
for handler in self.after_error_request_handlers:
# if the request did not finish due to an error, invoke the after
# error request handler
for handler in self.get_request_handlers(
req, 'after_error_request', True):
res = await invoke_handler(
handler, req, res) or res
res.is_head = (req and req.method == 'HEAD')

View File

@@ -9,3 +9,5 @@ from tests.test_sse import * # noqa: F401, F403
from tests.test_cors import * # noqa: F401, F403
from tests.test_utemplate import * # noqa: F401, F403
from tests.test_session import * # noqa: F401, F403
from tests.test_auth import * # noqa: F401, F403
from tests.test_login import * # noqa: F401, F403

125
tests/test_auth.py Normal file
View File

@@ -0,0 +1,125 @@
import asyncio
import binascii
import unittest
from microdot import Microdot
from microdot.auth import BasicAuth, TokenAuth
from microdot.test_client import TestClient
class TestAuth(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_basic_auth(self):
app = Microdot()
basic_auth = BasicAuth()
@basic_auth.authenticate
def authenticate(request, username, password):
if username == 'foo' and password == 'bar':
return {'username': username}
@app.route('/')
@basic_auth
def index(request):
return request.g.current_user['username']
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:bar').decode()}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'foo')
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:baz').decode()}))
self.assertEqual(res.status_code, 401)
def test_token_auth(self):
app = Microdot()
token_auth = TokenAuth()
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_token_auth_custom_header(self):
app = Microdot()
token_auth = TokenAuth(header='X-Auth-Token')
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'X-Token-Auth': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = self._run(client.get('/', headers={'x-auth-token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
@token_auth.errorhandler
def error_handler(request):
return {'status_code': 403}, 403
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 403)
self.assertEqual(res.json, {'status_code': 403})

188
tests/test_login.py Normal file
View File

@@ -0,0 +1,188 @@
import asyncio
import unittest
from microdot import Microdot
from microdot.login import Login
from microdot.session import Session
from microdot.test_client import TestClient
class TestLogin(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_login(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
class User:
def __init__(self, id, name):
self.id = id
self.name = name
@login.user_loader
def load_user(user_id):
return User(user_id, f'user{user_id}')
@app.get('/')
@login
def index(request):
return request.g.current_user.name
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(123, 'user123'))
@app.post('/logout')
async def logout_route(request):
await login.logout_user(request)
return 'ok'
client = TestClient(app)
res = self._run(client.get('/?foo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 1)
self.assertIn('session', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user123')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)
def test_login_bad_user_id(self):
class User:
def __init__(self, id, name):
self.id = id
self.name = name
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.user_loader
def load_user(user_id):
return None
@app.get('/foo')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(1, 'user'))
client = TestClient(app)
res = self._run(client.post('/login?next=/'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = self._run(client.get('/foo'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/foo')
def test_login_bad_redirect(self):
class User:
def __init__(self, id, name):
self.id = id
self.name = name
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.user_loader
def load_user(user_id):
return user_id
@app.get('/')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(1, 'user'))
client = TestClient(app)
res = self._run(client.post('/login?next=http://example.com'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
def test_login_remember(self):
class User:
def __init__(self, id, name):
self.id = id
self.name = name
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.user_loader
def load_user(user_id):
return User(user_id, f'user{user_id}')
@app.get('/')
@login
def index(request):
return {'user': request.g.current_user.id}
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(1, 'user1'),
remember=True)
@app.post('/logout')
async def logout(request):
await login.logout_user(request)
return 'ok'
@app.get('/fresh')
@login.fresh
async def fresh(request):
return f'fresh {request.g.current_user.id}'
client = TestClient(app)
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 2)
self.assertIn('session', client.cookies)
self.assertIn('_remember', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, '{"user": 1}')
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'fresh 1')
del client.cookies['session']
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/fresh')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
self.assertFalse('_remember' in client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)

View File

@@ -794,7 +794,7 @@ class TestMicrodot(unittest.TestCase):
@subapp.route('/app')
def index(req):
return req.g.before + ':foo'
return req.g.before + ':' + req.url_prefix
app = Microdot()
app.mount(subapp, url_prefix='/sub')
@@ -811,4 +811,203 @@ class TestMicrodot(unittest.TestCase):
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'before:foo:after')
self.assertEqual(res.text, 'before:/sub:after')
def test_mount_local(self):
subapp1 = Microdot()
subapp2 = Microdot()
@subapp1.before_request
def before1(req):
req.g.before += ':before1'
@subapp1.after_error_request
def after_error1(req, res):
res.body += b':errorafter'
@subapp1.errorhandler(ValueError)
def value_error(req, exc):
return str(exc), 400
@subapp1.route('/')
def index1(req):
raise ZeroDivisionError()
@subapp1.route('/foo')
def foo(req):
return req.g.before + ':foo:' + req.url_prefix
@subapp1.route('/err')
def err(req):
raise ValueError('err')
@subapp1.route('/err2')
def err2(req):
class MyErr(ValueError):
pass
raise MyErr('err')
@subapp2.before_request
def before2(req):
req.g.before += ':before2'
@subapp2.after_request
def after2(req, res):
res.body += b':after'
@subapp2.errorhandler(405)
def method_not_found2(req):
return '405', 405
@subapp2.route('/bar')
def bar(req):
return req.g.before + ':bar:' + req.url_prefix
@subapp2.route('/baz')
def baz(req):
abort(405)
app = Microdot()
@app.before_request
def before(req):
req.g.before = 'before-app'
@app.after_request
def after(req, res):
res.body += b':after-app'
app.mount(subapp1, local=True)
app.mount(subapp2, url_prefix='/sub', local=True)
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 500)
self.assertEqual(res.text, 'Internal server error:errorafter')
res = self._run(client.get('/foo'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text, 'before-app:before1:foo::after-app')
res = self._run(client.get('/err'))
self.assertEqual(res.status_code, 400)
self.assertEqual(res.text, 'err:errorafter')
res = self._run(client.get('/err2'))
self.assertEqual(res.status_code, 400)
self.assertEqual(res.text, 'err:errorafter')
res = self._run(client.get('/sub/bar'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.headers['Content-Type'],
'text/plain; charset=UTF-8')
self.assertEqual(res.text,
'before-app:before2:bar:/sub:after:after-app')
res = self._run(client.post('/sub/bar'))
self.assertEqual(res.status_code, 405)
self.assertEqual(res.text, '405')
res = self._run(client.get('/sub/baz'))
self.assertEqual(res.status_code, 405)
self.assertEqual(res.text, '405')
def test_many_mounts(self):
subsubapp = Microdot()
@subsubapp.before_request
def subsubapp_before(req):
req.g.before = 'subsubapp'
@subsubapp.route('/')
def subsubapp_index(req):
return f'{req.g.before}:{req.subapp == subsubapp}:{req.url_prefix}'
subapp = Microdot()
@subapp.before_request
def subapp_before(req):
req.g.before = 'subapp'
@subapp.route('/')
def subapp_index(req):
return f'{req.g.before}:{req.subapp == subapp}:{req.url_prefix}'
app = Microdot()
@app.before_request
def app_before(req):
req.g.before = 'app'
@app.route('/')
def app_index(req):
return f'{req.g.before}:{req.subapp is None}:{req.url_prefix}'
subapp.mount(subsubapp, url_prefix='/subsub')
app.mount(subapp, url_prefix='/sub')
client = TestClient(app)
res = self._run(client.get('/sub/subsub/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'subsubapp:True:/sub/subsub')
res = self._run(client.get('/sub/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'subsubapp:True:/sub')
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'subsubapp:True:')
def test_many_local_mounts(self):
subsubapp = Microdot()
@subsubapp.before_request
def subsubapp_before(req):
req.g.before = 'subsubapp'
@subsubapp.route('/')
def subsubapp_index(req):
return f'{req.g.before}:{req.subapp == subsubapp}:{req.url_prefix}'
subapp = Microdot()
@subapp.before_request
def subapp_before(req):
req.g.before = 'subapp'
@subapp.route('/')
def subapp_index(req):
return f'{req.g.before}:{req.subapp == subapp}:{req.url_prefix}'
app = Microdot()
@app.before_request
def app_before(req):
req.g.before = 'app'
@app.route('/')
def app_index(req):
return f'{req.g.before}:{req.subapp is None}:{req.url_prefix}'
subapp.mount(subsubapp, url_prefix='/subsub', local=True)
app.mount(subapp, url_prefix='/sub', local=True)
client = TestClient(app)
res = self._run(client.get('/sub/subsub/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'subsubapp:True:/sub/subsub')
res = self._run(client.get('/sub/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'subapp:True:/sub')
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'app:True:')

View File

@@ -37,7 +37,7 @@ class TestSession(unittest.TestCase):
@app.post('/set')
@with_session
async def save_session(req, session):
def save_session(req, session):
session['name'] = 'joe'
session.save()
return 'OK'