Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e373d037f0 | ||
|
|
4321d93a82 | ||
|
|
68b6cb9862 | ||
|
|
c7f9b3ff3b | ||
|
|
261dd2f980 | ||
|
|
f204416e36 | ||
|
|
7bc66ce3bb | ||
|
|
43f2227140 | ||
|
|
b0cddde6ec |
16
docs/api.rst
16
docs/api.rst
@@ -44,6 +44,22 @@ User Sessions
|
|||||||
.. automodule:: microdot.session
|
.. automodule:: microdot.session
|
||||||
:members:
|
:members:
|
||||||
|
|
||||||
|
Authentication
|
||||||
|
--------------
|
||||||
|
|
||||||
|
.. automodule:: microdot.auth
|
||||||
|
:inherited-members:
|
||||||
|
:special-members: __call__
|
||||||
|
:members:
|
||||||
|
|
||||||
|
User Logins
|
||||||
|
-----------
|
||||||
|
|
||||||
|
.. automodule:: microdot.login
|
||||||
|
:inherited-members:
|
||||||
|
:special-members: __call__
|
||||||
|
:members:
|
||||||
|
|
||||||
Cross-Origin Resource Sharing (CORS)
|
Cross-Origin Resource Sharing (CORS)
|
||||||
------------------------------------
|
------------------------------------
|
||||||
|
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions
|
|||||||
described in this section are maintained as part of the Microdot project in
|
described in this section are maintained as part of the Microdot project in
|
||||||
the same source code repository.
|
the same source code repository.
|
||||||
|
|
||||||
WebSocket Support
|
WebSocket
|
||||||
~~~~~~~~~~~~~~~~~
|
~~~~~~~~~
|
||||||
|
|
||||||
.. list-table::
|
.. list-table::
|
||||||
:align: left
|
:align: left
|
||||||
@@ -39,8 +39,8 @@ Example::
|
|||||||
message = await ws.receive()
|
message = await ws.receive()
|
||||||
await ws.send(message)
|
await ws.send(message)
|
||||||
|
|
||||||
Server-Sent Events Support
|
Server-Sent Events
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
.. list-table::
|
.. list-table::
|
||||||
:align: left
|
:align: left
|
||||||
@@ -78,8 +78,8 @@ Example::
|
|||||||
the SSE object. For bidirectional communication with the client, use the
|
the SSE object. For bidirectional communication with the client, use the
|
||||||
WebSocket extension.
|
WebSocket extension.
|
||||||
|
|
||||||
Rendering Templates
|
Templates
|
||||||
~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~
|
||||||
|
|
||||||
Many web applications use HTML templates for rendering content to clients.
|
Many web applications use HTML templates for rendering content to clients.
|
||||||
Microdot includes extensions to render templates with the
|
Microdot includes extensions to render templates with the
|
||||||
@@ -202,8 +202,8 @@ must be used.
|
|||||||
.. note::
|
.. note::
|
||||||
The Jinja extension is not compatible with MicroPython.
|
The Jinja extension is not compatible with MicroPython.
|
||||||
|
|
||||||
Maintaining Secure User Sessions
|
Secure User Sessions
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
.. list-table::
|
.. list-table::
|
||||||
:align: left
|
:align: left
|
||||||
@@ -270,6 +270,117 @@ The :func:`save() <microdot.session.SessionDict.save>` and
|
|||||||
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
|
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
|
||||||
and destroy the user session respectively.
|
and destroy the user session respectively.
|
||||||
|
|
||||||
|
Authentication
|
||||||
|
~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
.. list-table::
|
||||||
|
:align: left
|
||||||
|
|
||||||
|
* - Compatibility
|
||||||
|
- | CPython & MicroPython
|
||||||
|
|
||||||
|
* - Required Microdot source files
|
||||||
|
- | `auth.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
|
||||||
|
|
||||||
|
* - Required external dependencies
|
||||||
|
- | None
|
||||||
|
|
||||||
|
* - Examples
|
||||||
|
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
|
||||||
|
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
|
||||||
|
|
||||||
|
The authentication extension provides helper classes for two commonly used
|
||||||
|
authentication patterns, described below.
|
||||||
|
|
||||||
|
Basic Authentication
|
||||||
|
^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
`Basic Authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_
|
||||||
|
is a method of authentication that is part of the HTTP specification. It allows
|
||||||
|
clients to authenticate to a server using a username and a password. Web
|
||||||
|
browsers have native support for Basic Authentication and will automatically
|
||||||
|
prompt the user for a username and a password when a protected resource is
|
||||||
|
accessed.
|
||||||
|
|
||||||
|
To use Basic Authentication, create an instance of the :class:`BasicAuth <microdot.auth.BasicAuth>`
|
||||||
|
class::
|
||||||
|
|
||||||
|
from microdot.auth import BasicAuth
|
||||||
|
|
||||||
|
auth = BasicAuth(app)
|
||||||
|
|
||||||
|
Next, create an authentication function. The function must accept a request
|
||||||
|
object and a username and password pair provided by the user. If the
|
||||||
|
credentials are valid, the function must return an object that represents the
|
||||||
|
user. Decorate the function with ``@auth.authenticate``::
|
||||||
|
|
||||||
|
@auth.authenticate
|
||||||
|
async def verify_user(request, username, password):
|
||||||
|
user = await load_user_from_database(username)
|
||||||
|
if user and user.verify_password(password):
|
||||||
|
return user
|
||||||
|
|
||||||
|
If the authentication function cannot validate the user provided credentials it
|
||||||
|
must return ``None``.
|
||||||
|
|
||||||
|
To protect a route with authentication, add the ``auth`` instance as a
|
||||||
|
decorator::
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@auth
|
||||||
|
async def index(request):
|
||||||
|
return f'Hello, {request.g.current_user}!'
|
||||||
|
|
||||||
|
While running an authenticated request, the user object returned by the
|
||||||
|
authenticaction function is accessible as ``request.g.current_user``.
|
||||||
|
|
||||||
|
Token Authentication
|
||||||
|
^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
To set up token authentication, create an instance of :class:`TokenAuth <microdot.auth.TokenAuth>`::
|
||||||
|
|
||||||
|
from microdot.auth import TokenAuth
|
||||||
|
|
||||||
|
auth = TokenAuth()
|
||||||
|
|
||||||
|
Then add a function that verifies the token and returns the user it belongs to,
|
||||||
|
or ``None`` if the token is invalid or expired::
|
||||||
|
|
||||||
|
@auth.authenticate
|
||||||
|
async def verify_token(request, token):
|
||||||
|
return load_user_from_token(token)
|
||||||
|
|
||||||
|
As with Basic authentication, the ``auth`` instance is used as a decorator to
|
||||||
|
protect your routes::
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@auth
|
||||||
|
async def index(request):
|
||||||
|
return f'Hello, {request.g.current_user}!'
|
||||||
|
|
||||||
|
User Logins
|
||||||
|
~~~~~~~~~~~
|
||||||
|
|
||||||
|
.. list-table::
|
||||||
|
:align: left
|
||||||
|
|
||||||
|
* - Compatibility
|
||||||
|
- | CPython & MicroPython
|
||||||
|
|
||||||
|
* - Required Microdot source files
|
||||||
|
- | `login.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
|
||||||
|
| `session.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/session.py>`_
|
||||||
|
|
||||||
|
* - Required external dependencies
|
||||||
|
- | CPython: `PyJWT <https://pyjwt.readthedocs.io/>`_
|
||||||
|
| MicroPython: `jwt.py <https://github.com/micropython/micropython-lib/blob/master/python-ecosys/pyjwt/jwt.py>`_,
|
||||||
|
`hmac.py <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
|
||||||
|
|
||||||
|
* - Examples
|
||||||
|
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login/login.py>`_
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Cross-Origin Resource Sharing (CORS)
|
Cross-Origin Resource Sharing (CORS)
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
@@ -305,8 +416,8 @@ Example::
|
|||||||
cors = CORS(app, allowed_origins=['https://example.com'],
|
cors = CORS(app, allowed_origins=['https://example.com'],
|
||||||
allow_credentials=True)
|
allow_credentials=True)
|
||||||
|
|
||||||
Testing with the Test Client
|
Test Client
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~
|
||||||
|
|
||||||
.. list-table::
|
.. list-table::
|
||||||
:align: left
|
:align: left
|
||||||
@@ -342,8 +453,8 @@ Example::
|
|||||||
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
|
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
|
||||||
class for more details.
|
class for more details.
|
||||||
|
|
||||||
Deploying on a Production Web Server
|
Production Deployments
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
The ``Microdot`` class creates its own simple web server. This is enough for an
|
The ``Microdot`` class creates its own simple web server. This is enough for an
|
||||||
application deployed with MicroPython, but when using CPython it may be useful
|
application deployed with MicroPython, but when using CPython it may be useful
|
||||||
|
|||||||
1
examples/auth/README.md
Normal file
1
examples/auth/README.md
Normal file
@@ -0,0 +1 @@
|
|||||||
|
This directory contains examples that demonstrate basic and token authentication.
|
||||||
31
examples/auth/basic_auth.py
Normal file
31
examples/auth/basic_auth.py
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
from hashlib import sha1
|
||||||
|
from microdot import Microdot
|
||||||
|
from microdot.auth import BasicAuth
|
||||||
|
|
||||||
|
|
||||||
|
def create_hash(password):
|
||||||
|
return sha1(password).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
USERS = {
|
||||||
|
'susan': create_hash(b'hello'),
|
||||||
|
'david': create_hash(b'bye'),
|
||||||
|
}
|
||||||
|
app = Microdot()
|
||||||
|
auth = BasicAuth()
|
||||||
|
|
||||||
|
|
||||||
|
@auth.authenticate
|
||||||
|
async def check_credentials(request, username, password):
|
||||||
|
if username in USERS and USERS[username] == create_hash(password.encode()):
|
||||||
|
return username
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@auth
|
||||||
|
async def index(request):
|
||||||
|
return f'Hello, {request.g.current_user}!'
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
app.run(debug=True)
|
||||||
26
examples/auth/token_auth.py
Normal file
26
examples/auth/token_auth.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
from microdot import Microdot
|
||||||
|
from microdot.auth import TokenAuth
|
||||||
|
|
||||||
|
app = Microdot()
|
||||||
|
auth = TokenAuth()
|
||||||
|
|
||||||
|
TOKENS = {
|
||||||
|
'susan-token': 'susan',
|
||||||
|
'david-token': 'david',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@auth.authenticate
|
||||||
|
async def check_token(request, token):
|
||||||
|
if token in TOKENS:
|
||||||
|
return TOKENS[token]
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@auth
|
||||||
|
async def index(request):
|
||||||
|
return f'Hello, {request.g.current_user}!'
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
app.run(debug=True)
|
||||||
1
examples/login/README.md
Normal file
1
examples/login/README.md
Normal file
@@ -0,0 +1 @@
|
|||||||
|
This directory contains examples that demonstrate user logins.
|
||||||
122
examples/login/login.py
Normal file
122
examples/login/login.py
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
from hashlib import sha1
|
||||||
|
from microdot import Microdot, redirect
|
||||||
|
from microdot.session import Session
|
||||||
|
from microdot.login import Login
|
||||||
|
|
||||||
|
|
||||||
|
class User:
|
||||||
|
def __init__(self, id, username, password):
|
||||||
|
self.id = id
|
||||||
|
self.username = username
|
||||||
|
self.password_hash = self.create_hash(password)
|
||||||
|
|
||||||
|
def create_hash(self, password):
|
||||||
|
# note: to keep this example simple, passwords are hashed with the SHA1
|
||||||
|
# algorithm. In a real application, you should use a stronger
|
||||||
|
# algorithm, such as bcrypt.
|
||||||
|
return sha1(password.encode()).hexdigest()
|
||||||
|
|
||||||
|
def check_password(self, password):
|
||||||
|
return self.create_hash(password) == self.password_hash
|
||||||
|
|
||||||
|
|
||||||
|
USERS = {
|
||||||
|
'user001': User('user001', 'susan', 'hello'),
|
||||||
|
'user002': User('user002', 'david', 'bye'),
|
||||||
|
}
|
||||||
|
|
||||||
|
app = Microdot()
|
||||||
|
Session(app, secret_key='top-secret!')
|
||||||
|
auth = Login()
|
||||||
|
|
||||||
|
|
||||||
|
@auth.id_to_user
|
||||||
|
async def get_user(user_id):
|
||||||
|
print('get_user', user_id)
|
||||||
|
return USERS.get(user_id)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/login', methods=['GET', 'POST'])
|
||||||
|
async def login(request):
|
||||||
|
if request.method == 'GET':
|
||||||
|
return '''
|
||||||
|
<!doctype html>
|
||||||
|
<html>
|
||||||
|
<body>
|
||||||
|
<h1>Please Login</h1>
|
||||||
|
<form method="POST">
|
||||||
|
<p>
|
||||||
|
Username<br>
|
||||||
|
<input name="username" autofocus>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
Password:<br>
|
||||||
|
<input name="password" type="password">
|
||||||
|
<br>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<input name="remember_me" type="checkbox"> Remember me
|
||||||
|
<br>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<button type="submit">Login</button>
|
||||||
|
</p>
|
||||||
|
</form>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
''', {'Content-Type': 'text/html'}
|
||||||
|
username = request.form['username']
|
||||||
|
password = request.form['password']
|
||||||
|
remember_me = bool(request.form.get('remember_me'))
|
||||||
|
|
||||||
|
for user in USERS.values():
|
||||||
|
if user.username == username:
|
||||||
|
if user.check_password(password):
|
||||||
|
return await auth.login_user(request, user,
|
||||||
|
remember=remember_me)
|
||||||
|
return redirect('/login')
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@auth
|
||||||
|
async def index(request):
|
||||||
|
return f'''
|
||||||
|
<!doctype html>
|
||||||
|
<html>
|
||||||
|
<body>
|
||||||
|
<h1>Hello, {request.g.current_user.username}!</h1>
|
||||||
|
<p>
|
||||||
|
<a href="/fresh">Click here</a> to access the fresh login page.
|
||||||
|
</p>
|
||||||
|
<form method="POST" action="/logout">
|
||||||
|
<button type="submit">Logout</button>
|
||||||
|
</form>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
''', {'Content-Type': 'text/html'}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get('/fresh')
|
||||||
|
@auth.fresh
|
||||||
|
async def fresh(request):
|
||||||
|
return f'''
|
||||||
|
<!doctype html>
|
||||||
|
<html>
|
||||||
|
<body>
|
||||||
|
<h1>Hello, {request.g.current_user.username}!</h1>
|
||||||
|
<p>This page requires a fresh login session.</p>
|
||||||
|
<p><a href="/">Go back</a> to the main page.</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
''', {'Content-Type': 'text/html'}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post('/logout')
|
||||||
|
@auth
|
||||||
|
async def logout(request):
|
||||||
|
await auth.logout_user(request)
|
||||||
|
return redirect('/')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
app.run(debug=True)
|
||||||
@@ -1,3 +1,7 @@
|
|||||||
|
# This is a simple example that demonstrates how to use the user session, but
|
||||||
|
# is not intended as a complete login solution. See the login subdirectory for
|
||||||
|
# a more complete example.
|
||||||
|
|
||||||
from microdot import Microdot, Response, redirect
|
from microdot import Microdot, Response, redirect
|
||||||
from microdot.session import Session, with_session
|
from microdot.session import Session, with_session
|
||||||
|
|
||||||
|
|||||||
144
src/microdot/auth.py
Normal file
144
src/microdot/auth.py
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
from microdot import abort
|
||||||
|
from microdot.microdot import invoke_handler
|
||||||
|
|
||||||
|
|
||||||
|
class BaseAuth:
|
||||||
|
def __init__(self):
|
||||||
|
self.auth_callback = None
|
||||||
|
self.error_callback = None
|
||||||
|
|
||||||
|
def __call__(self, f):
|
||||||
|
"""Decorator to protect a route with authentication.
|
||||||
|
|
||||||
|
An instance of this class must be used as a decorator on the routes
|
||||||
|
that need to be protected. Example::
|
||||||
|
|
||||||
|
auth = BasicAuth() # or TokenAuth()
|
||||||
|
|
||||||
|
@app.route('/protected')
|
||||||
|
@auth
|
||||||
|
def protected(request):
|
||||||
|
# ...
|
||||||
|
|
||||||
|
Routes that are decorated in this way will only be invoked if the
|
||||||
|
authentication callback returned a valid user object, otherwise the
|
||||||
|
error callback will be executed.
|
||||||
|
"""
|
||||||
|
async def wrapper(request, *args, **kwargs):
|
||||||
|
auth = self._get_auth(request)
|
||||||
|
if not auth:
|
||||||
|
return await invoke_handler(self.error_callback, request)
|
||||||
|
request.g.current_user = await invoke_handler(
|
||||||
|
self.auth_callback, request, *auth)
|
||||||
|
if not request.g.current_user:
|
||||||
|
return await invoke_handler(self.error_callback, request)
|
||||||
|
return await invoke_handler(f, request, *args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
class BasicAuth(BaseAuth):
|
||||||
|
"""Basic Authentication.
|
||||||
|
|
||||||
|
:param realm: The realm that is displayed when the user is prompted to
|
||||||
|
authenticate in the browser.
|
||||||
|
:param charset: The charset that is used to encode the realm.
|
||||||
|
:param scheme: The authentication scheme. Defaults to 'Basic'.
|
||||||
|
:param error_status: The error status code to return when authentication
|
||||||
|
fails. Defaults to 401.
|
||||||
|
"""
|
||||||
|
def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic',
|
||||||
|
error_status=401):
|
||||||
|
super().__init__()
|
||||||
|
self.realm = realm
|
||||||
|
self.charset = charset
|
||||||
|
self.scheme = scheme
|
||||||
|
self.error_status = error_status
|
||||||
|
self.error_callback = self.authentication_error
|
||||||
|
|
||||||
|
def _get_auth(self, request):
|
||||||
|
auth = request.headers.get('Authorization')
|
||||||
|
if auth and auth.startswith('Basic '):
|
||||||
|
import binascii
|
||||||
|
try:
|
||||||
|
username, password = binascii.a2b_base64(
|
||||||
|
auth[6:]).decode().split(':', 1)
|
||||||
|
except Exception: # pragma: no cover
|
||||||
|
return None
|
||||||
|
return username, password
|
||||||
|
|
||||||
|
def authentication_error(self, request):
|
||||||
|
return '', self.error_status, {
|
||||||
|
'WWW-Authenticate': '{} realm="{}", charset="{}"'.format(
|
||||||
|
self.scheme, self.realm, self.charset)}
|
||||||
|
|
||||||
|
def authenticate(self, f):
|
||||||
|
"""Decorator to configure the authentication callback.
|
||||||
|
|
||||||
|
This decorator must be used with a function that accepts the request
|
||||||
|
object, a username and a password and returns a user object if the
|
||||||
|
credentials are valid, or ``None`` if they are not. Example::
|
||||||
|
|
||||||
|
@auth.authenticate
|
||||||
|
async def check_credentials(request, username, password):
|
||||||
|
user = get_user(username)
|
||||||
|
if user and user.check_password(password):
|
||||||
|
return get_user(username)
|
||||||
|
"""
|
||||||
|
self.auth_callback = f
|
||||||
|
|
||||||
|
|
||||||
|
class TokenAuth(BaseAuth):
|
||||||
|
"""Token based authentication.
|
||||||
|
|
||||||
|
:param header: The name of the header that will contain the token. Defaults
|
||||||
|
to 'Authorization'.
|
||||||
|
:param scheme: The authentication scheme. Defaults to 'Bearer'.
|
||||||
|
:param error_status: The error status code to return when authentication
|
||||||
|
fails. Defaults to 401.
|
||||||
|
"""
|
||||||
|
def __init__(self, header='Authorization', scheme='Bearer',
|
||||||
|
error_status=401):
|
||||||
|
super().__init__()
|
||||||
|
self.header = header
|
||||||
|
self.scheme = scheme.lower()
|
||||||
|
self.error_status = error_status
|
||||||
|
self.error_callback = self.authentication_error
|
||||||
|
|
||||||
|
def _get_auth(self, request):
|
||||||
|
auth = request.headers.get(self.header)
|
||||||
|
if auth:
|
||||||
|
if self.header == 'Authorization':
|
||||||
|
try:
|
||||||
|
scheme, token = auth.split(' ', 1)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
if scheme.lower() == self.scheme:
|
||||||
|
return (token.strip(),)
|
||||||
|
else:
|
||||||
|
return (auth,)
|
||||||
|
|
||||||
|
def authenticate(self, f):
|
||||||
|
"""Decorator to configure the authentication callback.
|
||||||
|
|
||||||
|
This decorator must be used with a function that accepts the request
|
||||||
|
object, a username and a password and returns a user object if the
|
||||||
|
credentials are valid, or ``None`` if they are not. Example::
|
||||||
|
|
||||||
|
@auth.authenticate
|
||||||
|
async def check_credentials(request, token):
|
||||||
|
return get_user(token)
|
||||||
|
"""
|
||||||
|
self.auth_callback = f
|
||||||
|
|
||||||
|
def errorhandler(self, f):
|
||||||
|
"""Decorator to configure the error callback.
|
||||||
|
|
||||||
|
Microdot calls the error callback to allow the application to generate
|
||||||
|
a custom error response. The default error response is to call
|
||||||
|
``abort(401)``.
|
||||||
|
"""
|
||||||
|
self.error_callback = f
|
||||||
|
|
||||||
|
def authentication_error(self, request):
|
||||||
|
abort(self.error_status)
|
||||||
150
src/microdot/login.py
Normal file
150
src/microdot/login.py
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
from time import time
|
||||||
|
from microdot import redirect
|
||||||
|
from microdot.microdot import urlencode, invoke_handler
|
||||||
|
|
||||||
|
|
||||||
|
class Login:
|
||||||
|
def __init__(self, login_url='/login'):
|
||||||
|
self.login_url = login_url
|
||||||
|
self.user_callback = None
|
||||||
|
self.user_id_callback = lambda user: user.id
|
||||||
|
|
||||||
|
def id_to_user(self, f):
|
||||||
|
"""Decorator to configure the user callback.
|
||||||
|
|
||||||
|
The decorated function receives the user ID as an argument and must
|
||||||
|
return the corresponding user object, or ``None`` if the user ID is
|
||||||
|
invalid.
|
||||||
|
"""
|
||||||
|
self.user_callback = f
|
||||||
|
|
||||||
|
def user_to_id(self, f):
|
||||||
|
"""Decorator to configure the user ID callback.
|
||||||
|
|
||||||
|
The decorated functon receives the user object as an argument and must
|
||||||
|
return the corresponding user ID. By default, the ``id`` attribute of
|
||||||
|
the user object is used.
|
||||||
|
"""
|
||||||
|
self.user_id_callback = f
|
||||||
|
|
||||||
|
def _get_session(self, request):
|
||||||
|
return request.app._session.get(request)
|
||||||
|
|
||||||
|
def _update_remember_cookie(self, request, days, user_id=None):
|
||||||
|
remember_payload = request.app._session.encode({
|
||||||
|
'user_id': user_id,
|
||||||
|
'days': days,
|
||||||
|
'exp': time() + days * 24 * 60 * 60
|
||||||
|
})
|
||||||
|
|
||||||
|
@request.after_request
|
||||||
|
async def _set_remember_cookie(request, response):
|
||||||
|
response.set_cookie('_remember', remember_payload,
|
||||||
|
max_age=days * 24 * 60 * 60)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _get_user_id_from_session(self, request):
|
||||||
|
session = self._get_session(request)
|
||||||
|
if session and '_user_id' in session:
|
||||||
|
return session['_user_id']
|
||||||
|
if '_remember' in request.cookies:
|
||||||
|
remember_payload = request.app._session.decode(
|
||||||
|
request.cookies['_remember'])
|
||||||
|
user_id = remember_payload.get('user_id')
|
||||||
|
if user_id: # pragma: no branch
|
||||||
|
self._update_remember_cookie(
|
||||||
|
request, remember_payload.get('_days', 30), user_id)
|
||||||
|
session['_user_id'] = user_id
|
||||||
|
session['_fresh'] = False
|
||||||
|
session.save()
|
||||||
|
return user_id
|
||||||
|
|
||||||
|
async def _redirect_to_login(self, request):
|
||||||
|
return '', 302, {'Location': self.login_url + '?next=' + urlencode(
|
||||||
|
request.url)}
|
||||||
|
|
||||||
|
async def login_user(self, request, user, remember=False,
|
||||||
|
redirect_url='/'):
|
||||||
|
"""Log a user in.
|
||||||
|
|
||||||
|
:param request: the request object
|
||||||
|
:param user: the user object
|
||||||
|
:param remember: if the user's logged in state should be remembered
|
||||||
|
with a cookie after the session ends. Set to the
|
||||||
|
number of days the remember cookie should last, or to
|
||||||
|
``True`` to use a default duration of 30 days.
|
||||||
|
:param redirect_url: the URL to redirect to after login
|
||||||
|
|
||||||
|
This call marks the user as logged in by storing their user ID in the
|
||||||
|
user session. The application must call this method to log a user in
|
||||||
|
after their credentials have been validated.
|
||||||
|
|
||||||
|
The method returns a redirect response, either to the URL the user
|
||||||
|
originally intended to visit, or if there is no original URL to the URL
|
||||||
|
specified by the `redirect_url`.
|
||||||
|
"""
|
||||||
|
session = self._get_session(request)
|
||||||
|
session['_user_id'] = await invoke_handler(self.user_id_callback, user)
|
||||||
|
session['_fresh'] = True
|
||||||
|
session.save()
|
||||||
|
|
||||||
|
if remember:
|
||||||
|
days = 30 if remember is True else int(remember)
|
||||||
|
self._update_remember_cookie(request, days, session['_user_id'])
|
||||||
|
|
||||||
|
next_url = request.args.get('next', redirect_url)
|
||||||
|
if not next_url.startswith('/'):
|
||||||
|
next_url = redirect_url
|
||||||
|
return redirect(next_url)
|
||||||
|
|
||||||
|
async def logout_user(self, request):
|
||||||
|
"""Log a user out.
|
||||||
|
|
||||||
|
:param request: the request object
|
||||||
|
|
||||||
|
This call removes information about the user's log in from the user
|
||||||
|
session. If a remember cookie exists, it is removed as well.
|
||||||
|
"""
|
||||||
|
session = self._get_session(request)
|
||||||
|
session.pop('_user_id', None)
|
||||||
|
session.pop('_fresh', None)
|
||||||
|
session.save()
|
||||||
|
if '_remember' in request.cookies:
|
||||||
|
self._update_remember_cookie(request, 0)
|
||||||
|
|
||||||
|
def __call__(self, f):
|
||||||
|
"""Decorator to protect a route with authentication.
|
||||||
|
|
||||||
|
If the user is not logged in, Microdot will redirect to the login page
|
||||||
|
first. The decorated route will only run after successful login by the
|
||||||
|
user. If the user is already logged in, the route will run immediately.
|
||||||
|
"""
|
||||||
|
async def wrapper(request, *args, **kwargs):
|
||||||
|
user_id = self._get_user_id_from_session(request)
|
||||||
|
if not user_id:
|
||||||
|
return await self._redirect_to_login(request)
|
||||||
|
request.g.current_user = await invoke_handler(
|
||||||
|
self.user_callback, user_id)
|
||||||
|
if not request.g.current_user:
|
||||||
|
return await self._redirect_to_login(request)
|
||||||
|
return await invoke_handler(f, request, *args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
def fresh(self, f):
|
||||||
|
"""Decorator to protect a route with "fresh" authentication.
|
||||||
|
|
||||||
|
This decorator prevents the route from running when the login session
|
||||||
|
is not fresh. A fresh session is a session that has been created from
|
||||||
|
direct user interaction with the login page, as opposite to a session
|
||||||
|
that was restored from a "remember me" cookie.
|
||||||
|
"""
|
||||||
|
base_wrapper = self.__call__(f)
|
||||||
|
|
||||||
|
async def wrapper(request, *args, **kwargs):
|
||||||
|
session = self._get_session(request)
|
||||||
|
if session.get('_fresh'):
|
||||||
|
return await base_wrapper(request, *args, **kwargs)
|
||||||
|
return await self._redirect_to_login(request)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
@@ -9,3 +9,4 @@ from tests.test_sse import * # noqa: F401, F403
|
|||||||
from tests.test_cors import * # noqa: F401, F403
|
from tests.test_cors import * # noqa: F401, F403
|
||||||
from tests.test_utemplate import * # noqa: F401, F403
|
from tests.test_utemplate import * # noqa: F401, F403
|
||||||
from tests.test_session import * # noqa: F401, F403
|
from tests.test_session import * # noqa: F401, F403
|
||||||
|
from tests.test_auth import * # noqa: F401, F403
|
||||||
|
|||||||
125
tests/test_auth.py
Normal file
125
tests/test_auth.py
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
import asyncio
|
||||||
|
import binascii
|
||||||
|
import unittest
|
||||||
|
from microdot import Microdot
|
||||||
|
from microdot.auth import BasicAuth, TokenAuth
|
||||||
|
from microdot.test_client import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
class TestAuth(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
if hasattr(asyncio, 'set_event_loop'):
|
||||||
|
asyncio.set_event_loop(asyncio.new_event_loop())
|
||||||
|
cls.loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
def _run(self, coro):
|
||||||
|
return self.loop.run_until_complete(coro)
|
||||||
|
|
||||||
|
def test_basic_auth(self):
|
||||||
|
app = Microdot()
|
||||||
|
basic_auth = BasicAuth()
|
||||||
|
|
||||||
|
@basic_auth.authenticate
|
||||||
|
def authenticate(request, username, password):
|
||||||
|
if username == 'foo' and password == 'bar':
|
||||||
|
return {'username': username}
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@basic_auth
|
||||||
|
def index(request):
|
||||||
|
return request.g.current_user['username']
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={
|
||||||
|
'Authorization': 'Basic ' + binascii.b2a_base64(
|
||||||
|
b'foo:bar').decode()}))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertEqual(res.text, 'foo')
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={
|
||||||
|
'Authorization': 'Basic ' + binascii.b2a_base64(
|
||||||
|
b'foo:baz').decode()}))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
def test_token_auth(self):
|
||||||
|
app = Microdot()
|
||||||
|
token_auth = TokenAuth()
|
||||||
|
|
||||||
|
@token_auth.authenticate
|
||||||
|
def authenticate(request, token):
|
||||||
|
if token == 'foo':
|
||||||
|
return 'user'
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@token_auth
|
||||||
|
def index(request):
|
||||||
|
return request.g.current_user
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={
|
||||||
|
'Authorization': 'Basic foo'}))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={
|
||||||
|
'Authorization': 'Bearer foo'}))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertEqual(res.text, 'user')
|
||||||
|
|
||||||
|
def test_token_auth_custom_header(self):
|
||||||
|
app = Microdot()
|
||||||
|
token_auth = TokenAuth(header='X-Auth-Token')
|
||||||
|
|
||||||
|
@token_auth.authenticate
|
||||||
|
def authenticate(request, token):
|
||||||
|
if token == 'foo':
|
||||||
|
return 'user'
|
||||||
|
|
||||||
|
@app.route('/')
|
||||||
|
@token_auth
|
||||||
|
def index(request):
|
||||||
|
return request.g.current_user
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={
|
||||||
|
'Authorization': 'Basic foo'}))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={
|
||||||
|
'Authorization': 'Bearer foo'}))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={
|
||||||
|
'X-Token-Auth': 'Bearer foo'}))
|
||||||
|
self.assertEqual(res.status_code, 401)
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'}))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertEqual(res.text, 'user')
|
||||||
|
|
||||||
|
res = self._run(client.get('/', headers={'x-auth-token': 'foo'}))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertEqual(res.text, 'user')
|
||||||
|
|
||||||
|
@token_auth.errorhandler
|
||||||
|
def error_handler(request):
|
||||||
|
return {'status_code': 403}, 403
|
||||||
|
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 403)
|
||||||
|
self.assertEqual(res.json, {'status_code': 403})
|
||||||
185
tests/test_login.py
Normal file
185
tests/test_login.py
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
import asyncio
|
||||||
|
import unittest
|
||||||
|
from microdot import Microdot
|
||||||
|
from microdot.login import Login
|
||||||
|
from microdot.session import Session
|
||||||
|
from microdot.test_client import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
class TestLogin(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
if hasattr(asyncio, 'set_event_loop'):
|
||||||
|
asyncio.set_event_loop(asyncio.new_event_loop())
|
||||||
|
cls.loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
def _run(self, coro):
|
||||||
|
return self.loop.run_until_complete(coro)
|
||||||
|
|
||||||
|
def test_login(self):
|
||||||
|
app = Microdot()
|
||||||
|
Session(app, secret_key='secret')
|
||||||
|
login = Login()
|
||||||
|
|
||||||
|
class User:
|
||||||
|
def __init__(self, id, name):
|
||||||
|
self.id = id
|
||||||
|
self.name = name
|
||||||
|
|
||||||
|
@login.id_to_user
|
||||||
|
def id_to_user(user_id):
|
||||||
|
return User(user_id, f'user{user_id}')
|
||||||
|
|
||||||
|
@app.get('/')
|
||||||
|
@login
|
||||||
|
def index(request):
|
||||||
|
return request.g.current_user.name
|
||||||
|
|
||||||
|
@app.post('/login')
|
||||||
|
async def login_route(request):
|
||||||
|
return await login.login_user(request, User(123, 'user123'))
|
||||||
|
|
||||||
|
@app.post('/logout')
|
||||||
|
async def logout_route(request):
|
||||||
|
await login.logout_user(request)
|
||||||
|
return 'ok'
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
res = self._run(client.get('/?foo=bar'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
|
||||||
|
|
||||||
|
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
self.assertEqual(res.headers['Location'], '/?foo=bar')
|
||||||
|
self.assertEqual(len(res.headers['Set-Cookie']), 1)
|
||||||
|
self.assertIn('session', client.cookies)
|
||||||
|
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertEqual(res.text, 'user123')
|
||||||
|
|
||||||
|
res = self._run(client.post('/logout'))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
|
||||||
|
def test_login_bad_user_id(self):
|
||||||
|
app = Microdot()
|
||||||
|
Session(app, secret_key='secret')
|
||||||
|
login = Login()
|
||||||
|
|
||||||
|
@login.id_to_user
|
||||||
|
def id_to_user(user_id):
|
||||||
|
return None
|
||||||
|
|
||||||
|
@login.user_to_id
|
||||||
|
def user_to_id(user):
|
||||||
|
return user
|
||||||
|
|
||||||
|
@app.get('/foo')
|
||||||
|
@login
|
||||||
|
async def index(request):
|
||||||
|
return 'ok'
|
||||||
|
|
||||||
|
@app.post('/login')
|
||||||
|
async def login_route(request):
|
||||||
|
return await login.login_user(request, 'user')
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
res = self._run(client.post('/login?next=/'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
self.assertEqual(res.headers['Location'], '/')
|
||||||
|
res = self._run(client.get('/foo'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
self.assertEqual(res.headers['Location'], '/login?next=/foo')
|
||||||
|
|
||||||
|
def test_login_bad_redirect(self):
|
||||||
|
app = Microdot()
|
||||||
|
Session(app, secret_key='secret')
|
||||||
|
login = Login()
|
||||||
|
|
||||||
|
@login.id_to_user
|
||||||
|
def id_to_user(user_id):
|
||||||
|
return user_id
|
||||||
|
|
||||||
|
@login.user_to_id
|
||||||
|
def user_to_id(user):
|
||||||
|
return user
|
||||||
|
|
||||||
|
@app.get('/')
|
||||||
|
@login
|
||||||
|
async def index(request):
|
||||||
|
return 'ok'
|
||||||
|
|
||||||
|
@app.post('/login')
|
||||||
|
async def login_route(request):
|
||||||
|
return await login.login_user(request, 'user')
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
res = self._run(client.post('/login?next=http://example.com'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
self.assertEqual(res.headers['Location'], '/')
|
||||||
|
|
||||||
|
def test_login_remember(self):
|
||||||
|
app = Microdot()
|
||||||
|
Session(app, secret_key='secret')
|
||||||
|
login = Login()
|
||||||
|
|
||||||
|
@login.id_to_user
|
||||||
|
def id_to_user(user_id):
|
||||||
|
return user_id
|
||||||
|
|
||||||
|
@login.user_to_id
|
||||||
|
def user_to_id(user):
|
||||||
|
return user
|
||||||
|
|
||||||
|
@app.get('/')
|
||||||
|
@login
|
||||||
|
def index(request):
|
||||||
|
return request.g.current_user
|
||||||
|
|
||||||
|
@app.post('/login')
|
||||||
|
async def login_route(request):
|
||||||
|
return await login.login_user(request, 'user', remember=True)
|
||||||
|
|
||||||
|
@app.post('/logout')
|
||||||
|
async def logout(request):
|
||||||
|
await login.logout_user(request)
|
||||||
|
return 'ok'
|
||||||
|
|
||||||
|
@app.get('/fresh')
|
||||||
|
@login.fresh
|
||||||
|
async def fresh(request):
|
||||||
|
return f'fresh {request.g.current_user}'
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
self.assertEqual(res.headers['Location'], '/?foo=bar')
|
||||||
|
self.assertEqual(len(res.headers['Set-Cookie']), 2)
|
||||||
|
self.assertIn('session', client.cookies)
|
||||||
|
self.assertIn('_remember', client.cookies)
|
||||||
|
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertEqual(res.text, 'user')
|
||||||
|
res = self._run(client.get('/fresh'))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertEqual(res.text, 'fresh user')
|
||||||
|
|
||||||
|
del client.cookies['session']
|
||||||
|
print(client.cookies)
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
res = self._run(client.get('/fresh'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
|
self.assertEqual(res.headers['Location'], '/login?next=/fresh')
|
||||||
|
|
||||||
|
res = self._run(client.post('/logout'))
|
||||||
|
self.assertEqual(res.status_code, 200)
|
||||||
|
self.assertFalse('_remember' in client.cookies)
|
||||||
|
|
||||||
|
res = self._run(client.get('/'))
|
||||||
|
self.assertEqual(res.status_code, 302)
|
||||||
@@ -37,7 +37,7 @@ class TestSession(unittest.TestCase):
|
|||||||
|
|
||||||
@app.post('/set')
|
@app.post('/set')
|
||||||
@with_session
|
@with_session
|
||||||
async def save_session(req, session):
|
def save_session(req, session):
|
||||||
session['name'] = 'joe'
|
session['name'] = 'joe'
|
||||||
session.save()
|
session.save()
|
||||||
return 'OK'
|
return 'OK'
|
||||||
|
|||||||
Reference in New Issue
Block a user