Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e373d037f0 | ||
|
|
4321d93a82 | ||
|
|
68b6cb9862 | ||
|
|
c7f9b3ff3b | ||
|
|
261dd2f980 | ||
|
|
f204416e36 | ||
|
|
7bc66ce3bb | ||
|
|
43f2227140 | ||
|
|
b0cddde6ec |
16
docs/api.rst
16
docs/api.rst
@@ -44,6 +44,22 @@ User Sessions
|
||||
.. automodule:: microdot.session
|
||||
:members:
|
||||
|
||||
Authentication
|
||||
--------------
|
||||
|
||||
.. automodule:: microdot.auth
|
||||
:inherited-members:
|
||||
:special-members: __call__
|
||||
:members:
|
||||
|
||||
User Logins
|
||||
-----------
|
||||
|
||||
.. automodule:: microdot.login
|
||||
:inherited-members:
|
||||
:special-members: __call__
|
||||
:members:
|
||||
|
||||
Cross-Origin Resource Sharing (CORS)
|
||||
------------------------------------
|
||||
|
||||
|
||||
@@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions
|
||||
described in this section are maintained as part of the Microdot project in
|
||||
the same source code repository.
|
||||
|
||||
WebSocket Support
|
||||
~~~~~~~~~~~~~~~~~
|
||||
WebSocket
|
||||
~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -39,8 +39,8 @@ Example::
|
||||
message = await ws.receive()
|
||||
await ws.send(message)
|
||||
|
||||
Server-Sent Events Support
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Server-Sent Events
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -78,8 +78,8 @@ Example::
|
||||
the SSE object. For bidirectional communication with the client, use the
|
||||
WebSocket extension.
|
||||
|
||||
Rendering Templates
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
Templates
|
||||
~~~~~~~~~
|
||||
|
||||
Many web applications use HTML templates for rendering content to clients.
|
||||
Microdot includes extensions to render templates with the
|
||||
@@ -202,8 +202,8 @@ must be used.
|
||||
.. note::
|
||||
The Jinja extension is not compatible with MicroPython.
|
||||
|
||||
Maintaining Secure User Sessions
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Secure User Sessions
|
||||
~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -270,6 +270,117 @@ The :func:`save() <microdot.session.SessionDict.save>` and
|
||||
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
|
||||
and destroy the user session respectively.
|
||||
|
||||
Authentication
|
||||
~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
|
||||
* - Compatibility
|
||||
- | CPython & MicroPython
|
||||
|
||||
* - Required Microdot source files
|
||||
- | `auth.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
|
||||
|
||||
* - Required external dependencies
|
||||
- | None
|
||||
|
||||
* - Examples
|
||||
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
|
||||
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
|
||||
|
||||
The authentication extension provides helper classes for two commonly used
|
||||
authentication patterns, described below.
|
||||
|
||||
Basic Authentication
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
`Basic Authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_
|
||||
is a method of authentication that is part of the HTTP specification. It allows
|
||||
clients to authenticate to a server using a username and a password. Web
|
||||
browsers have native support for Basic Authentication and will automatically
|
||||
prompt the user for a username and a password when a protected resource is
|
||||
accessed.
|
||||
|
||||
To use Basic Authentication, create an instance of the :class:`BasicAuth <microdot.auth.BasicAuth>`
|
||||
class::
|
||||
|
||||
from microdot.auth import BasicAuth
|
||||
|
||||
auth = BasicAuth(app)
|
||||
|
||||
Next, create an authentication function. The function must accept a request
|
||||
object and a username and password pair provided by the user. If the
|
||||
credentials are valid, the function must return an object that represents the
|
||||
user. Decorate the function with ``@auth.authenticate``::
|
||||
|
||||
@auth.authenticate
|
||||
async def verify_user(request, username, password):
|
||||
user = await load_user_from_database(username)
|
||||
if user and user.verify_password(password):
|
||||
return user
|
||||
|
||||
If the authentication function cannot validate the user provided credentials it
|
||||
must return ``None``.
|
||||
|
||||
To protect a route with authentication, add the ``auth`` instance as a
|
||||
decorator::
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
While running an authenticated request, the user object returned by the
|
||||
authenticaction function is accessible as ``request.g.current_user``.
|
||||
|
||||
Token Authentication
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
To set up token authentication, create an instance of :class:`TokenAuth <microdot.auth.TokenAuth>`::
|
||||
|
||||
from microdot.auth import TokenAuth
|
||||
|
||||
auth = TokenAuth()
|
||||
|
||||
Then add a function that verifies the token and returns the user it belongs to,
|
||||
or ``None`` if the token is invalid or expired::
|
||||
|
||||
@auth.authenticate
|
||||
async def verify_token(request, token):
|
||||
return load_user_from_token(token)
|
||||
|
||||
As with Basic authentication, the ``auth`` instance is used as a decorator to
|
||||
protect your routes::
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
User Logins
|
||||
~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
|
||||
* - Compatibility
|
||||
- | CPython & MicroPython
|
||||
|
||||
* - Required Microdot source files
|
||||
- | `login.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
|
||||
| `session.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/session.py>`_
|
||||
|
||||
* - Required external dependencies
|
||||
- | CPython: `PyJWT <https://pyjwt.readthedocs.io/>`_
|
||||
| MicroPython: `jwt.py <https://github.com/micropython/micropython-lib/blob/master/python-ecosys/pyjwt/jwt.py>`_,
|
||||
`hmac.py <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
|
||||
|
||||
* - Examples
|
||||
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login/login.py>`_
|
||||
|
||||
|
||||
|
||||
Cross-Origin Resource Sharing (CORS)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
@@ -305,8 +416,8 @@ Example::
|
||||
cors = CORS(app, allowed_origins=['https://example.com'],
|
||||
allow_credentials=True)
|
||||
|
||||
Testing with the Test Client
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Test Client
|
||||
~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -342,8 +453,8 @@ Example::
|
||||
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
|
||||
class for more details.
|
||||
|
||||
Deploying on a Production Web Server
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Production Deployments
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The ``Microdot`` class creates its own simple web server. This is enough for an
|
||||
application deployed with MicroPython, but when using CPython it may be useful
|
||||
|
||||
1
examples/auth/README.md
Normal file
1
examples/auth/README.md
Normal file
@@ -0,0 +1 @@
|
||||
This directory contains examples that demonstrate basic and token authentication.
|
||||
31
examples/auth/basic_auth.py
Normal file
31
examples/auth/basic_auth.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from hashlib import sha1
|
||||
from microdot import Microdot
|
||||
from microdot.auth import BasicAuth
|
||||
|
||||
|
||||
def create_hash(password):
|
||||
return sha1(password).hexdigest()
|
||||
|
||||
|
||||
USERS = {
|
||||
'susan': create_hash(b'hello'),
|
||||
'david': create_hash(b'bye'),
|
||||
}
|
||||
app = Microdot()
|
||||
auth = BasicAuth()
|
||||
|
||||
|
||||
@auth.authenticate
|
||||
async def check_credentials(request, username, password):
|
||||
if username in USERS and USERS[username] == create_hash(password.encode()):
|
||||
return username
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
26
examples/auth/token_auth.py
Normal file
26
examples/auth/token_auth.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from microdot import Microdot
|
||||
from microdot.auth import TokenAuth
|
||||
|
||||
app = Microdot()
|
||||
auth = TokenAuth()
|
||||
|
||||
TOKENS = {
|
||||
'susan-token': 'susan',
|
||||
'david-token': 'david',
|
||||
}
|
||||
|
||||
|
||||
@auth.authenticate
|
||||
async def check_token(request, token):
|
||||
if token in TOKENS:
|
||||
return TOKENS[token]
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
1
examples/login/README.md
Normal file
1
examples/login/README.md
Normal file
@@ -0,0 +1 @@
|
||||
This directory contains examples that demonstrate user logins.
|
||||
122
examples/login/login.py
Normal file
122
examples/login/login.py
Normal file
@@ -0,0 +1,122 @@
|
||||
from hashlib import sha1
|
||||
from microdot import Microdot, redirect
|
||||
from microdot.session import Session
|
||||
from microdot.login import Login
|
||||
|
||||
|
||||
class User:
|
||||
def __init__(self, id, username, password):
|
||||
self.id = id
|
||||
self.username = username
|
||||
self.password_hash = self.create_hash(password)
|
||||
|
||||
def create_hash(self, password):
|
||||
# note: to keep this example simple, passwords are hashed with the SHA1
|
||||
# algorithm. In a real application, you should use a stronger
|
||||
# algorithm, such as bcrypt.
|
||||
return sha1(password.encode()).hexdigest()
|
||||
|
||||
def check_password(self, password):
|
||||
return self.create_hash(password) == self.password_hash
|
||||
|
||||
|
||||
USERS = {
|
||||
'user001': User('user001', 'susan', 'hello'),
|
||||
'user002': User('user002', 'david', 'bye'),
|
||||
}
|
||||
|
||||
app = Microdot()
|
||||
Session(app, secret_key='top-secret!')
|
||||
auth = Login()
|
||||
|
||||
|
||||
@auth.id_to_user
|
||||
async def get_user(user_id):
|
||||
print('get_user', user_id)
|
||||
return USERS.get(user_id)
|
||||
|
||||
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
async def login(request):
|
||||
if request.method == 'GET':
|
||||
return '''
|
||||
<!doctype html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Please Login</h1>
|
||||
<form method="POST">
|
||||
<p>
|
||||
Username<br>
|
||||
<input name="username" autofocus>
|
||||
</p>
|
||||
<p>
|
||||
Password:<br>
|
||||
<input name="password" type="password">
|
||||
<br>
|
||||
</p>
|
||||
<p>
|
||||
<input name="remember_me" type="checkbox"> Remember me
|
||||
<br>
|
||||
</p>
|
||||
<p>
|
||||
<button type="submit">Login</button>
|
||||
</p>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
''', {'Content-Type': 'text/html'}
|
||||
username = request.form['username']
|
||||
password = request.form['password']
|
||||
remember_me = bool(request.form.get('remember_me'))
|
||||
|
||||
for user in USERS.values():
|
||||
if user.username == username:
|
||||
if user.check_password(password):
|
||||
return await auth.login_user(request, user,
|
||||
remember=remember_me)
|
||||
return redirect('/login')
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'''
|
||||
<!doctype html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Hello, {request.g.current_user.username}!</h1>
|
||||
<p>
|
||||
<a href="/fresh">Click here</a> to access the fresh login page.
|
||||
</p>
|
||||
<form method="POST" action="/logout">
|
||||
<button type="submit">Logout</button>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
''', {'Content-Type': 'text/html'}
|
||||
|
||||
|
||||
@app.get('/fresh')
|
||||
@auth.fresh
|
||||
async def fresh(request):
|
||||
return f'''
|
||||
<!doctype html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Hello, {request.g.current_user.username}!</h1>
|
||||
<p>This page requires a fresh login session.</p>
|
||||
<p><a href="/">Go back</a> to the main page.</p>
|
||||
</body>
|
||||
</html>
|
||||
''', {'Content-Type': 'text/html'}
|
||||
|
||||
|
||||
@app.post('/logout')
|
||||
@auth
|
||||
async def logout(request):
|
||||
await auth.logout_user(request)
|
||||
return redirect('/')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
@@ -1,3 +1,7 @@
|
||||
# This is a simple example that demonstrates how to use the user session, but
|
||||
# is not intended as a complete login solution. See the login subdirectory for
|
||||
# a more complete example.
|
||||
|
||||
from microdot import Microdot, Response, redirect
|
||||
from microdot.session import Session, with_session
|
||||
|
||||
|
||||
144
src/microdot/auth.py
Normal file
144
src/microdot/auth.py
Normal file
@@ -0,0 +1,144 @@
|
||||
from microdot import abort
|
||||
from microdot.microdot import invoke_handler
|
||||
|
||||
|
||||
class BaseAuth:
|
||||
def __init__(self):
|
||||
self.auth_callback = None
|
||||
self.error_callback = None
|
||||
|
||||
def __call__(self, f):
|
||||
"""Decorator to protect a route with authentication.
|
||||
|
||||
An instance of this class must be used as a decorator on the routes
|
||||
that need to be protected. Example::
|
||||
|
||||
auth = BasicAuth() # or TokenAuth()
|
||||
|
||||
@app.route('/protected')
|
||||
@auth
|
||||
def protected(request):
|
||||
# ...
|
||||
|
||||
Routes that are decorated in this way will only be invoked if the
|
||||
authentication callback returned a valid user object, otherwise the
|
||||
error callback will be executed.
|
||||
"""
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
auth = self._get_auth(request)
|
||||
if not auth:
|
||||
return await invoke_handler(self.error_callback, request)
|
||||
request.g.current_user = await invoke_handler(
|
||||
self.auth_callback, request, *auth)
|
||||
if not request.g.current_user:
|
||||
return await invoke_handler(self.error_callback, request)
|
||||
return await invoke_handler(f, request, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class BasicAuth(BaseAuth):
|
||||
"""Basic Authentication.
|
||||
|
||||
:param realm: The realm that is displayed when the user is prompted to
|
||||
authenticate in the browser.
|
||||
:param charset: The charset that is used to encode the realm.
|
||||
:param scheme: The authentication scheme. Defaults to 'Basic'.
|
||||
:param error_status: The error status code to return when authentication
|
||||
fails. Defaults to 401.
|
||||
"""
|
||||
def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic',
|
||||
error_status=401):
|
||||
super().__init__()
|
||||
self.realm = realm
|
||||
self.charset = charset
|
||||
self.scheme = scheme
|
||||
self.error_status = error_status
|
||||
self.error_callback = self.authentication_error
|
||||
|
||||
def _get_auth(self, request):
|
||||
auth = request.headers.get('Authorization')
|
||||
if auth and auth.startswith('Basic '):
|
||||
import binascii
|
||||
try:
|
||||
username, password = binascii.a2b_base64(
|
||||
auth[6:]).decode().split(':', 1)
|
||||
except Exception: # pragma: no cover
|
||||
return None
|
||||
return username, password
|
||||
|
||||
def authentication_error(self, request):
|
||||
return '', self.error_status, {
|
||||
'WWW-Authenticate': '{} realm="{}", charset="{}"'.format(
|
||||
self.scheme, self.realm, self.charset)}
|
||||
|
||||
def authenticate(self, f):
|
||||
"""Decorator to configure the authentication callback.
|
||||
|
||||
This decorator must be used with a function that accepts the request
|
||||
object, a username and a password and returns a user object if the
|
||||
credentials are valid, or ``None`` if they are not. Example::
|
||||
|
||||
@auth.authenticate
|
||||
async def check_credentials(request, username, password):
|
||||
user = get_user(username)
|
||||
if user and user.check_password(password):
|
||||
return get_user(username)
|
||||
"""
|
||||
self.auth_callback = f
|
||||
|
||||
|
||||
class TokenAuth(BaseAuth):
|
||||
"""Token based authentication.
|
||||
|
||||
:param header: The name of the header that will contain the token. Defaults
|
||||
to 'Authorization'.
|
||||
:param scheme: The authentication scheme. Defaults to 'Bearer'.
|
||||
:param error_status: The error status code to return when authentication
|
||||
fails. Defaults to 401.
|
||||
"""
|
||||
def __init__(self, header='Authorization', scheme='Bearer',
|
||||
error_status=401):
|
||||
super().__init__()
|
||||
self.header = header
|
||||
self.scheme = scheme.lower()
|
||||
self.error_status = error_status
|
||||
self.error_callback = self.authentication_error
|
||||
|
||||
def _get_auth(self, request):
|
||||
auth = request.headers.get(self.header)
|
||||
if auth:
|
||||
if self.header == 'Authorization':
|
||||
try:
|
||||
scheme, token = auth.split(' ', 1)
|
||||
except Exception:
|
||||
return None
|
||||
if scheme.lower() == self.scheme:
|
||||
return (token.strip(),)
|
||||
else:
|
||||
return (auth,)
|
||||
|
||||
def authenticate(self, f):
|
||||
"""Decorator to configure the authentication callback.
|
||||
|
||||
This decorator must be used with a function that accepts the request
|
||||
object, a username and a password and returns a user object if the
|
||||
credentials are valid, or ``None`` if they are not. Example::
|
||||
|
||||
@auth.authenticate
|
||||
async def check_credentials(request, token):
|
||||
return get_user(token)
|
||||
"""
|
||||
self.auth_callback = f
|
||||
|
||||
def errorhandler(self, f):
|
||||
"""Decorator to configure the error callback.
|
||||
|
||||
Microdot calls the error callback to allow the application to generate
|
||||
a custom error response. The default error response is to call
|
||||
``abort(401)``.
|
||||
"""
|
||||
self.error_callback = f
|
||||
|
||||
def authentication_error(self, request):
|
||||
abort(self.error_status)
|
||||
150
src/microdot/login.py
Normal file
150
src/microdot/login.py
Normal file
@@ -0,0 +1,150 @@
|
||||
from time import time
|
||||
from microdot import redirect
|
||||
from microdot.microdot import urlencode, invoke_handler
|
||||
|
||||
|
||||
class Login:
|
||||
def __init__(self, login_url='/login'):
|
||||
self.login_url = login_url
|
||||
self.user_callback = None
|
||||
self.user_id_callback = lambda user: user.id
|
||||
|
||||
def id_to_user(self, f):
|
||||
"""Decorator to configure the user callback.
|
||||
|
||||
The decorated function receives the user ID as an argument and must
|
||||
return the corresponding user object, or ``None`` if the user ID is
|
||||
invalid.
|
||||
"""
|
||||
self.user_callback = f
|
||||
|
||||
def user_to_id(self, f):
|
||||
"""Decorator to configure the user ID callback.
|
||||
|
||||
The decorated functon receives the user object as an argument and must
|
||||
return the corresponding user ID. By default, the ``id`` attribute of
|
||||
the user object is used.
|
||||
"""
|
||||
self.user_id_callback = f
|
||||
|
||||
def _get_session(self, request):
|
||||
return request.app._session.get(request)
|
||||
|
||||
def _update_remember_cookie(self, request, days, user_id=None):
|
||||
remember_payload = request.app._session.encode({
|
||||
'user_id': user_id,
|
||||
'days': days,
|
||||
'exp': time() + days * 24 * 60 * 60
|
||||
})
|
||||
|
||||
@request.after_request
|
||||
async def _set_remember_cookie(request, response):
|
||||
response.set_cookie('_remember', remember_payload,
|
||||
max_age=days * 24 * 60 * 60)
|
||||
return response
|
||||
|
||||
def _get_user_id_from_session(self, request):
|
||||
session = self._get_session(request)
|
||||
if session and '_user_id' in session:
|
||||
return session['_user_id']
|
||||
if '_remember' in request.cookies:
|
||||
remember_payload = request.app._session.decode(
|
||||
request.cookies['_remember'])
|
||||
user_id = remember_payload.get('user_id')
|
||||
if user_id: # pragma: no branch
|
||||
self._update_remember_cookie(
|
||||
request, remember_payload.get('_days', 30), user_id)
|
||||
session['_user_id'] = user_id
|
||||
session['_fresh'] = False
|
||||
session.save()
|
||||
return user_id
|
||||
|
||||
async def _redirect_to_login(self, request):
|
||||
return '', 302, {'Location': self.login_url + '?next=' + urlencode(
|
||||
request.url)}
|
||||
|
||||
async def login_user(self, request, user, remember=False,
|
||||
redirect_url='/'):
|
||||
"""Log a user in.
|
||||
|
||||
:param request: the request object
|
||||
:param user: the user object
|
||||
:param remember: if the user's logged in state should be remembered
|
||||
with a cookie after the session ends. Set to the
|
||||
number of days the remember cookie should last, or to
|
||||
``True`` to use a default duration of 30 days.
|
||||
:param redirect_url: the URL to redirect to after login
|
||||
|
||||
This call marks the user as logged in by storing their user ID in the
|
||||
user session. The application must call this method to log a user in
|
||||
after their credentials have been validated.
|
||||
|
||||
The method returns a redirect response, either to the URL the user
|
||||
originally intended to visit, or if there is no original URL to the URL
|
||||
specified by the `redirect_url`.
|
||||
"""
|
||||
session = self._get_session(request)
|
||||
session['_user_id'] = await invoke_handler(self.user_id_callback, user)
|
||||
session['_fresh'] = True
|
||||
session.save()
|
||||
|
||||
if remember:
|
||||
days = 30 if remember is True else int(remember)
|
||||
self._update_remember_cookie(request, days, session['_user_id'])
|
||||
|
||||
next_url = request.args.get('next', redirect_url)
|
||||
if not next_url.startswith('/'):
|
||||
next_url = redirect_url
|
||||
return redirect(next_url)
|
||||
|
||||
async def logout_user(self, request):
|
||||
"""Log a user out.
|
||||
|
||||
:param request: the request object
|
||||
|
||||
This call removes information about the user's log in from the user
|
||||
session. If a remember cookie exists, it is removed as well.
|
||||
"""
|
||||
session = self._get_session(request)
|
||||
session.pop('_user_id', None)
|
||||
session.pop('_fresh', None)
|
||||
session.save()
|
||||
if '_remember' in request.cookies:
|
||||
self._update_remember_cookie(request, 0)
|
||||
|
||||
def __call__(self, f):
|
||||
"""Decorator to protect a route with authentication.
|
||||
|
||||
If the user is not logged in, Microdot will redirect to the login page
|
||||
first. The decorated route will only run after successful login by the
|
||||
user. If the user is already logged in, the route will run immediately.
|
||||
"""
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
user_id = self._get_user_id_from_session(request)
|
||||
if not user_id:
|
||||
return await self._redirect_to_login(request)
|
||||
request.g.current_user = await invoke_handler(
|
||||
self.user_callback, user_id)
|
||||
if not request.g.current_user:
|
||||
return await self._redirect_to_login(request)
|
||||
return await invoke_handler(f, request, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
def fresh(self, f):
|
||||
"""Decorator to protect a route with "fresh" authentication.
|
||||
|
||||
This decorator prevents the route from running when the login session
|
||||
is not fresh. A fresh session is a session that has been created from
|
||||
direct user interaction with the login page, as opposite to a session
|
||||
that was restored from a "remember me" cookie.
|
||||
"""
|
||||
base_wrapper = self.__call__(f)
|
||||
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
session = self._get_session(request)
|
||||
if session.get('_fresh'):
|
||||
return await base_wrapper(request, *args, **kwargs)
|
||||
return await self._redirect_to_login(request)
|
||||
|
||||
return wrapper
|
||||
@@ -9,3 +9,4 @@ from tests.test_sse import * # noqa: F401, F403
|
||||
from tests.test_cors import * # noqa: F401, F403
|
||||
from tests.test_utemplate import * # noqa: F401, F403
|
||||
from tests.test_session import * # noqa: F401, F403
|
||||
from tests.test_auth import * # noqa: F401, F403
|
||||
|
||||
125
tests/test_auth.py
Normal file
125
tests/test_auth.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import asyncio
|
||||
import binascii
|
||||
import unittest
|
||||
from microdot import Microdot
|
||||
from microdot.auth import BasicAuth, TokenAuth
|
||||
from microdot.test_client import TestClient
|
||||
|
||||
|
||||
class TestAuth(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if hasattr(asyncio, 'set_event_loop'):
|
||||
asyncio.set_event_loop(asyncio.new_event_loop())
|
||||
cls.loop = asyncio.get_event_loop()
|
||||
|
||||
def _run(self, coro):
|
||||
return self.loop.run_until_complete(coro)
|
||||
|
||||
def test_basic_auth(self):
|
||||
app = Microdot()
|
||||
basic_auth = BasicAuth()
|
||||
|
||||
@basic_auth.authenticate
|
||||
def authenticate(request, username, password):
|
||||
if username == 'foo' and password == 'bar':
|
||||
return {'username': username}
|
||||
|
||||
@app.route('/')
|
||||
@basic_auth
|
||||
def index(request):
|
||||
return request.g.current_user['username']
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic ' + binascii.b2a_base64(
|
||||
b'foo:bar').decode()}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'foo')
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic ' + binascii.b2a_base64(
|
||||
b'foo:baz').decode()}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
def test_token_auth(self):
|
||||
app = Microdot()
|
||||
token_auth = TokenAuth()
|
||||
|
||||
@token_auth.authenticate
|
||||
def authenticate(request, token):
|
||||
if token == 'foo':
|
||||
return 'user'
|
||||
|
||||
@app.route('/')
|
||||
@token_auth
|
||||
def index(request):
|
||||
return request.g.current_user
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Bearer foo'}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
|
||||
def test_token_auth_custom_header(self):
|
||||
app = Microdot()
|
||||
token_auth = TokenAuth(header='X-Auth-Token')
|
||||
|
||||
@token_auth.authenticate
|
||||
def authenticate(request, token):
|
||||
if token == 'foo':
|
||||
return 'user'
|
||||
|
||||
@app.route('/')
|
||||
@token_auth
|
||||
def index(request):
|
||||
return request.g.current_user
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Bearer foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'X-Token-Auth': 'Bearer foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
|
||||
res = self._run(client.get('/', headers={'x-auth-token': 'foo'}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
|
||||
@token_auth.errorhandler
|
||||
def error_handler(request):
|
||||
return {'status_code': 403}, 403
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 403)
|
||||
self.assertEqual(res.json, {'status_code': 403})
|
||||
185
tests/test_login.py
Normal file
185
tests/test_login.py
Normal file
@@ -0,0 +1,185 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
from microdot import Microdot
|
||||
from microdot.login import Login
|
||||
from microdot.session import Session
|
||||
from microdot.test_client import TestClient
|
||||
|
||||
|
||||
class TestLogin(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if hasattr(asyncio, 'set_event_loop'):
|
||||
asyncio.set_event_loop(asyncio.new_event_loop())
|
||||
cls.loop = asyncio.get_event_loop()
|
||||
|
||||
def _run(self, coro):
|
||||
return self.loop.run_until_complete(coro)
|
||||
|
||||
def test_login(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
class User:
|
||||
def __init__(self, id, name):
|
||||
self.id = id
|
||||
self.name = name
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return User(user_id, f'user{user_id}')
|
||||
|
||||
@app.get('/')
|
||||
@login
|
||||
def index(request):
|
||||
return request.g.current_user.name
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, User(123, 'user123'))
|
||||
|
||||
@app.post('/logout')
|
||||
async def logout_route(request):
|
||||
await login.logout_user(request)
|
||||
return 'ok'
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/?foo=bar'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
|
||||
|
||||
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/?foo=bar')
|
||||
self.assertEqual(len(res.headers['Set-Cookie']), 1)
|
||||
self.assertIn('session', client.cookies)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user123')
|
||||
|
||||
res = self._run(client.post('/logout'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
|
||||
def test_login_bad_user_id(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return None
|
||||
|
||||
@login.user_to_id
|
||||
def user_to_id(user):
|
||||
return user
|
||||
|
||||
@app.get('/foo')
|
||||
@login
|
||||
async def index(request):
|
||||
return 'ok'
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, 'user')
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.post('/login?next=/'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/')
|
||||
res = self._run(client.get('/foo'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/login?next=/foo')
|
||||
|
||||
def test_login_bad_redirect(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return user_id
|
||||
|
||||
@login.user_to_id
|
||||
def user_to_id(user):
|
||||
return user
|
||||
|
||||
@app.get('/')
|
||||
@login
|
||||
async def index(request):
|
||||
return 'ok'
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, 'user')
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.post('/login?next=http://example.com'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/')
|
||||
|
||||
def test_login_remember(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return user_id
|
||||
|
||||
@login.user_to_id
|
||||
def user_to_id(user):
|
||||
return user
|
||||
|
||||
@app.get('/')
|
||||
@login
|
||||
def index(request):
|
||||
return request.g.current_user
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, 'user', remember=True)
|
||||
|
||||
@app.post('/logout')
|
||||
async def logout(request):
|
||||
await login.logout_user(request)
|
||||
return 'ok'
|
||||
|
||||
@app.get('/fresh')
|
||||
@login.fresh
|
||||
async def fresh(request):
|
||||
return f'fresh {request.g.current_user}'
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/?foo=bar')
|
||||
self.assertEqual(len(res.headers['Set-Cookie']), 2)
|
||||
self.assertIn('session', client.cookies)
|
||||
self.assertIn('_remember', client.cookies)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
res = self._run(client.get('/fresh'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'fresh user')
|
||||
|
||||
del client.cookies['session']
|
||||
print(client.cookies)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
res = self._run(client.get('/fresh'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/login?next=/fresh')
|
||||
|
||||
res = self._run(client.post('/logout'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertFalse('_remember' in client.cookies)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
@@ -37,7 +37,7 @@ class TestSession(unittest.TestCase):
|
||||
|
||||
@app.post('/set')
|
||||
@with_session
|
||||
async def save_session(req, session):
|
||||
def save_session(req, session):
|
||||
session['name'] = 'joe'
|
||||
session.save()
|
||||
return 'OK'
|
||||
|
||||
Reference in New Issue
Block a user