9 Commits
main ... auth

Author SHA1 Message Date
Miguel Grinberg
e373d037f0 login tests 2024-04-28 00:32:22 +01:00
Miguel Grinberg
4321d93a82 auth documentation 2024-04-28 00:32:20 +01:00
Miguel Grinberg
68b6cb9862 Example updates 2024-04-28 00:32:06 +01:00
Miguel Grinberg
c7f9b3ff3b lint issues 2024-04-28 00:32:06 +01:00
Miguel Grinberg
261dd2f980 documentation 2024-04-28 00:32:06 +01:00
Miguel Grinberg
f204416e36 remember tests 2024-04-28 00:32:06 +01:00
Miguel Grinberg
7bc66ce3bb auth examples 2024-04-28 00:32:06 +01:00
Miguel Grinberg
43f2227140 remember cookie 2024-04-28 00:32:06 +01:00
Miguel Grinberg
b0cddde6ec Basic, token and login authentication 2024-04-28 00:32:06 +01:00
14 changed files with 930 additions and 13 deletions

View File

@@ -44,6 +44,22 @@ User Sessions
.. automodule:: microdot.session
:members:
Authentication
--------------
.. automodule:: microdot.auth
:inherited-members:
:special-members: __call__
:members:
User Logins
-----------
.. automodule:: microdot.login
:inherited-members:
:special-members: __call__
:members:
Cross-Origin Resource Sharing (CORS)
------------------------------------

View File

@@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions
described in this section are maintained as part of the Microdot project in
the same source code repository.
WebSocket Support
~~~~~~~~~~~~~~~~~
WebSocket
~~~~~~~~~
.. list-table::
:align: left
@@ -39,8 +39,8 @@ Example::
message = await ws.receive()
await ws.send(message)
Server-Sent Events Support
~~~~~~~~~~~~~~~~~~~~~~~~~~
Server-Sent Events
~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -78,8 +78,8 @@ Example::
the SSE object. For bidirectional communication with the client, use the
WebSocket extension.
Rendering Templates
~~~~~~~~~~~~~~~~~~~
Templates
~~~~~~~~~
Many web applications use HTML templates for rendering content to clients.
Microdot includes extensions to render templates with the
@@ -202,8 +202,8 @@ must be used.
.. note::
The Jinja extension is not compatible with MicroPython.
Maintaining Secure User Sessions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Secure User Sessions
~~~~~~~~~~~~~~~~~~~~
.. list-table::
:align: left
@@ -270,6 +270,117 @@ The :func:`save() <microdot.session.SessionDict.save>` and
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
and destroy the user session respectively.
Authentication
~~~~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `auth.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
* - Required external dependencies
- | None
* - Examples
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
The authentication extension provides helper classes for two commonly used
authentication patterns, described below.
Basic Authentication
^^^^^^^^^^^^^^^^^^^^
`Basic Authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_
is a method of authentication that is part of the HTTP specification. It allows
clients to authenticate to a server using a username and a password. Web
browsers have native support for Basic Authentication and will automatically
prompt the user for a username and a password when a protected resource is
accessed.
To use Basic Authentication, create an instance of the :class:`BasicAuth <microdot.auth.BasicAuth>`
class::
from microdot.auth import BasicAuth
auth = BasicAuth(app)
Next, create an authentication function. The function must accept a request
object and a username and password pair provided by the user. If the
credentials are valid, the function must return an object that represents the
user. Decorate the function with ``@auth.authenticate``::
@auth.authenticate
async def verify_user(request, username, password):
user = await load_user_from_database(username)
if user and user.verify_password(password):
return user
If the authentication function cannot validate the user provided credentials it
must return ``None``.
To protect a route with authentication, add the ``auth`` instance as a
decorator::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
While running an authenticated request, the user object returned by the
authenticaction function is accessible as ``request.g.current_user``.
Token Authentication
^^^^^^^^^^^^^^^^^^^^
To set up token authentication, create an instance of :class:`TokenAuth <microdot.auth.TokenAuth>`::
from microdot.auth import TokenAuth
auth = TokenAuth()
Then add a function that verifies the token and returns the user it belongs to,
or ``None`` if the token is invalid or expired::
@auth.authenticate
async def verify_token(request, token):
return load_user_from_token(token)
As with Basic authentication, the ``auth`` instance is used as a decorator to
protect your routes::
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
User Logins
~~~~~~~~~~~
.. list-table::
:align: left
* - Compatibility
- | CPython & MicroPython
* - Required Microdot source files
- | `login.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
| `session.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/session.py>`_
* - Required external dependencies
- | CPython: `PyJWT <https://pyjwt.readthedocs.io/>`_
| MicroPython: `jwt.py <https://github.com/micropython/micropython-lib/blob/master/python-ecosys/pyjwt/jwt.py>`_,
`hmac.py <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
* - Examples
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login/login.py>`_
Cross-Origin Resource Sharing (CORS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -305,8 +416,8 @@ Example::
cors = CORS(app, allowed_origins=['https://example.com'],
allow_credentials=True)
Testing with the Test Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test Client
~~~~~~~~~~~
.. list-table::
:align: left
@@ -342,8 +453,8 @@ Example::
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
class for more details.
Deploying on a Production Web Server
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Production Deployments
~~~~~~~~~~~~~~~~~~~~~~
The ``Microdot`` class creates its own simple web server. This is enough for an
application deployed with MicroPython, but when using CPython it may be useful

1
examples/auth/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate basic and token authentication.

View File

@@ -0,0 +1,31 @@
from hashlib import sha1
from microdot import Microdot
from microdot.auth import BasicAuth
def create_hash(password):
return sha1(password).hexdigest()
USERS = {
'susan': create_hash(b'hello'),
'david': create_hash(b'bye'),
}
app = Microdot()
auth = BasicAuth()
@auth.authenticate
async def check_credentials(request, username, password):
if username in USERS and USERS[username] == create_hash(password.encode()):
return username
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -0,0 +1,26 @@
from microdot import Microdot
from microdot.auth import TokenAuth
app = Microdot()
auth = TokenAuth()
TOKENS = {
'susan-token': 'susan',
'david-token': 'david',
}
@auth.authenticate
async def check_token(request, token):
if token in TOKENS:
return TOKENS[token]
@app.route('/')
@auth
async def index(request):
return f'Hello, {request.g.current_user}!'
if __name__ == '__main__':
app.run(debug=True)

1
examples/login/README.md Normal file
View File

@@ -0,0 +1 @@
This directory contains examples that demonstrate user logins.

122
examples/login/login.py Normal file
View File

@@ -0,0 +1,122 @@
from hashlib import sha1
from microdot import Microdot, redirect
from microdot.session import Session
from microdot.login import Login
class User:
def __init__(self, id, username, password):
self.id = id
self.username = username
self.password_hash = self.create_hash(password)
def create_hash(self, password):
# note: to keep this example simple, passwords are hashed with the SHA1
# algorithm. In a real application, you should use a stronger
# algorithm, such as bcrypt.
return sha1(password.encode()).hexdigest()
def check_password(self, password):
return self.create_hash(password) == self.password_hash
USERS = {
'user001': User('user001', 'susan', 'hello'),
'user002': User('user002', 'david', 'bye'),
}
app = Microdot()
Session(app, secret_key='top-secret!')
auth = Login()
@auth.id_to_user
async def get_user(user_id):
print('get_user', user_id)
return USERS.get(user_id)
@app.route('/login', methods=['GET', 'POST'])
async def login(request):
if request.method == 'GET':
return '''
<!doctype html>
<html>
<body>
<h1>Please Login</h1>
<form method="POST">
<p>
Username<br>
<input name="username" autofocus>
</p>
<p>
Password:<br>
<input name="password" type="password">
<br>
</p>
<p>
<input name="remember_me" type="checkbox"> Remember me
<br>
</p>
<p>
<button type="submit">Login</button>
</p>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
username = request.form['username']
password = request.form['password']
remember_me = bool(request.form.get('remember_me'))
for user in USERS.values():
if user.username == username:
if user.check_password(password):
return await auth.login_user(request, user,
remember=remember_me)
return redirect('/login')
@app.route('/')
@auth
async def index(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>
<a href="/fresh">Click here</a> to access the fresh login page.
</p>
<form method="POST" action="/logout">
<button type="submit">Logout</button>
</form>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.get('/fresh')
@auth.fresh
async def fresh(request):
return f'''
<!doctype html>
<html>
<body>
<h1>Hello, {request.g.current_user.username}!</h1>
<p>This page requires a fresh login session.</p>
<p><a href="/">Go back</a> to the main page.</p>
</body>
</html>
''', {'Content-Type': 'text/html'}
@app.post('/logout')
@auth
async def logout(request):
await auth.logout_user(request)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,3 +1,7 @@
# This is a simple example that demonstrates how to use the user session, but
# is not intended as a complete login solution. See the login subdirectory for
# a more complete example.
from microdot import Microdot, Response, redirect
from microdot.session import Session, with_session

144
src/microdot/auth.py Normal file
View File

@@ -0,0 +1,144 @@
from microdot import abort
from microdot.microdot import invoke_handler
class BaseAuth:
def __init__(self):
self.auth_callback = None
self.error_callback = None
def __call__(self, f):
"""Decorator to protect a route with authentication.
An instance of this class must be used as a decorator on the routes
that need to be protected. Example::
auth = BasicAuth() # or TokenAuth()
@app.route('/protected')
@auth
def protected(request):
# ...
Routes that are decorated in this way will only be invoked if the
authentication callback returned a valid user object, otherwise the
error callback will be executed.
"""
async def wrapper(request, *args, **kwargs):
auth = self._get_auth(request)
if not auth:
return await invoke_handler(self.error_callback, request)
request.g.current_user = await invoke_handler(
self.auth_callback, request, *auth)
if not request.g.current_user:
return await invoke_handler(self.error_callback, request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
class BasicAuth(BaseAuth):
"""Basic Authentication.
:param realm: The realm that is displayed when the user is prompted to
authenticate in the browser.
:param charset: The charset that is used to encode the realm.
:param scheme: The authentication scheme. Defaults to 'Basic'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic',
error_status=401):
super().__init__()
self.realm = realm
self.charset = charset
self.scheme = scheme
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get('Authorization')
if auth and auth.startswith('Basic '):
import binascii
try:
username, password = binascii.a2b_base64(
auth[6:]).decode().split(':', 1)
except Exception: # pragma: no cover
return None
return username, password
def authentication_error(self, request):
return '', self.error_status, {
'WWW-Authenticate': '{} realm="{}", charset="{}"'.format(
self.scheme, self.realm, self.charset)}
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, username, password):
user = get_user(username)
if user and user.check_password(password):
return get_user(username)
"""
self.auth_callback = f
class TokenAuth(BaseAuth):
"""Token based authentication.
:param header: The name of the header that will contain the token. Defaults
to 'Authorization'.
:param scheme: The authentication scheme. Defaults to 'Bearer'.
:param error_status: The error status code to return when authentication
fails. Defaults to 401.
"""
def __init__(self, header='Authorization', scheme='Bearer',
error_status=401):
super().__init__()
self.header = header
self.scheme = scheme.lower()
self.error_status = error_status
self.error_callback = self.authentication_error
def _get_auth(self, request):
auth = request.headers.get(self.header)
if auth:
if self.header == 'Authorization':
try:
scheme, token = auth.split(' ', 1)
except Exception:
return None
if scheme.lower() == self.scheme:
return (token.strip(),)
else:
return (auth,)
def authenticate(self, f):
"""Decorator to configure the authentication callback.
This decorator must be used with a function that accepts the request
object, a username and a password and returns a user object if the
credentials are valid, or ``None`` if they are not. Example::
@auth.authenticate
async def check_credentials(request, token):
return get_user(token)
"""
self.auth_callback = f
def errorhandler(self, f):
"""Decorator to configure the error callback.
Microdot calls the error callback to allow the application to generate
a custom error response. The default error response is to call
``abort(401)``.
"""
self.error_callback = f
def authentication_error(self, request):
abort(self.error_status)

150
src/microdot/login.py Normal file
View File

@@ -0,0 +1,150 @@
from time import time
from microdot import redirect
from microdot.microdot import urlencode, invoke_handler
class Login:
def __init__(self, login_url='/login'):
self.login_url = login_url
self.user_callback = None
self.user_id_callback = lambda user: user.id
def id_to_user(self, f):
"""Decorator to configure the user callback.
The decorated function receives the user ID as an argument and must
return the corresponding user object, or ``None`` if the user ID is
invalid.
"""
self.user_callback = f
def user_to_id(self, f):
"""Decorator to configure the user ID callback.
The decorated functon receives the user object as an argument and must
return the corresponding user ID. By default, the ``id`` attribute of
the user object is used.
"""
self.user_id_callback = f
def _get_session(self, request):
return request.app._session.get(request)
def _update_remember_cookie(self, request, days, user_id=None):
remember_payload = request.app._session.encode({
'user_id': user_id,
'days': days,
'exp': time() + days * 24 * 60 * 60
})
@request.after_request
async def _set_remember_cookie(request, response):
response.set_cookie('_remember', remember_payload,
max_age=days * 24 * 60 * 60)
return response
def _get_user_id_from_session(self, request):
session = self._get_session(request)
if session and '_user_id' in session:
return session['_user_id']
if '_remember' in request.cookies:
remember_payload = request.app._session.decode(
request.cookies['_remember'])
user_id = remember_payload.get('user_id')
if user_id: # pragma: no branch
self._update_remember_cookie(
request, remember_payload.get('_days', 30), user_id)
session['_user_id'] = user_id
session['_fresh'] = False
session.save()
return user_id
async def _redirect_to_login(self, request):
return '', 302, {'Location': self.login_url + '?next=' + urlencode(
request.url)}
async def login_user(self, request, user, remember=False,
redirect_url='/'):
"""Log a user in.
:param request: the request object
:param user: the user object
:param remember: if the user's logged in state should be remembered
with a cookie after the session ends. Set to the
number of days the remember cookie should last, or to
``True`` to use a default duration of 30 days.
:param redirect_url: the URL to redirect to after login
This call marks the user as logged in by storing their user ID in the
user session. The application must call this method to log a user in
after their credentials have been validated.
The method returns a redirect response, either to the URL the user
originally intended to visit, or if there is no original URL to the URL
specified by the `redirect_url`.
"""
session = self._get_session(request)
session['_user_id'] = await invoke_handler(self.user_id_callback, user)
session['_fresh'] = True
session.save()
if remember:
days = 30 if remember is True else int(remember)
self._update_remember_cookie(request, days, session['_user_id'])
next_url = request.args.get('next', redirect_url)
if not next_url.startswith('/'):
next_url = redirect_url
return redirect(next_url)
async def logout_user(self, request):
"""Log a user out.
:param request: the request object
This call removes information about the user's log in from the user
session. If a remember cookie exists, it is removed as well.
"""
session = self._get_session(request)
session.pop('_user_id', None)
session.pop('_fresh', None)
session.save()
if '_remember' in request.cookies:
self._update_remember_cookie(request, 0)
def __call__(self, f):
"""Decorator to protect a route with authentication.
If the user is not logged in, Microdot will redirect to the login page
first. The decorated route will only run after successful login by the
user. If the user is already logged in, the route will run immediately.
"""
async def wrapper(request, *args, **kwargs):
user_id = self._get_user_id_from_session(request)
if not user_id:
return await self._redirect_to_login(request)
request.g.current_user = await invoke_handler(
self.user_callback, user_id)
if not request.g.current_user:
return await self._redirect_to_login(request)
return await invoke_handler(f, request, *args, **kwargs)
return wrapper
def fresh(self, f):
"""Decorator to protect a route with "fresh" authentication.
This decorator prevents the route from running when the login session
is not fresh. A fresh session is a session that has been created from
direct user interaction with the login page, as opposite to a session
that was restored from a "remember me" cookie.
"""
base_wrapper = self.__call__(f)
async def wrapper(request, *args, **kwargs):
session = self._get_session(request)
if session.get('_fresh'):
return await base_wrapper(request, *args, **kwargs)
return await self._redirect_to_login(request)
return wrapper

View File

@@ -9,3 +9,4 @@ from tests.test_sse import * # noqa: F401, F403
from tests.test_cors import * # noqa: F401, F403
from tests.test_utemplate import * # noqa: F401, F403
from tests.test_session import * # noqa: F401, F403
from tests.test_auth import * # noqa: F401, F403

125
tests/test_auth.py Normal file
View File

@@ -0,0 +1,125 @@
import asyncio
import binascii
import unittest
from microdot import Microdot
from microdot.auth import BasicAuth, TokenAuth
from microdot.test_client import TestClient
class TestAuth(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_basic_auth(self):
app = Microdot()
basic_auth = BasicAuth()
@basic_auth.authenticate
def authenticate(request, username, password):
if username == 'foo' and password == 'bar':
return {'username': username}
@app.route('/')
@basic_auth
def index(request):
return request.g.current_user['username']
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:bar').decode()}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'foo')
res = self._run(client.get('/', headers={
'Authorization': 'Basic ' + binascii.b2a_base64(
b'foo:baz').decode()}))
self.assertEqual(res.status_code, 401)
def test_token_auth(self):
app = Microdot()
token_auth = TokenAuth()
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
def test_token_auth_custom_header(self):
app = Microdot()
token_auth = TokenAuth(header='X-Auth-Token')
@token_auth.authenticate
def authenticate(request, token):
if token == 'foo':
return 'user'
@app.route('/')
@token_auth
def index(request):
return request.g.current_user
client = TestClient(app)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Basic foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'Authorization': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={
'X-Token-Auth': 'Bearer foo'}))
self.assertEqual(res.status_code, 401)
res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = self._run(client.get('/', headers={'x-auth-token': 'foo'}))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
@token_auth.errorhandler
def error_handler(request):
return {'status_code': 403}, 403
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 403)
self.assertEqual(res.json, {'status_code': 403})

185
tests/test_login.py Normal file
View File

@@ -0,0 +1,185 @@
import asyncio
import unittest
from microdot import Microdot
from microdot.login import Login
from microdot.session import Session
from microdot.test_client import TestClient
class TestLogin(unittest.TestCase):
@classmethod
def setUpClass(cls):
if hasattr(asyncio, 'set_event_loop'):
asyncio.set_event_loop(asyncio.new_event_loop())
cls.loop = asyncio.get_event_loop()
def _run(self, coro):
return self.loop.run_until_complete(coro)
def test_login(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
class User:
def __init__(self, id, name):
self.id = id
self.name = name
@login.id_to_user
def id_to_user(user_id):
return User(user_id, f'user{user_id}')
@app.get('/')
@login
def index(request):
return request.g.current_user.name
@app.post('/login')
async def login_route(request):
return await login.login_user(request, User(123, 'user123'))
@app.post('/logout')
async def logout_route(request):
await login.logout_user(request)
return 'ok'
client = TestClient(app)
res = self._run(client.get('/?foo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 1)
self.assertIn('session', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user123')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)
def test_login_bad_user_id(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.id_to_user
def id_to_user(user_id):
return None
@login.user_to_id
def user_to_id(user):
return user
@app.get('/foo')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, 'user')
client = TestClient(app)
res = self._run(client.post('/login?next=/'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
res = self._run(client.get('/foo'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/foo')
def test_login_bad_redirect(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.id_to_user
def id_to_user(user_id):
return user_id
@login.user_to_id
def user_to_id(user):
return user
@app.get('/')
@login
async def index(request):
return 'ok'
@app.post('/login')
async def login_route(request):
return await login.login_user(request, 'user')
client = TestClient(app)
res = self._run(client.post('/login?next=http://example.com'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/')
def test_login_remember(self):
app = Microdot()
Session(app, secret_key='secret')
login = Login()
@login.id_to_user
def id_to_user(user_id):
return user_id
@login.user_to_id
def user_to_id(user):
return user
@app.get('/')
@login
def index(request):
return request.g.current_user
@app.post('/login')
async def login_route(request):
return await login.login_user(request, 'user', remember=True)
@app.post('/logout')
async def logout(request):
await login.logout_user(request)
return 'ok'
@app.get('/fresh')
@login.fresh
async def fresh(request):
return f'fresh {request.g.current_user}'
client = TestClient(app)
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/?foo=bar')
self.assertEqual(len(res.headers['Set-Cookie']), 2)
self.assertIn('session', client.cookies)
self.assertIn('_remember', client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'user')
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 200)
self.assertEqual(res.text, 'fresh user')
del client.cookies['session']
print(client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 200)
res = self._run(client.get('/fresh'))
self.assertEqual(res.status_code, 302)
self.assertEqual(res.headers['Location'], '/login?next=/fresh')
res = self._run(client.post('/logout'))
self.assertEqual(res.status_code, 200)
self.assertFalse('_remember' in client.cookies)
res = self._run(client.get('/'))
self.assertEqual(res.status_code, 302)

View File

@@ -37,7 +37,7 @@ class TestSession(unittest.TestCase):
@app.post('/set')
@with_session
async def save_session(req, session):
def save_session(req, session):
session['name'] = 'joe'
session.save()
return 'OK'