diff --git a/docs/api.rst b/docs/api.rst index 3f31639..07a070d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -52,6 +52,14 @@ Authentication :special-members: __call__ :members: +User Logins +----------- + +.. automodule:: microdot.login + :inherited-members: + :special-members: __call__ + :members: + Cross-Origin Resource Sharing (CORS) ------------------------------------ diff --git a/docs/extensions.rst b/docs/extensions.rst index aa228c2..495cf2c 100644 --- a/docs/extensions.rst +++ b/docs/extensions.rst @@ -288,6 +288,7 @@ Authentication * - Examples - | `basic_auth.py `_ | `token_auth.py `_ + The authentication extension provides helper classes for two commonly used authentication patterns, described below. @@ -355,6 +356,91 @@ protect your routes:: @auth async def index(request): return f'Hello, {request.g.current_user}!' + +User Logins +~~~~~~~~~~~ + +.. list-table:: + :align: left + + * - Compatibility + - | CPython & MicroPython + + * - Required Microdot source files + - | `login.py `_ + | `session.py `_ + * - Required external dependencies + - | CPython: `PyJWT `_ + | MicroPython: `jwt.py `_, + `hmac.py `_ + * - Examples + - | `login.py `_ + +The login extension provides user login functionality. The logged in state of +the user is stored in the user session cookie, and an optional "remember me" +cookie can also be added to keep the user logged in across browser sessions. + +To use this extension, create instances of the +:class:`Session ` and :class:`Login ` +class:: + + Session(app, secret_key='top-secret!') + login = Login() + +The ``Login`` class accept an optional argument with the URL of the login page. +The default for this URL is */login*. + +The application must represent users as objects with an ``id`` attribute. A +function decorated with ``@login.user_loader`` is used to load a user object:: + + @login.user_loader + async def get_user(user_id): + return database.get_user(user_id) + +The application must implement the login form. At the point in which the user +credentials have been received and verified, a call to the +:func:`login_user() ` function must be made to +record the user in the user session:: + + @app.route('/login', methods=['GET', 'POST']) + async def login(request): + # ... + if user.check_password(password): + return await login.login_user(request, user, remember=remember_me) + return redirect('/login') + +The optional ``remember`` argument is used to add a remember me cookie that +will log the user in automatically in future sessions. A value of ``True`` will +keep the log in active for 30 days. Alternatively, an integer number of days +can be passed in this argument. + +Any routes that require the user to be logged in must be decorated with +:func:`@login `:: + + @app.route('/') + @login + async def index(request): + # ... + +Routes that are of a sensitive nature can be decorated with +:func:`@login.fresh ` +instead. This decorator requires that the user has logged in during the current +session, and will ask the user to logged in again if the session was +authenticated through a remember me cookie:: + + @app.get('/fresh') + @login.fresh + async def fresh(request): + # ... + +To log out a user, the :func:`logout_user() ` +is used:: + + @app.post('/logout') + @login + async def logout(request): + await login.logout_user(request) + return redirect('/') Cross-Origin Resource Sharing (CORS) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/examples/login/README.md b/examples/login/README.md new file mode 100644 index 0000000..e41c267 --- /dev/null +++ b/examples/login/README.md @@ -0,0 +1 @@ +This directory contains examples that demonstrate user logins. diff --git a/examples/login/login.py b/examples/login/login.py new file mode 100644 index 0000000..3135e41 --- /dev/null +++ b/examples/login/login.py @@ -0,0 +1,123 @@ +from microdot import Microdot, redirect +from microdot.session import Session +from microdot.login import Login +from pbkdf2 import generate_password_hash, check_password_hash + +# this example provides an implementation of the generate_password_hash and +# check_password_hash functions that can be used in MicroPython. On CPython +# there are many other options for password hashisng so there is no need to use +# this custom solution. + + +class User: + def __init__(self, id, username, password): + self.id = id + self.username = username + self.password_hash = self.create_hash(password) + + def create_hash(self, password): + return generate_password_hash(password) + + def check_password(self, password): + return check_password_hash(self.password_hash, password) + + +USERS = { + 'user001': User('user001', 'susan', 'hello'), + 'user002': User('user002', 'david', 'bye'), +} + +app = Microdot() +Session(app, secret_key='top-secret!') +login = Login() + + +@login.user_loader +async def get_user(user_id): + return USERS.get(user_id) + + +@app.route('/login', methods=['GET', 'POST']) +async def login_page(request): + if request.method == 'GET': + return ''' + + + +

Please Login

+
+

+ Username
+ +

+

+ Password:
+ +
+

+

+ Remember me +
+

+

+ +

+
+ + + ''', {'Content-Type': 'text/html'} + username = request.form['username'] + password = request.form['password'] + remember_me = bool(request.form.get('remember_me')) + + for user in USERS.values(): + if user.username == username: + if user.check_password(password): + return await login.login_user(request, user, + remember=remember_me) + return redirect('/login') + + +@app.route('/') +@login +async def index(request): + return f''' + + + +

Hello, {request.g.current_user.username}!

+

+ Click here to access the fresh login page. +

+
+ +
+ + + ''', {'Content-Type': 'text/html'} + + +@app.get('/fresh') +@login.fresh +async def fresh(request): + return f''' + + + +

Hello, {request.g.current_user.username}!

+

This page requires a fresh login session.

+

Go back to the main page.

+ + + ''', {'Content-Type': 'text/html'} + + +@app.post('/logout') +@login +async def logout(request): + await login.logout_user(request) + return redirect('/') + + +if __name__ == '__main__': + app.run(debug=True) diff --git a/examples/login/pbkdf2.py b/examples/login/pbkdf2.py new file mode 100644 index 0000000..ccd18b7 --- /dev/null +++ b/examples/login/pbkdf2.py @@ -0,0 +1,47 @@ +import os +import hashlib + +# PBKDF2 secure password hashing algorithm obtained from: +# https://codeandlife.com/2023/01/06/how-to-calculate-pbkdf2-hmac-sha256-with- +# python,-example-code/ + + +def sha256(b): + return hashlib.sha256(b).digest() + + +def ljust(b, n, f): + return b + f * (n - len(b)) + + +def gethmac(key, content): + okeypad = bytes(v ^ 0x5c for v in ljust(key, 64, b'\0')) + ikeypad = bytes(v ^ 0x36 for v in ljust(key, 64, b'\0')) + return sha256(okeypad + sha256(ikeypad + content)) + + +def pbkdf2(pwd, salt, iterations=1000): + U = salt + b'\x00\x00\x00\x01' + T = bytes(64) + for _ in range(iterations): + U = gethmac(pwd, U) + T = bytes(a ^ b for a, b in zip(U, T)) + return T + + +# The number of iterations may need to be adjusted depending on the hardware. +# Lower numbers make the password hashing algorithm faster but less secure, so +# the largest number that can be tolerated should be used. +def generate_password_hash(password, salt=None, iterations=100000): + salt = salt or os.urandom(16) + dk = pbkdf2(password.encode(), salt, iterations) + return f'pbkdf2-hmac-sha256:{salt.hex()}:{iterations}:{dk.hex()}' + + +def check_password_hash(password_hash, password): + algorithm, salt, iterations, dk = password_hash.split(':') + iterations = int(iterations) + if algorithm != 'pbkdf2-hmac-sha256': + return False + return pbkdf2(password.encode(), salt=bytes.fromhex(salt), + iterations=iterations) == bytes.fromhex(dk) diff --git a/examples/sessions/login.py b/examples/sessions/login.py index e7fd2ef..9a042ac 100644 --- a/examples/sessions/login.py +++ b/examples/sessions/login.py @@ -1,3 +1,6 @@ +# This is a simple example that demonstrates how to use the user session, but +# is not intended as a complete login solution. See the login subdirectory for +# a more complete example. from microdot import Microdot, Response, redirect from microdot.session import Session, with_session diff --git a/src/microdot/login.py b/src/microdot/login.py new file mode 100644 index 0000000..aa894e9 --- /dev/null +++ b/src/microdot/login.py @@ -0,0 +1,163 @@ +from time import time +from microdot import redirect +from microdot.microdot import urlencode, invoke_handler + + +class Login: + """User login support for Microdot. + + :param login_url: the URL to redirect to when a login is required. The + default is '/login'. + """ + def __init__(self, login_url='/login'): + self.login_url = login_url + self.user_loader_callback = None + + def user_loader(self, f): + """Decorator to configure the user callback. + + The decorated function receives the user ID as an argument and must + return the corresponding user object, or ``None`` if the user ID is + invalid. + """ + self.user_loader_callback = f + + def _get_session(self, request): + return request.app._session.get(request) + + def _update_remember_cookie(self, request, days, user_id=None): + remember_payload = request.app._session.encode({ + 'user_id': user_id, + 'days': days, + 'exp': time() + days * 24 * 60 * 60 + }) + + @request.after_request + async def _set_remember_cookie(request, response): + response.set_cookie('_remember', remember_payload, + max_age=days * 24 * 60 * 60) + return response + + def _get_user_id_from_session(self, request): + session = self._get_session(request) + if session and '_user_id' in session: + return session['_user_id'] + if '_remember' in request.cookies: + remember_payload = request.app._session.decode( + request.cookies['_remember']) + user_id = remember_payload.get('user_id') + if user_id: # pragma: no branch + self._update_remember_cookie( + request, remember_payload.get('_days', 30), user_id) + session['_user_id'] = user_id + session['_fresh'] = False + session.save() + return user_id + + async def _redirect_to_login(self, request): + return '', 302, {'Location': self.login_url + '?next=' + urlencode( + request.url)} + + async def login_user(self, request, user, remember=False, + redirect_url='/'): + """Log a user in. + + :param request: the request object + :param user: the user object + :param remember: if the user's logged in state should be remembered + with a cookie after the session ends. Set to the + number of days the remember cookie should last, or to + ``True`` to use a default duration of 30 days. + :param redirect_url: the URL to redirect to after login + + This call marks the user as logged in by storing their user ID in the + user session. The application must call this method to log a user in + after their credentials have been validated. + + The method returns a redirect response, either to the URL the user + originally intended to visit, or if there is no original URL to the URL + specified by the `redirect_url`. + """ + session = self._get_session(request) + session['_user_id'] = user.id + session['_fresh'] = True + session.save() + + if remember: + days = 30 if remember is True else int(remember) + self._update_remember_cookie(request, days, session['_user_id']) + + next_url = request.args.get('next', redirect_url) + if not next_url.startswith('/'): + next_url = redirect_url + return redirect(next_url) + + async def logout_user(self, request): + """Log a user out. + + :param request: the request object + + This call removes information about the user's log in from the user + session. If a remember cookie exists, it is removed as well. + """ + session = self._get_session(request) + session.pop('_user_id', None) + session.pop('_fresh', None) + session.save() + if '_remember' in request.cookies: + self._update_remember_cookie(request, 0) + + def __call__(self, f): + """Decorator to protect a route with authentication. + + If the user is not logged in, Microdot will redirect to the login page + first. The decorated route will only run after successful login by the + user. If the user is already logged in, the route will run immediately. + Example:: + + login = Login() + + @app.route('/secret') + @login + async def secret(request): + # only accessible to authenticated users + + """ + async def wrapper(request, *args, **kwargs): + user_id = self._get_user_id_from_session(request) + if not user_id: + return await self._redirect_to_login(request) + request.g.current_user = await invoke_handler( + self.user_loader_callback, user_id) + if not request.g.current_user: + return await self._redirect_to_login(request) + return await invoke_handler(f, request, *args, **kwargs) + + return wrapper + + def fresh(self, f): + """Decorator to protect a route with "fresh" authentication. + + This decorator prevents the route from running when the login session + is not fresh. A fresh session is a session that has been created from + direct user interaction with the login page, while a non-fresh session + occurs when a login is restored from a "remember me" cookie. Example:: + + login = Login() + + @app.route('/secret') + @auth.fresh + async def secret(request): + # only accessible to authenticated users + # users logged in via remember me cookie will need to + # re-authenticate + """ + base_wrapper = self.__call__(f) + + async def wrapper(request, *args, **kwargs): + session = self._get_session(request) + if session.get('_fresh'): + return await base_wrapper(request, *args, **kwargs) + return await self._redirect_to_login(request) + + return wrapper diff --git a/tests/__init__.py b/tests/__init__.py index 4f40481..3d8601f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -9,3 +9,5 @@ from tests.test_sse import * # noqa: F401, F403 from tests.test_cors import * # noqa: F401, F403 from tests.test_utemplate import * # noqa: F401, F403 from tests.test_session import * # noqa: F401, F403 +from tests.test_auth import * # noqa: F401, F403 +from tests.test_login import * # noqa: F401, F403 diff --git a/tests/test_login.py b/tests/test_login.py new file mode 100644 index 0000000..3199b76 --- /dev/null +++ b/tests/test_login.py @@ -0,0 +1,188 @@ +import asyncio +import unittest +from microdot import Microdot +from microdot.login import Login +from microdot.session import Session +from microdot.test_client import TestClient + + +class TestLogin(unittest.TestCase): + @classmethod + def setUpClass(cls): + if hasattr(asyncio, 'set_event_loop'): + asyncio.set_event_loop(asyncio.new_event_loop()) + cls.loop = asyncio.get_event_loop() + + def _run(self, coro): + return self.loop.run_until_complete(coro) + + def test_login(self): + app = Microdot() + Session(app, secret_key='secret') + login = Login() + + class User: + def __init__(self, id, name): + self.id = id + self.name = name + + @login.user_loader + def load_user(user_id): + return User(user_id, f'user{user_id}') + + @app.get('/') + @login + def index(request): + return request.g.current_user.name + + @app.post('/login') + async def login_route(request): + return await login.login_user(request, User(123, 'user123')) + + @app.post('/logout') + async def logout_route(request): + await login.logout_user(request) + return 'ok' + + client = TestClient(app) + res = self._run(client.get('/?foo=bar')) + self.assertEqual(res.status_code, 302) + self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar') + + res = self._run(client.post('/login?next=/%3Ffoo=bar')) + self.assertEqual(res.status_code, 302) + self.assertEqual(res.headers['Location'], '/?foo=bar') + self.assertEqual(len(res.headers['Set-Cookie']), 1) + self.assertIn('session', client.cookies) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.text, 'user123') + + res = self._run(client.post('/logout')) + self.assertEqual(res.status_code, 200) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 302) + + def test_login_bad_user_id(self): + class User: + def __init__(self, id, name): + self.id = id + self.name = name + + app = Microdot() + Session(app, secret_key='secret') + login = Login() + + @login.user_loader + def load_user(user_id): + return None + + @app.get('/foo') + @login + async def index(request): + return 'ok' + + @app.post('/login') + async def login_route(request): + return await login.login_user(request, User(1, 'user')) + + client = TestClient(app) + res = self._run(client.post('/login?next=/')) + self.assertEqual(res.status_code, 302) + self.assertEqual(res.headers['Location'], '/') + res = self._run(client.get('/foo')) + self.assertEqual(res.status_code, 302) + self.assertEqual(res.headers['Location'], '/login?next=/foo') + + def test_login_bad_redirect(self): + class User: + def __init__(self, id, name): + self.id = id + self.name = name + + app = Microdot() + Session(app, secret_key='secret') + login = Login() + + @login.user_loader + def load_user(user_id): + return user_id + + @app.get('/') + @login + async def index(request): + return 'ok' + + @app.post('/login') + async def login_route(request): + return await login.login_user(request, User(1, 'user')) + + client = TestClient(app) + res = self._run(client.post('/login?next=http://example.com')) + self.assertEqual(res.status_code, 302) + self.assertEqual(res.headers['Location'], '/') + + def test_login_remember(self): + class User: + def __init__(self, id, name): + self.id = id + self.name = name + + app = Microdot() + Session(app, secret_key='secret') + login = Login() + + @login.user_loader + def load_user(user_id): + return User(user_id, f'user{user_id}') + + @app.get('/') + @login + def index(request): + return {'user': request.g.current_user.id} + + @app.post('/login') + async def login_route(request): + return await login.login_user(request, User(1, 'user1'), + remember=True) + + @app.post('/logout') + async def logout(request): + await login.logout_user(request) + return 'ok' + + @app.get('/fresh') + @login.fresh + async def fresh(request): + return f'fresh {request.g.current_user.id}' + + client = TestClient(app) + res = self._run(client.post('/login?next=/%3Ffoo=bar')) + self.assertEqual(res.status_code, 302) + self.assertEqual(res.headers['Location'], '/?foo=bar') + self.assertEqual(len(res.headers['Set-Cookie']), 2) + self.assertIn('session', client.cookies) + self.assertIn('_remember', client.cookies) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.text, '{"user": 1}') + res = self._run(client.get('/fresh')) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.text, 'fresh 1') + + del client.cookies['session'] + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 200) + res = self._run(client.get('/fresh')) + self.assertEqual(res.status_code, 302) + self.assertEqual(res.headers['Location'], '/login?next=/fresh') + + res = self._run(client.post('/logout')) + self.assertEqual(res.status_code, 200) + self.assertFalse('_remember' in client.cookies) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 302)