diff --git a/docs/api.rst b/docs/api.rst index f63db86..3f31639 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -44,6 +44,14 @@ User Sessions .. automodule:: microdot.session :members: +Authentication +-------------- + +.. automodule:: microdot.auth + :inherited-members: + :special-members: __call__ + :members: + Cross-Origin Resource Sharing (CORS) ------------------------------------ diff --git a/docs/extensions.rst b/docs/extensions.rst index 5f2da1e..aa228c2 100644 --- a/docs/extensions.rst +++ b/docs/extensions.rst @@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions described in this section are maintained as part of the Microdot project in the same source code repository. -WebSocket Support -~~~~~~~~~~~~~~~~~ +WebSocket +~~~~~~~~- .. list-table:: :align: left @@ -39,8 +39,8 @@ Example:: message = await ws.receive() await ws.send(message) -Server-Sent Events Support -~~~~~~~~~~~~~~~~~~~~~~~~~~ +Server-Sent Events +~~~~~~~~~~~~~~~~~~ .. list-table:: :align: left @@ -78,8 +78,8 @@ Example:: the SSE object. For bidirectional communication with the client, use the WebSocket extension. -Rendering Templates -~~~~~~~~~~~~~~~~~~~ +Templates +~~~~~~~~~ Many web applications use HTML templates for rendering content to clients. Microdot includes extensions to render templates with the @@ -202,8 +202,8 @@ must be used. .. note:: The Jinja extension is not compatible with MicroPython. -Maintaining Secure User Sessions -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Secure User Sessions +~~~~~~~~~~~~~~~~~~~~ .. list-table:: :align: left @@ -270,6 +270,92 @@ The :func:`save() ` and :func:`delete() ` methods are used to update and destroy the user session respectively. +Authentication +~~~~~~~~~~~~~~ + +.. list-table:: + :align: left + + * - Compatibility + - | CPython & MicroPython + + * - Required Microdot source files + - | `auth.py `_ + + * - Required external dependencies + - | None + + * - Examples + - | `basic_auth.py `_ + | `token_auth.py `_ +The authentication extension provides helper classes for two commonly used +authentication patterns, described below. + +Basic Authentication +^^^^^^^^^^^^^^^^^^^^ + +`Basic Authentication `_ +is a method of authentication that is part of the HTTP specification. It allows +clients to authenticate to a server using a username and a password. Web +browsers have native support for Basic Authentication and will automatically +prompt the user for a username and a password when a protected resource is +accessed. + +To use Basic Authentication, create an instance of the :class:`BasicAuth ` +class:: + + from microdot.auth import BasicAuth + + auth = BasicAuth(app) + +Next, create an authentication function. The function must accept a request +object and a username and password pair provided by the user. If the +credentials are valid, the function must return an object that represents the +user. If the authentication function cannot validate the user provided +credentials it must return ``None``. Decorate the function with +``@auth.authenticate``:: + + @auth.authenticate + async def verify_user(request, username, password): + user = await load_user_from_database(username) + if user and user.verify_password(password): + return user + +To protect a route with authentication, add the ``auth`` instance as a +decorator:: + + @app.route('/') + @auth + async def index(request): + return f'Hello, {request.g.current_user}!' + +While running an authenticated request, the user object returned by the +authenticaction function is accessible as ``request.g.current_user``. + +Token Authentication +^^^^^^^^^^^^^^^^^^^^ + +To set up token authentication, create an instance of :class:`TokenAuth `:: + + from microdot.auth import TokenAuth + + auth = TokenAuth() + +Then add a function that verifies the token and returns the user it belongs to, +or ``None`` if the token is invalid or expired:: + + @auth.authenticate + async def verify_token(request, token): + return load_user_from_token(token) + +As with Basic authentication, the ``auth`` instance is used as a decorator to +protect your routes:: + + @app.route('/') + @auth + async def index(request): + return f'Hello, {request.g.current_user}!' + Cross-Origin Resource Sharing (CORS) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -305,8 +391,8 @@ Example:: cors = CORS(app, allowed_origins=['https://example.com'], allow_credentials=True) -Testing with the Test Client -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Test Client +~~~~~~~~~~~ .. list-table:: :align: left @@ -342,8 +428,8 @@ Example:: See the documentation for the :class:`TestClient ` class for more details. -Deploying on a Production Web Server -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Production Deployments +~~~~~~~~~~~~~~~~~~~~~~ The ``Microdot`` class creates its own simple web server. This is enough for an application deployed with MicroPython, but when using CPython it may be useful diff --git a/examples/auth/README.md b/examples/auth/README.md new file mode 100644 index 0000000..46f692a --- /dev/null +++ b/examples/auth/README.md @@ -0,0 +1 @@ +This directory contains examples that demonstrate basic and token authentication. diff --git a/examples/auth/basic_auth.py b/examples/auth/basic_auth.py new file mode 100644 index 0000000..0e7de15 --- /dev/null +++ b/examples/auth/basic_auth.py @@ -0,0 +1,31 @@ +from microdot import Microdot +from microdot.auth import BasicAuth +from pbkdf2 import generate_password_hash, check_password_hash + + +# this example provides an implementation of the generate_password_hash and +# check_password_hash functions that can be used in MicroPython. On CPython +# there are many other options for password hashisng so there is no need to use +# this custom solution. +USERS = { + 'susan': generate_password_hash('hello'), + 'david': generate_password_hash('bye'), +} +app = Microdot() +auth = BasicAuth() + + +@auth.authenticate +async def check_credentials(request, username, password): + if username in USERS and check_password_hash(USERS[username], password): + return username + + +@app.route('/') +@auth +async def index(request): + return f'Hello, {request.g.current_user}!' + + +if __name__ == '__main__': + app.run(debug=True) diff --git a/examples/auth/pbkdf2.py b/examples/auth/pbkdf2.py new file mode 100644 index 0000000..ccd18b7 --- /dev/null +++ b/examples/auth/pbkdf2.py @@ -0,0 +1,47 @@ +import os +import hashlib + +# PBKDF2 secure password hashing algorithm obtained from: +# https://codeandlife.com/2023/01/06/how-to-calculate-pbkdf2-hmac-sha256-with- +# python,-example-code/ + + +def sha256(b): + return hashlib.sha256(b).digest() + + +def ljust(b, n, f): + return b + f * (n - len(b)) + + +def gethmac(key, content): + okeypad = bytes(v ^ 0x5c for v in ljust(key, 64, b'\0')) + ikeypad = bytes(v ^ 0x36 for v in ljust(key, 64, b'\0')) + return sha256(okeypad + sha256(ikeypad + content)) + + +def pbkdf2(pwd, salt, iterations=1000): + U = salt + b'\x00\x00\x00\x01' + T = bytes(64) + for _ in range(iterations): + U = gethmac(pwd, U) + T = bytes(a ^ b for a, b in zip(U, T)) + return T + + +# The number of iterations may need to be adjusted depending on the hardware. +# Lower numbers make the password hashing algorithm faster but less secure, so +# the largest number that can be tolerated should be used. +def generate_password_hash(password, salt=None, iterations=100000): + salt = salt or os.urandom(16) + dk = pbkdf2(password.encode(), salt, iterations) + return f'pbkdf2-hmac-sha256:{salt.hex()}:{iterations}:{dk.hex()}' + + +def check_password_hash(password_hash, password): + algorithm, salt, iterations, dk = password_hash.split(':') + iterations = int(iterations) + if algorithm != 'pbkdf2-hmac-sha256': + return False + return pbkdf2(password.encode(), salt=bytes.fromhex(salt), + iterations=iterations) == bytes.fromhex(dk) diff --git a/examples/auth/token_auth.py b/examples/auth/token_auth.py new file mode 100644 index 0000000..ddfce3c --- /dev/null +++ b/examples/auth/token_auth.py @@ -0,0 +1,26 @@ +from microdot import Microdot +from microdot.auth import TokenAuth + +app = Microdot() +auth = TokenAuth() + +TOKENS = { + 'susan-token': 'susan', + 'david-token': 'david', +} + + +@auth.authenticate +async def check_token(request, token): + if token in TOKENS: + return TOKENS[token] + + +@app.route('/') +@auth +async def index(request): + return f'Hello, {request.g.current_user}!' + + +if __name__ == '__main__': + app.run(debug=True) diff --git a/src/microdot/auth.py b/src/microdot/auth.py new file mode 100644 index 0000000..1fcf687 --- /dev/null +++ b/src/microdot/auth.py @@ -0,0 +1,144 @@ +from microdot import abort +from microdot.microdot import invoke_handler + + +class BaseAuth: + def __init__(self): + self.auth_callback = None + self.error_callback = None + + def __call__(self, f): + """Decorator to protect a route with authentication. + + An instance of this class must be used as a decorator on the routes + that need to be protected. Example:: + + auth = BasicAuth() # or TokenAuth() + + @app.route('/protected') + @auth + def protected(request): + # ... + + Routes that are decorated in this way will only be invoked if the + authentication callback returned a valid user object, otherwise the + error callback will be executed. + """ + async def wrapper(request, *args, **kwargs): + auth = self._get_auth(request) + if not auth: + return await invoke_handler(self.error_callback, request) + request.g.current_user = await invoke_handler( + self.auth_callback, request, *auth) + if not request.g.current_user: + return await invoke_handler(self.error_callback, request) + return await invoke_handler(f, request, *args, **kwargs) + + return wrapper + + +class BasicAuth(BaseAuth): + """Basic Authentication. + + :param realm: The realm that is displayed when the user is prompted to + authenticate in the browser. + :param charset: The charset that is used to encode the realm. + :param scheme: The authentication scheme. Defaults to 'Basic'. + :param error_status: The error status code to return when authentication + fails. Defaults to 401. + """ + def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic', + error_status=401): + super().__init__() + self.realm = realm + self.charset = charset + self.scheme = scheme + self.error_status = error_status + self.error_callback = self.authentication_error + + def _get_auth(self, request): + auth = request.headers.get('Authorization') + if auth and auth.startswith('Basic '): + import binascii + try: + username, password = binascii.a2b_base64( + auth[6:]).decode().split(':', 1) + except Exception: # pragma: no cover + return None + return username, password + + def authentication_error(self, request): + return '', self.error_status, { + 'WWW-Authenticate': '{} realm="{}", charset="{}"'.format( + self.scheme, self.realm, self.charset)} + + def authenticate(self, f): + """Decorator to configure the authentication callback. + + This decorator must be used with a function that accepts the request + object, a username and a password and returns a user object if the + credentials are valid, or ``None`` if they are not. Example:: + + @auth.authenticate + async def check_credentials(request, username, password): + user = get_user(username) + if user and user.check_password(password): + return get_user(username) + """ + self.auth_callback = f + + +class TokenAuth(BaseAuth): + """Token based authentication. + + :param header: The name of the header that will contain the token. Defaults + to 'Authorization'. + :param scheme: The authentication scheme. Defaults to 'Bearer'. + :param error_status: The error status code to return when authentication + fails. Defaults to 401. + """ + def __init__(self, header='Authorization', scheme='Bearer', + error_status=401): + super().__init__() + self.header = header + self.scheme = scheme.lower() + self.error_status = error_status + self.error_callback = self.authentication_error + + def _get_auth(self, request): + auth = request.headers.get(self.header) + if auth: + if self.header == 'Authorization': + try: + scheme, token = auth.split(' ', 1) + except Exception: + return None + if scheme.lower() == self.scheme: + return (token.strip(),) + else: + return (auth,) + + def authenticate(self, f): + """Decorator to configure the authentication callback. + + This decorator must be used with a function that accepts the request + object, a username and a password and returns a user object if the + credentials are valid, or ``None`` if they are not. Example:: + + @auth.authenticate + async def check_credentials(request, token): + return get_user(token) + """ + self.auth_callback = f + + def errorhandler(self, f): + """Decorator to configure the error callback. + + Microdot calls the error callback to allow the application to generate + a custom error response. The default error response is to call + ``abort(401)``. + """ + self.error_callback = f + + def authentication_error(self, request): + abort(self.error_status) diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..bd64365 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,125 @@ +import asyncio +import binascii +import unittest +from microdot import Microdot +from microdot.auth import BasicAuth, TokenAuth +from microdot.test_client import TestClient + + +class TestAuth(unittest.TestCase): + @classmethod + def setUpClass(cls): + if hasattr(asyncio, 'set_event_loop'): + asyncio.set_event_loop(asyncio.new_event_loop()) + cls.loop = asyncio.get_event_loop() + + def _run(self, coro): + return self.loop.run_until_complete(coro) + + def test_basic_auth(self): + app = Microdot() + basic_auth = BasicAuth() + + @basic_auth.authenticate + def authenticate(request, username, password): + if username == 'foo' and password == 'bar': + return {'username': username} + + @app.route('/') + @basic_auth + def index(request): + return request.g.current_user['username'] + + client = TestClient(app) + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={ + 'Authorization': 'Basic ' + binascii.b2a_base64( + b'foo:bar').decode()})) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.text, 'foo') + + res = self._run(client.get('/', headers={ + 'Authorization': 'Basic ' + binascii.b2a_base64( + b'foo:baz').decode()})) + self.assertEqual(res.status_code, 401) + + def test_token_auth(self): + app = Microdot() + token_auth = TokenAuth() + + @token_auth.authenticate + def authenticate(request, token): + if token == 'foo': + return 'user' + + @app.route('/') + @token_auth + def index(request): + return request.g.current_user + + client = TestClient(app) + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={ + 'Authorization': 'Basic foo'})) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={'Authorization': 'foo'})) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={ + 'Authorization': 'Bearer foo'})) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.text, 'user') + + def test_token_auth_custom_header(self): + app = Microdot() + token_auth = TokenAuth(header='X-Auth-Token') + + @token_auth.authenticate + def authenticate(request, token): + if token == 'foo': + return 'user' + + @app.route('/') + @token_auth + def index(request): + return request.g.current_user + + client = TestClient(app) + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={ + 'Authorization': 'Basic foo'})) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={'Authorization': 'foo'})) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={ + 'Authorization': 'Bearer foo'})) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={ + 'X-Token-Auth': 'Bearer foo'})) + self.assertEqual(res.status_code, 401) + + res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'})) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.text, 'user') + + res = self._run(client.get('/', headers={'x-auth-token': 'foo'})) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.text, 'user') + + @token_auth.errorhandler + def error_handler(request): + return {'status_code': 403}, 403 + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 403) + self.assertEqual(res.json, {'status_code': 403}) diff --git a/tests/test_session.py b/tests/test_session.py index 0359ed9..43d8e2a 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -37,7 +37,7 @@ class TestSession(unittest.TestCase): @app.post('/set') @with_session - async def save_session(req, session): + def save_session(req, session): session['name'] = 'joe' session.save() return 'OK'