user logins

This commit is contained in:
Miguel Grinberg
2025-02-04 00:04:55 +00:00
parent 675c978797
commit d807011ad0
9 changed files with 621 additions and 0 deletions

View File

@@ -52,6 +52,14 @@ Authentication
:special-members: __call__
:members:
User Logins
-----------
.. automodule:: microdot.login
:inherited-members:
:special-members: __call__
:members:
Cross-Origin Resource Sharing (CORS)
------------------------------------

View File

@@ -288,6 +288,7 @@ Authentication
* - Examples
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
The authentication extension provides helper classes for two commonly used
authentication patterns, described below.
@@ -355,6 +356,91 @@ protect your routes::
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
User Logins
~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `login.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
| `session.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/session.py>`_
* - Required external dependencies
- | CPython: `PyJWT <https://pyjwt.readthedocs.io/>`_
| MicroPython: `jwt.py <https://github.com/micropython/micropython-lib/blob/master/python-ecosys/pyjwt/jwt.py>`_,
`hmac.py <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
* - Examples
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login/login.py>`_
The login extension provides user login functionality. The logged in state of
the user is stored in the user session cookie, and an optional "remember me"
cookie can also be added to keep the user logged in across browser sessions.
To use this extension, create instances of the
:class:`Session <microdot.session.Session>` and :class:`Login <microdot.login.Login>`
class::
Session(app, secret_key='top-secret!')
login = Login()
The ``Login`` class accept an optional argument with the URL of the login page.
The default for this URL is */login*.
The application must represent users as objects with an ``id`` attribute. A
function decorated with ``@login.user_loader`` is used to load a user object::
@login.user_loader
async def get_user(user_id):
return database.get_user(user_id)
The application must implement the login form. At the point in which the user
credentials have been received and verified, a call to the
:func:`login_user() <microdot.login.Login.login_user>` function must be made to
record the user in the user session::
@app.route('/login', methods=['GET', 'POST'])
async def login(request):
# ...
if user.check_password(password):
return await login.login_user(request, user, remember=remember_me)
return redirect('/login')
The optional ``remember`` argument is used to add a remember me cookie that
will log the user in automatically in future sessions. A value of ``True`` will
keep the log in active for 30 days. Alternatively, an integer number of days
can be passed in this argument.
Any routes that require the user to be logged in must be decorated with
:func:`@login <microdot.login.Login.__call__>`::
@app.route('/')
@login
async def index(request):
# ...
Routes that are of a sensitive nature can be decorated with
:func:`@login.fresh <microdot.login.Login.fresh>`
instead. This decorator requires that the user has logged in during the current
session, and will ask the user to logged in again if the session was
authenticated through a remember me cookie::
@app.get('/fresh')
@login.fresh
async def fresh(request):
# ...
To log out a user, the :func:`logout_user() <microdot.auth.Login.logout_user>`
is used::
@app.post('/logout')
@login
async def logout(request):
await login.logout_user(request)
return redirect('/')
Cross-Origin Resource Sharing (CORS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

1
examples/login/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate user logins.

123
examples/login/login.py Normal file
View File

@@ -0,0 +1,123 @@
from microdot import Microdot, redirect
from microdot.session import Session
from microdot.login import Login
from pbkdf2 import generate_password_hash, check_password_hash
# this example provides an implementation of the generate_password_hash and
# check_password_hash functions that can be used in MicroPython. On CPython
# there are many other options for password hashisng so there is no need to use
# this custom solution.
class User:
def __init__(self, id, username, password):
self.id = id
self.username = username
self.password_hash = self.create_hash(password)
def create_hash(self, password):
return generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
USERS = {
'user001': User('user001', 'susan', 'hello'),
'user002': User('user002', 'david', 'bye'),
}
app = Microdot()
Session(app, secret_key='top-secret!')
login = Login()
@login.user_loader
async def get_user(user_id):
return USERS.get(user_id)
@app.route('/login', methods=['GET', 'POST'])
async def login_page(request):
if request.method == 'GET':
return '''
<!doctype html>
<html>
<body>
<h1>Please Login</h1>
<form method="POST">
<p>
Username<br>
<input name="username" autofocus>
</p>
<p>
Password:<br>
<input name="password" type="password">
<br>
</p>
<p>
<input name="remember_me" type="checkbox"> Remember me
<br>
</p>
<p>
<button type="submit">Login</button>
</p>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
username = request.form['username']
password = request.form['password']
remember_me = bool(request.form.get('remember_me'))
for user in USERS.values():
if user.username == username:
if user.check_password(password):
return await login.login_user(request, user,
remember=remember_me)
return redirect('/login')
@app.route('/')
@login
async def index(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>
<a href="/fresh">Click here</a> to access the fresh login page.
</p>
<form method="POST" action="/logout">
<button type="submit">Logout</button>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.get('/fresh')
@login.fresh
async def fresh(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>This page requires a fresh login session.</p>
<p><a href="/">Go back</a> to the main page.</p>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.post('/logout')
@login
async def logout(request):
await login.logout_user(request)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)

47
examples/login/pbkdf2.py Normal file
View File

@@ -0,0 +1,47 @@
import os
import hashlib
# PBKDF2 secure password hashing algorithm obtained from:
# https://codeandlife.com/2023/01/06/how-to-calculate-pbkdf2-hmac-sha256-with-
# python,-example-code/
def sha256(b):
return hashlib.sha256(b).digest()
def ljust(b, n, f):
return b + f * (n - len(b))
def gethmac(key, content):
okeypad = bytes(v ^ 0x5c for v in ljust(key, 64, b'\0'))
ikeypad = bytes(v ^ 0x36 for v in ljust(key, 64, b'\0'))
return sha256(okeypad + sha256(ikeypad + content))
def pbkdf2(pwd, salt, iterations=1000):
U = salt + b'\x00\x00\x00\x01'
T = bytes(64)
for _ in range(iterations):
U = gethmac(pwd, U)
T = bytes(a ^ b for a, b in zip(U, T))
return T
# The number of iterations may need to be adjusted depending on the hardware.
# Lower numbers make the password hashing algorithm faster but less secure, so
# the largest number that can be tolerated should be used.
def generate_password_hash(password, salt=None, iterations=100000):
salt = salt or os.urandom(16)
dk = pbkdf2(password.encode(), salt, iterations)
return f'pbkdf2-hmac-sha256:{salt.hex()}:{iterations}:{dk.hex()}'
def check_password_hash(password_hash, password):
algorithm, salt, iterations, dk = password_hash.split(':')
iterations = int(iterations)
if algorithm != 'pbkdf2-hmac-sha256':
return False
return pbkdf2(password.encode(), salt=bytes.fromhex(salt),
iterations=iterations) == bytes.fromhex(dk)

View File

@@ -1,3 +1,6 @@
# This is a simple example that demonstrates how to use the user session, but
# is not intended as a complete login solution. See the login subdirectory for
# a more complete example.
from microdot import Microdot, Response, redirect
from microdot.session import Session, with_session

163
src/microdot/login.py Normal file
View File

@@ -0,0 +1,163 @@
from time import time
from microdot import redirect
from microdot.microdot import urlencode, invoke_handler
class Login:
"""User login support for Microdot.
:param login_url: the URL to redirect to when a login is required. The
default is '/login'.
"""
def __init__(self, login_url='/login'):
self.login_url = login_url
self.user_loader_callback = None
def user_loader(self, f):
"""Decorator to configure the user callback.
The decorated function receives the user ID as an argument and must
return the corresponding user object, or ``None`` if the user ID is
invalid.
"""
self.user_loader_callback = f
def _get_session(self, request):
return request.app._session.get(request)
def _update_remember_cookie(self, request, days, user_id=None):
remember_payload = request.app._session.encode({
'user_id': user_id,
'days': days,
'exp': time() + days * 24 * 60 * 60
})
@request.after_request
async def _set_remember_cookie(request, response):
response.set_cookie('_remember', remember_payload,
max_age=days * 24 * 60 * 60)
return response
def _get_user_id_from_session(self, request):
session = self._get_session(request)
if session and '_user_id' in session:
return session['_user_id']
if '_remember' in request.cookies:
remember_payload = request.app._session.decode(
request.cookies['_remember'])
user_id = remember_payload.get('user_id')
if user_id: # pragma: no branch
self._update_remember_cookie(
request, remember_payload.get('_days', 30), user_id)
session['_user_id'] = user_id
session['_fresh'] = False
session.save()
return user_id
async def _redirect_to_login(self, request):
return '', 302, {'Location': self.login_url + '?next=' + urlencode(
request.url)}
async def login_user(self, request, user, remember=False,
redirect_url='/'):
"""Log a user in.
:param request: the request object
:param user: the user object
:param remember: if the user's logged in state should be remembered
with a cookie after the session ends. Set to the
number of days the remember cookie should last, or to
``True`` to use a default duration of 30 days.
:param redirect_url: the URL to redirect to after login
This call marks the user as logged in by storing their user ID in the
user session. The application must call this method to log a user in
after their credentials have been validated.
The method returns a redirect response, either to the URL the user
originally intended to visit, or if there is no original URL to the URL
specified by the `redirect_url`.
"""
session = self._get_session(request)
session['_user_id'] = user.id
session['_fresh'] = True
session.save()
if remember:
days = 30 if remember is True else int(remember)
self._update_remember_cookie(request, days, session['_user_id'])
next_url = request.args.get('next', redirect_url)
if not next_url.startswith('/'):
next_url = redirect_url
return redirect(next_url)
async def logout_user(self, request):
"""Log a user out.
:param request: the request object
This call removes information about the user's log in from the user
session. If a remember cookie exists, it is removed as well.
"""
session = self._get_session(request)
session.pop('_user_id', None)
session.pop('_fresh', None)
session.save()
if '_remember' in request.cookies:
self._update_remember_cookie(request, 0)
def __call__(self, f):
"""Decorator to protect a route with authentication.
If the user is not logged in, Microdot will redirect to the login page
first. The decorated route will only run after successful login by the
user. If the user is already logged in, the route will run immediately.
Example::
login = Login()
@app.route('/secret')
@login
async def secret(request):
# only accessible to authenticated users
"""
async def wrapper(request, *args, **kwargs):
user_id = self._get_user_id_from_session(request)
if not user_id:
return await self._redirect_to_login(request)
request.g.current_user = await invoke_handler(
self.user_loader_callback, user_id)
if not request.g.current_user:
return await self._redirect_to_login(request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
def fresh(self, f):
"""Decorator to protect a route with "fresh" authentication.
This decorator prevents the route from running when the login session
is not fresh. A fresh session is a session that has been created from
direct user interaction with the login page, while a non-fresh session
occurs when a login is restored from a "remember me" cookie. Example::
login = Login()
@app.route('/secret')
@auth.fresh
async def secret(request):
# only accessible to authenticated users
# users logged in via remember me cookie will need to
# re-authenticate
"""
base_wrapper = self.__call__(f)
async def wrapper(request, *args, **kwargs):
session = self._get_session(request)
if session.get('_fresh'):
return await base_wrapper(request, *args, **kwargs)
return await self._redirect_to_login(request)
return wrapper

View File

@@ -9,3 +9,5 @@ from tests.test_sse import * # noqa: F401, F403
from tests.test_cors import * # noqa: F401, F403
from tests.test_utemplate import * # noqa: F401, F403
from tests.test_session import * # noqa: F401, F403
from tests.test_auth import * # noqa: F401, F403
from tests.test_login import * # noqa: F401, F403

188
tests/test_login.py Normal file
View File

@@ -0,0 +1,188 @@
import asyncio
import unittest
from microdot import Microdot
from microdot.login import Login
from microdot.session import Session
from microdot.test_client import TestClient
class TestLogin(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_login(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
class User:
def __init__(self, id, name):
self.id = id
self.name = name
@login.user_loader
def load_user(user_id):
return User(user_id, f'user{user_id}')
@app.get('/')
@login
def index(request):
return request.g.current_user.name
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(123, 'user123'))
@app.post('/logout')
async def logout_route(request):
await login.logout_user(request)
return 'ok'
client = TestClient(app)
res = self._run(client.get('/?foo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 1)
self.assertIn('session', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user123')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)
def test_login_bad_user_id(self):
class User:
def __init__(self, id, name):
self.id = id
self.name = name
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.user_loader
def load_user(user_id):
return None
@app.get('/foo')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(1, 'user'))
client = TestClient(app)
res = self._run(client.post('/login?next=/'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = self._run(client.get('/foo'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/foo')
def test_login_bad_redirect(self):
class User:
def __init__(self, id, name):
self.id = id
self.name = name
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.user_loader
def load_user(user_id):
return user_id
@app.get('/')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(1, 'user'))
client = TestClient(app)
res = self._run(client.post('/login?next=http://example.com'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
def test_login_remember(self):
class User:
def __init__(self, id, name):
self.id = id
self.name = name
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.user_loader
def load_user(user_id):
return User(user_id, f'user{user_id}')
@app.get('/')
@login
def index(request):
return {'user': request.g.current_user.id}
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(1, 'user1'),
remember=True)
@app.post('/logout')
async def logout(request):
await login.logout_user(request)
return 'ok'
@app.get('/fresh')
@login.fresh
async def fresh(request):
return f'fresh {request.g.current_user.id}'
client = TestClient(app)
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 2)
self.assertIn('session', client.cookies)
self.assertIn('_remember', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, '{"user": 1}')
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'fresh 1')
del client.cookies['session']
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/fresh')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
self.assertFalse('_remember' in client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)