Basic and token authentication support

This commit is contained in:
Miguel Grinberg
2025-02-03 19:46:11 +00:00
parent cd87abba30
commit 675c978797
9 changed files with 481 additions and 13 deletions

View File

@@ -44,6 +44,14 @@ User Sessions
.. automodule:: microdot.session
:members:
Authentication
--------------
.. automodule:: microdot.auth
:inherited-members:
:special-members: __call__
:members:
Cross-Origin Resource Sharing (CORS)
------------------------------------

View File

@@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions
described in this section are maintained as part of the Microdot project in
the same source code repository.
WebSocket Support
~~~~~~~~~~~~~~~~~
WebSocket
~~~~~~~~-
.. list-table::
:align: left
@@ -39,8 +39,8 @@ Example::
message = await ws.receive()
await ws.send(message)
Server-Sent Events Support
~~~~~~~~~~~~~~~~~~~~~~~~~~
Server-Sent Events
~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -78,8 +78,8 @@ Example::
the SSE object. For bidirectional communication with the client, use the
WebSocket extension.
Rendering Templates
~~~~~~~~~~~~~~~~~~~
Templates
~~~~~~~~~
Many web applications use HTML templates for rendering content to clients.
Microdot includes extensions to render templates with the
@@ -202,8 +202,8 @@ must be used.
.. note::
The Jinja extension is not compatible with MicroPython.
Maintaining Secure User Sessions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Secure User Sessions
~~~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -270,6 +270,92 @@ The :func:`save() <microdot.session.SessionDict.save>` and
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
and destroy the user session respectively.
Authentication
~~~~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `auth.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
* - Required external dependencies
- | None
* - Examples
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
The authentication extension provides helper classes for two commonly used
authentication patterns, described below.
Basic Authentication
^^^^^^^^^^^^^^^^^^^^
`Basic Authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_
is a method of authentication that is part of the HTTP specification. It allows
clients to authenticate to a server using a username and a password. Web
browsers have native support for Basic Authentication and will automatically
prompt the user for a username and a password when a protected resource is
accessed.
To use Basic Authentication, create an instance of the :class:`BasicAuth <microdot.auth.BasicAuth>`
class::
from microdot.auth import BasicAuth
auth = BasicAuth(app)
Next, create an authentication function. The function must accept a request
object and a username and password pair provided by the user. If the
credentials are valid, the function must return an object that represents the
user. If the authentication function cannot validate the user provided
credentials it must return ``None``. Decorate the function with
``@auth.authenticate``::
@auth.authenticate
async def verify_user(request, username, password):
user = await load_user_from_database(username)
if user and user.verify_password(password):
return user
To protect a route with authentication, add the ``auth`` instance as a
decorator::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
While running an authenticated request, the user object returned by the
authenticaction function is accessible as ``request.g.current_user``.
Token Authentication
^^^^^^^^^^^^^^^^^^^^
To set up token authentication, create an instance of :class:`TokenAuth <microdot.auth.TokenAuth>`::
from microdot.auth import TokenAuth
auth = TokenAuth()
Then add a function that verifies the token and returns the user it belongs to,
or ``None`` if the token is invalid or expired::
@auth.authenticate
async def verify_token(request, token):
return load_user_from_token(token)
As with Basic authentication, the ``auth`` instance is used as a decorator to
protect your routes::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
Cross-Origin Resource Sharing (CORS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -305,8 +391,8 @@ Example::
cors = CORS(app, allowed_origins=['https://example.com'],
allow_credentials=True)
Testing with the Test Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Client
~~~~~~~~~~~
.. list-table::
:align: left
@@ -342,8 +428,8 @@ Example::
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
class for more details.
Deploying on a Production Web Server
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Production Deployments
~~~~~~~~~~~~~~~~~~~~~~
The ``Microdot`` class creates its own simple web server. This is enough for an
application deployed with MicroPython, but when using CPython it may be useful

1
examples/auth/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate basic and token authentication.

View File

@@ -0,0 +1,31 @@
from microdot import Microdot
from microdot.auth import BasicAuth
from pbkdf2 import generate_password_hash, check_password_hash
# this example provides an implementation of the generate_password_hash and
# check_password_hash functions that can be used in MicroPython. On CPython
# there are many other options for password hashisng so there is no need to use
# this custom solution.
USERS = {
'susan': generate_password_hash('hello'),
'david': generate_password_hash('bye'),
}
app = Microdot()
auth = BasicAuth()
@auth.authenticate
async def check_credentials(request, username, password):
if username in USERS and check_password_hash(USERS[username], password):
return username
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

47
examples/auth/pbkdf2.py Normal file
View File

@@ -0,0 +1,47 @@
import os
import hashlib
# PBKDF2 secure password hashing algorithm obtained from:
# https://codeandlife.com/2023/01/06/how-to-calculate-pbkdf2-hmac-sha256-with-
# python,-example-code/
def sha256(b):
return hashlib.sha256(b).digest()
def ljust(b, n, f):
return b + f * (n - len(b))
def gethmac(key, content):
okeypad = bytes(v ^ 0x5c for v in ljust(key, 64, b'\0'))
ikeypad = bytes(v ^ 0x36 for v in ljust(key, 64, b'\0'))
return sha256(okeypad + sha256(ikeypad + content))
def pbkdf2(pwd, salt, iterations=1000):
U = salt + b'\x00\x00\x00\x01'
T = bytes(64)
for _ in range(iterations):
U = gethmac(pwd, U)
T = bytes(a ^ b for a, b in zip(U, T))
return T
# The number of iterations may need to be adjusted depending on the hardware.
# Lower numbers make the password hashing algorithm faster but less secure, so
# the largest number that can be tolerated should be used.
def generate_password_hash(password, salt=None, iterations=100000):
salt = salt or os.urandom(16)
dk = pbkdf2(password.encode(), salt, iterations)
return f'pbkdf2-hmac-sha256:{salt.hex()}:{iterations}:{dk.hex()}'
def check_password_hash(password_hash, password):
algorithm, salt, iterations, dk = password_hash.split(':')
iterations = int(iterations)
if algorithm != 'pbkdf2-hmac-sha256':
return False
return pbkdf2(password.encode(), salt=bytes.fromhex(salt),
iterations=iterations) == bytes.fromhex(dk)

View File

@@ -0,0 +1,26 @@
from microdot import Microdot
from microdot.auth import TokenAuth
app = Microdot()
auth = TokenAuth()
TOKENS = {
'susan-token': 'susan',
'david-token': 'david',
}
@auth.authenticate
async def check_token(request, token):
if token in TOKENS:
return TOKENS[token]
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

144
src/microdot/auth.py Normal file
View File

@@ -0,0 +1,144 @@
from microdot import abort
from microdot.microdot import invoke_handler
class BaseAuth:
def __init__(self):
self.auth_callback = None
self.error_callback = None
def __call__(self, f):
"""Decorator to protect a route with authentication.
An instance of this class must be used as a decorator on the routes
that need to be protected. Example::
auth = BasicAuth() # or TokenAuth()
@app.route('/protected')
@auth
def protected(request):
# ...
Routes that are decorated in this way will only be invoked if the
authentication callback returned a valid user object, otherwise the
error callback will be executed.
"""
async def wrapper(request, *args, **kwargs):
auth = self._get_auth(request)
if not auth:
return await invoke_handler(self.error_callback, request)
request.g.current_user = await invoke_handler(
self.auth_callback, request, *auth)
if not request.g.current_user:
return await invoke_handler(self.error_callback, request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
class BasicAuth(BaseAuth):
"""Basic Authentication.
:param realm: The realm that is displayed when the user is prompted to
authenticate in the browser.
:param charset: The charset that is used to encode the realm.
:param scheme: The authentication scheme. Defaults to 'Basic'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic',
error_status=401):
super().__init__()
self.realm = realm
self.charset = charset
self.scheme = scheme
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get('Authorization')
if auth and auth.startswith('Basic '):
import binascii
try:
username, password = binascii.a2b_base64(
auth[6:]).decode().split(':', 1)
except Exception: # pragma: no cover
return None
return username, password
def authentication_error(self, request):
return '', self.error_status, {
'WWW-Authenticate': '{} realm="{}", charset="{}"'.format(
self.scheme, self.realm, self.charset)}
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, username, password):
user = get_user(username)
if user and user.check_password(password):
return get_user(username)
"""
self.auth_callback = f
class TokenAuth(BaseAuth):
"""Token based authentication.
:param header: The name of the header that will contain the token. Defaults
to 'Authorization'.
:param scheme: The authentication scheme. Defaults to 'Bearer'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, header='Authorization', scheme='Bearer',
error_status=401):
super().__init__()
self.header = header
self.scheme = scheme.lower()
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get(self.header)
if auth:
if self.header == 'Authorization':
try:
scheme, token = auth.split(' ', 1)
except Exception:
return None
if scheme.lower() == self.scheme:
return (token.strip(),)
else:
return (auth,)
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, token):
return get_user(token)
"""
self.auth_callback = f
def errorhandler(self, f):
"""Decorator to configure the error callback.
Microdot calls the error callback to allow the application to generate
a custom error response. The default error response is to call
``abort(401)``.
"""
self.error_callback = f
def authentication_error(self, request):
abort(self.error_status)

125
tests/test_auth.py Normal file
View File

@@ -0,0 +1,125 @@
import asyncio
import binascii
import unittest
from microdot import Microdot
from microdot.auth import BasicAuth, TokenAuth
from microdot.test_client import TestClient
class TestAuth(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_basic_auth(self):
app = Microdot()
basic_auth = BasicAuth()
@basic_auth.authenticate
def authenticate(request, username, password):
if username == 'foo' and password == 'bar':
return {'username': username}
@app.route('/')
@basic_auth
def index(request):
return request.g.current_user['username']
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:bar').decode()}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'foo')
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:baz').decode()}))
self.assertEqual(res.status_code, 401)
def test_token_auth(self):
app = Microdot()
token_auth = TokenAuth()
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_token_auth_custom_header(self):
app = Microdot()
token_auth = TokenAuth(header='X-Auth-Token')
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'X-Token-Auth': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = self._run(client.get('/', headers={'x-auth-token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
@token_auth.errorhandler
def error_handler(request):
return {'status_code': 403}, 403
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 403)
self.assertEqual(res.json, {'status_code': 403})

View File

@@ -37,7 +37,7 @@ class TestSession(unittest.TestCase):
@app.post('/set')
@with_session
async def save_session(req, session):
def save_session(req, session):
session['name'] = 'joe'
session.save()
return 'OK'