Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e373d037f0 | ||
|
|
4321d93a82 | ||
|
|
68b6cb9862 | ||
|
|
c7f9b3ff3b | ||
|
|
261dd2f980 | ||
|
|
f204416e36 | ||
|
|
7bc66ce3bb | ||
|
|
43f2227140 | ||
|
|
b0cddde6ec | ||
|
|
dac6df7a7a | ||
|
|
5d6e838f3c | ||
|
|
563bfdc8f5 | ||
|
|
679d8e63b8 | ||
|
|
4cb155ee41 | ||
|
|
dea79c5ce2 | ||
|
|
6b1fd61917 | ||
|
|
f6876c0d15 | ||
|
|
904d5fcaa2 | ||
|
|
a0ea439def | ||
|
|
a1801d9a53 | ||
|
|
14f2c9d345 | ||
|
|
d0a4cf8fa7 | ||
|
|
901f4e55b8 |
1
.github/workflows/tests.yml
vendored
1
.github/workflows/tests.yml
vendored
@@ -64,6 +64,7 @@ jobs:
|
||||
with:
|
||||
files: ./coverage.xml
|
||||
fail_ci_if_error: true
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
benchmark:
|
||||
name: benchmark
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
# Microdot change log
|
||||
|
||||
**Release 2.0.5** - 2024-03-09
|
||||
|
||||
- Correct handling of 0 as an integer argument (regression from #207) [#212](https://github.com/miguelgrinberg/microdot/issues/212) ([commit](https://github.com/miguelgrinberg/microdot/commit/d0a4cf8fa7dfb1da7466157b18d3329a8cf9a5df))
|
||||
|
||||
**Release 2.0.4** - 2024-02-20
|
||||
|
||||
- Do not use regexes for parsing simple URLs [#207](https://github.com/miguelgrinberg/microdot/issues/207) ([commit #1](https://github.com/miguelgrinberg/microdot/commit/38262c56d34784401659639b482a4a1224e1e59a) [commit #2](https://github.com/miguelgrinberg/microdot/commit/f6cba2c0f7e18e2f32b5adb779fb037b6c473eab))
|
||||
|
||||
27
README.md
27
README.md
@@ -32,10 +32,25 @@ describes the backwards incompatible changes that were made.
|
||||
|
||||
## Resources
|
||||
|
||||
- Documentation
|
||||
- [Stable](https://microdot.readthedocs.io/en/stable/)
|
||||
- [Latest](https://microdot.readthedocs.io/en/latest/)
|
||||
- Still using version 1?
|
||||
- [Code](https://github.com/miguelgrinberg/microdot/tree/v1)
|
||||
- [Documentation](https://microdot.readthedocs.io/en/v1/)
|
||||
- [Change Log](https://github.com/miguelgrinberg/microdot/blob/main/CHANGES.md)
|
||||
- Documentation
|
||||
- [Latest](https://microdot.readthedocs.io/en/latest/)
|
||||
- [Stable (v2)](https://microdot.readthedocs.io/en/stable/)
|
||||
- [Legacy (v1)](https://microdot.readthedocs.io/en/v1/) ([Code](https://github.com/miguelgrinberg/microdot/tree/v1))
|
||||
|
||||
## Roadmap
|
||||
|
||||
The following features are planned for future releases of Microdot, both for
|
||||
MicroPython and CPython:
|
||||
|
||||
- Support for forms encoded in `multipart/form-data` format
|
||||
- Authentication support, similar to [Flask-Login](https://github.com/maxcountryman/flask-login) for Flask
|
||||
- OpenAPI integration, similar to [APIFairy](https://github.com/miguelgrinberg/apifairy) for Flask
|
||||
|
||||
In addition to the above, the following extensions are also under consideration,
|
||||
but only for CPython:
|
||||
|
||||
- Database integration through [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy)
|
||||
- Socket.IO support through [python-socketio](https://github.com/miguelgrinberg/python-socketio)
|
||||
|
||||
Do you have other ideas to propose? Let's [discuss them](https://github.com/miguelgrinberg/microdot/discussions/new?category=ideas)!
|
||||
|
||||
16
docs/api.rst
16
docs/api.rst
@@ -44,6 +44,22 @@ User Sessions
|
||||
.. automodule:: microdot.session
|
||||
:members:
|
||||
|
||||
Authentication
|
||||
--------------
|
||||
|
||||
.. automodule:: microdot.auth
|
||||
:inherited-members:
|
||||
:special-members: __call__
|
||||
:members:
|
||||
|
||||
User Logins
|
||||
-----------
|
||||
|
||||
.. automodule:: microdot.login
|
||||
:inherited-members:
|
||||
:special-members: __call__
|
||||
:members:
|
||||
|
||||
Cross-Origin Resource Sharing (CORS)
|
||||
------------------------------------
|
||||
|
||||
|
||||
@@ -5,8 +5,8 @@ Microdot is a highly extensible web application framework. The extensions
|
||||
described in this section are maintained as part of the Microdot project in
|
||||
the same source code repository.
|
||||
|
||||
WebSocket Support
|
||||
~~~~~~~~~~~~~~~~~
|
||||
WebSocket
|
||||
~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -39,8 +39,8 @@ Example::
|
||||
message = await ws.receive()
|
||||
await ws.send(message)
|
||||
|
||||
Server-Sent Events Support
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Server-Sent Events
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -78,8 +78,8 @@ Example::
|
||||
the SSE object. For bidirectional communication with the client, use the
|
||||
WebSocket extension.
|
||||
|
||||
Rendering Templates
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
Templates
|
||||
~~~~~~~~~
|
||||
|
||||
Many web applications use HTML templates for rendering content to clients.
|
||||
Microdot includes extensions to render templates with the
|
||||
@@ -202,8 +202,8 @@ must be used.
|
||||
.. note::
|
||||
The Jinja extension is not compatible with MicroPython.
|
||||
|
||||
Maintaining Secure User Sessions
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Secure User Sessions
|
||||
~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -270,6 +270,117 @@ The :func:`save() <microdot.session.SessionDict.save>` and
|
||||
:func:`delete() <microdot.session.SessionDict.delete>` methods are used to update
|
||||
and destroy the user session respectively.
|
||||
|
||||
Authentication
|
||||
~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
|
||||
* - Compatibility
|
||||
- | CPython & MicroPython
|
||||
|
||||
* - Required Microdot source files
|
||||
- | `auth.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
|
||||
|
||||
* - Required external dependencies
|
||||
- | None
|
||||
|
||||
* - Examples
|
||||
- | `basic_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/basic_auth.py>`_
|
||||
| `token_auth.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/auth/token_auth.py>`_
|
||||
|
||||
The authentication extension provides helper classes for two commonly used
|
||||
authentication patterns, described below.
|
||||
|
||||
Basic Authentication
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
`Basic Authentication <https://en.wikipedia.org/wiki/Basic_access_authentication>`_
|
||||
is a method of authentication that is part of the HTTP specification. It allows
|
||||
clients to authenticate to a server using a username and a password. Web
|
||||
browsers have native support for Basic Authentication and will automatically
|
||||
prompt the user for a username and a password when a protected resource is
|
||||
accessed.
|
||||
|
||||
To use Basic Authentication, create an instance of the :class:`BasicAuth <microdot.auth.BasicAuth>`
|
||||
class::
|
||||
|
||||
from microdot.auth import BasicAuth
|
||||
|
||||
auth = BasicAuth(app)
|
||||
|
||||
Next, create an authentication function. The function must accept a request
|
||||
object and a username and password pair provided by the user. If the
|
||||
credentials are valid, the function must return an object that represents the
|
||||
user. Decorate the function with ``@auth.authenticate``::
|
||||
|
||||
@auth.authenticate
|
||||
async def verify_user(request, username, password):
|
||||
user = await load_user_from_database(username)
|
||||
if user and user.verify_password(password):
|
||||
return user
|
||||
|
||||
If the authentication function cannot validate the user provided credentials it
|
||||
must return ``None``.
|
||||
|
||||
To protect a route with authentication, add the ``auth`` instance as a
|
||||
decorator::
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
While running an authenticated request, the user object returned by the
|
||||
authenticaction function is accessible as ``request.g.current_user``.
|
||||
|
||||
Token Authentication
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
To set up token authentication, create an instance of :class:`TokenAuth <microdot.auth.TokenAuth>`::
|
||||
|
||||
from microdot.auth import TokenAuth
|
||||
|
||||
auth = TokenAuth()
|
||||
|
||||
Then add a function that verifies the token and returns the user it belongs to,
|
||||
or ``None`` if the token is invalid or expired::
|
||||
|
||||
@auth.authenticate
|
||||
async def verify_token(request, token):
|
||||
return load_user_from_token(token)
|
||||
|
||||
As with Basic authentication, the ``auth`` instance is used as a decorator to
|
||||
protect your routes::
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
User Logins
|
||||
~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
|
||||
* - Compatibility
|
||||
- | CPython & MicroPython
|
||||
|
||||
* - Required Microdot source files
|
||||
- | `login.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/auth.py>`_
|
||||
| `session.py <https://github.com/miguelgrinberg/microdot/tree/main/src/microdot/session.py>`_
|
||||
|
||||
* - Required external dependencies
|
||||
- | CPython: `PyJWT <https://pyjwt.readthedocs.io/>`_
|
||||
| MicroPython: `jwt.py <https://github.com/micropython/micropython-lib/blob/master/python-ecosys/pyjwt/jwt.py>`_,
|
||||
`hmac.py <https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py>`_
|
||||
|
||||
* - Examples
|
||||
- | `login.py <https://github.com/miguelgrinberg/microdot/blob/main/examples/login/login.py>`_
|
||||
|
||||
|
||||
|
||||
Cross-Origin Resource Sharing (CORS)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
@@ -305,8 +416,8 @@ Example::
|
||||
cors = CORS(app, allowed_origins=['https://example.com'],
|
||||
allow_credentials=True)
|
||||
|
||||
Testing with the Test Client
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Test Client
|
||||
~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:align: left
|
||||
@@ -342,8 +453,8 @@ Example::
|
||||
See the documentation for the :class:`TestClient <microdot.test_client.TestClient>`
|
||||
class for more details.
|
||||
|
||||
Deploying on a Production Web Server
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Production Deployments
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The ``Microdot`` class creates its own simple web server. This is enough for an
|
||||
application deployed with MicroPython, but when using CPython it may be useful
|
||||
|
||||
1
examples/auth/README.md
Normal file
1
examples/auth/README.md
Normal file
@@ -0,0 +1 @@
|
||||
This directory contains examples that demonstrate basic and token authentication.
|
||||
31
examples/auth/basic_auth.py
Normal file
31
examples/auth/basic_auth.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from hashlib import sha1
|
||||
from microdot import Microdot
|
||||
from microdot.auth import BasicAuth
|
||||
|
||||
|
||||
def create_hash(password):
|
||||
return sha1(password).hexdigest()
|
||||
|
||||
|
||||
USERS = {
|
||||
'susan': create_hash(b'hello'),
|
||||
'david': create_hash(b'bye'),
|
||||
}
|
||||
app = Microdot()
|
||||
auth = BasicAuth()
|
||||
|
||||
|
||||
@auth.authenticate
|
||||
async def check_credentials(request, username, password):
|
||||
if username in USERS and USERS[username] == create_hash(password.encode()):
|
||||
return username
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
26
examples/auth/token_auth.py
Normal file
26
examples/auth/token_auth.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from microdot import Microdot
|
||||
from microdot.auth import TokenAuth
|
||||
|
||||
app = Microdot()
|
||||
auth = TokenAuth()
|
||||
|
||||
TOKENS = {
|
||||
'susan-token': 'susan',
|
||||
'david-token': 'david',
|
||||
}
|
||||
|
||||
|
||||
@auth.authenticate
|
||||
async def check_token(request, token):
|
||||
if token in TOKENS:
|
||||
return TOKENS[token]
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'Hello, {request.g.current_user}!'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
@@ -32,7 +32,7 @@ flask==3.0.0
|
||||
# via
|
||||
# -r requirements.in
|
||||
# quart
|
||||
gunicorn==21.2.0
|
||||
gunicorn==22.0.0
|
||||
# via -r requirements.in
|
||||
h11==0.14.0
|
||||
# via
|
||||
@@ -49,7 +49,7 @@ hypercorn==0.15.0
|
||||
# via quart
|
||||
hyperframe==6.0.1
|
||||
# via h2
|
||||
idna==3.6
|
||||
idna==3.7
|
||||
# via
|
||||
# anyio
|
||||
# requests
|
||||
|
||||
1
examples/login/README.md
Normal file
1
examples/login/README.md
Normal file
@@ -0,0 +1 @@
|
||||
This directory contains examples that demonstrate user logins.
|
||||
122
examples/login/login.py
Normal file
122
examples/login/login.py
Normal file
@@ -0,0 +1,122 @@
|
||||
from hashlib import sha1
|
||||
from microdot import Microdot, redirect
|
||||
from microdot.session import Session
|
||||
from microdot.login import Login
|
||||
|
||||
|
||||
class User:
|
||||
def __init__(self, id, username, password):
|
||||
self.id = id
|
||||
self.username = username
|
||||
self.password_hash = self.create_hash(password)
|
||||
|
||||
def create_hash(self, password):
|
||||
# note: to keep this example simple, passwords are hashed with the SHA1
|
||||
# algorithm. In a real application, you should use a stronger
|
||||
# algorithm, such as bcrypt.
|
||||
return sha1(password.encode()).hexdigest()
|
||||
|
||||
def check_password(self, password):
|
||||
return self.create_hash(password) == self.password_hash
|
||||
|
||||
|
||||
USERS = {
|
||||
'user001': User('user001', 'susan', 'hello'),
|
||||
'user002': User('user002', 'david', 'bye'),
|
||||
}
|
||||
|
||||
app = Microdot()
|
||||
Session(app, secret_key='top-secret!')
|
||||
auth = Login()
|
||||
|
||||
|
||||
@auth.id_to_user
|
||||
async def get_user(user_id):
|
||||
print('get_user', user_id)
|
||||
return USERS.get(user_id)
|
||||
|
||||
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
async def login(request):
|
||||
if request.method == 'GET':
|
||||
return '''
|
||||
<!doctype html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Please Login</h1>
|
||||
<form method="POST">
|
||||
<p>
|
||||
Username<br>
|
||||
<input name="username" autofocus>
|
||||
</p>
|
||||
<p>
|
||||
Password:<br>
|
||||
<input name="password" type="password">
|
||||
<br>
|
||||
</p>
|
||||
<p>
|
||||
<input name="remember_me" type="checkbox"> Remember me
|
||||
<br>
|
||||
</p>
|
||||
<p>
|
||||
<button type="submit">Login</button>
|
||||
</p>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
''', {'Content-Type': 'text/html'}
|
||||
username = request.form['username']
|
||||
password = request.form['password']
|
||||
remember_me = bool(request.form.get('remember_me'))
|
||||
|
||||
for user in USERS.values():
|
||||
if user.username == username:
|
||||
if user.check_password(password):
|
||||
return await auth.login_user(request, user,
|
||||
remember=remember_me)
|
||||
return redirect('/login')
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@auth
|
||||
async def index(request):
|
||||
return f'''
|
||||
<!doctype html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Hello, {request.g.current_user.username}!</h1>
|
||||
<p>
|
||||
<a href="/fresh">Click here</a> to access the fresh login page.
|
||||
</p>
|
||||
<form method="POST" action="/logout">
|
||||
<button type="submit">Logout</button>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
''', {'Content-Type': 'text/html'}
|
||||
|
||||
|
||||
@app.get('/fresh')
|
||||
@auth.fresh
|
||||
async def fresh(request):
|
||||
return f'''
|
||||
<!doctype html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Hello, {request.g.current_user.username}!</h1>
|
||||
<p>This page requires a fresh login session.</p>
|
||||
<p><a href="/">Go back</a> to the main page.</p>
|
||||
</body>
|
||||
</html>
|
||||
''', {'Content-Type': 'text/html'}
|
||||
|
||||
|
||||
@app.post('/logout')
|
||||
@auth
|
||||
async def logout(request):
|
||||
await auth.logout_user(request)
|
||||
return redirect('/')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
@@ -1,3 +1,7 @@
|
||||
# This is a simple example that demonstrates how to use the user session, but
|
||||
# is not intended as a complete login solution. See the login subdirectory for
|
||||
# a more complete example.
|
||||
|
||||
from microdot import Microdot, Response, redirect
|
||||
from microdot.session import Session, with_session
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "microdot"
|
||||
version = "2.0.4"
|
||||
version = "2.0.6.dev0"
|
||||
authors = [
|
||||
{ name = "Miguel Grinberg", email = "miguel.grinberg@gmail.com" },
|
||||
]
|
||||
@@ -26,6 +26,7 @@ Homepage = "https://github.com/miguelgrinberg/microdot"
|
||||
[project.optional-dependencies]
|
||||
docs = [
|
||||
"sphinx",
|
||||
"pyjwt"
|
||||
]
|
||||
|
||||
[tool.setuptools]
|
||||
|
||||
144
src/microdot/auth.py
Normal file
144
src/microdot/auth.py
Normal file
@@ -0,0 +1,144 @@
|
||||
from microdot import abort
|
||||
from microdot.microdot import invoke_handler
|
||||
|
||||
|
||||
class BaseAuth:
|
||||
def __init__(self):
|
||||
self.auth_callback = None
|
||||
self.error_callback = None
|
||||
|
||||
def __call__(self, f):
|
||||
"""Decorator to protect a route with authentication.
|
||||
|
||||
An instance of this class must be used as a decorator on the routes
|
||||
that need to be protected. Example::
|
||||
|
||||
auth = BasicAuth() # or TokenAuth()
|
||||
|
||||
@app.route('/protected')
|
||||
@auth
|
||||
def protected(request):
|
||||
# ...
|
||||
|
||||
Routes that are decorated in this way will only be invoked if the
|
||||
authentication callback returned a valid user object, otherwise the
|
||||
error callback will be executed.
|
||||
"""
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
auth = self._get_auth(request)
|
||||
if not auth:
|
||||
return await invoke_handler(self.error_callback, request)
|
||||
request.g.current_user = await invoke_handler(
|
||||
self.auth_callback, request, *auth)
|
||||
if not request.g.current_user:
|
||||
return await invoke_handler(self.error_callback, request)
|
||||
return await invoke_handler(f, request, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class BasicAuth(BaseAuth):
|
||||
"""Basic Authentication.
|
||||
|
||||
:param realm: The realm that is displayed when the user is prompted to
|
||||
authenticate in the browser.
|
||||
:param charset: The charset that is used to encode the realm.
|
||||
:param scheme: The authentication scheme. Defaults to 'Basic'.
|
||||
:param error_status: The error status code to return when authentication
|
||||
fails. Defaults to 401.
|
||||
"""
|
||||
def __init__(self, realm='Please login', charset='UTF-8', scheme='Basic',
|
||||
error_status=401):
|
||||
super().__init__()
|
||||
self.realm = realm
|
||||
self.charset = charset
|
||||
self.scheme = scheme
|
||||
self.error_status = error_status
|
||||
self.error_callback = self.authentication_error
|
||||
|
||||
def _get_auth(self, request):
|
||||
auth = request.headers.get('Authorization')
|
||||
if auth and auth.startswith('Basic '):
|
||||
import binascii
|
||||
try:
|
||||
username, password = binascii.a2b_base64(
|
||||
auth[6:]).decode().split(':', 1)
|
||||
except Exception: # pragma: no cover
|
||||
return None
|
||||
return username, password
|
||||
|
||||
def authentication_error(self, request):
|
||||
return '', self.error_status, {
|
||||
'WWW-Authenticate': '{} realm="{}", charset="{}"'.format(
|
||||
self.scheme, self.realm, self.charset)}
|
||||
|
||||
def authenticate(self, f):
|
||||
"""Decorator to configure the authentication callback.
|
||||
|
||||
This decorator must be used with a function that accepts the request
|
||||
object, a username and a password and returns a user object if the
|
||||
credentials are valid, or ``None`` if they are not. Example::
|
||||
|
||||
@auth.authenticate
|
||||
async def check_credentials(request, username, password):
|
||||
user = get_user(username)
|
||||
if user and user.check_password(password):
|
||||
return get_user(username)
|
||||
"""
|
||||
self.auth_callback = f
|
||||
|
||||
|
||||
class TokenAuth(BaseAuth):
|
||||
"""Token based authentication.
|
||||
|
||||
:param header: The name of the header that will contain the token. Defaults
|
||||
to 'Authorization'.
|
||||
:param scheme: The authentication scheme. Defaults to 'Bearer'.
|
||||
:param error_status: The error status code to return when authentication
|
||||
fails. Defaults to 401.
|
||||
"""
|
||||
def __init__(self, header='Authorization', scheme='Bearer',
|
||||
error_status=401):
|
||||
super().__init__()
|
||||
self.header = header
|
||||
self.scheme = scheme.lower()
|
||||
self.error_status = error_status
|
||||
self.error_callback = self.authentication_error
|
||||
|
||||
def _get_auth(self, request):
|
||||
auth = request.headers.get(self.header)
|
||||
if auth:
|
||||
if self.header == 'Authorization':
|
||||
try:
|
||||
scheme, token = auth.split(' ', 1)
|
||||
except Exception:
|
||||
return None
|
||||
if scheme.lower() == self.scheme:
|
||||
return (token.strip(),)
|
||||
else:
|
||||
return (auth,)
|
||||
|
||||
def authenticate(self, f):
|
||||
"""Decorator to configure the authentication callback.
|
||||
|
||||
This decorator must be used with a function that accepts the request
|
||||
object, a username and a password and returns a user object if the
|
||||
credentials are valid, or ``None`` if they are not. Example::
|
||||
|
||||
@auth.authenticate
|
||||
async def check_credentials(request, token):
|
||||
return get_user(token)
|
||||
"""
|
||||
self.auth_callback = f
|
||||
|
||||
def errorhandler(self, f):
|
||||
"""Decorator to configure the error callback.
|
||||
|
||||
Microdot calls the error callback to allow the application to generate
|
||||
a custom error response. The default error response is to call
|
||||
``abort(401)``.
|
||||
"""
|
||||
self.error_callback = f
|
||||
|
||||
def authentication_error(self, request):
|
||||
abort(self.error_status)
|
||||
8
src/microdot/helpers.py
Normal file
8
src/microdot/helpers.py
Normal file
@@ -0,0 +1,8 @@
|
||||
try:
|
||||
from functools import wraps
|
||||
except ImportError: # pragma: no cover
|
||||
# MicroPython does not currently implement functools.wraps
|
||||
def wraps(wrapped):
|
||||
def _(wrapper):
|
||||
return wrapper
|
||||
return _
|
||||
150
src/microdot/login.py
Normal file
150
src/microdot/login.py
Normal file
@@ -0,0 +1,150 @@
|
||||
from time import time
|
||||
from microdot import redirect
|
||||
from microdot.microdot import urlencode, invoke_handler
|
||||
|
||||
|
||||
class Login:
|
||||
def __init__(self, login_url='/login'):
|
||||
self.login_url = login_url
|
||||
self.user_callback = None
|
||||
self.user_id_callback = lambda user: user.id
|
||||
|
||||
def id_to_user(self, f):
|
||||
"""Decorator to configure the user callback.
|
||||
|
||||
The decorated function receives the user ID as an argument and must
|
||||
return the corresponding user object, or ``None`` if the user ID is
|
||||
invalid.
|
||||
"""
|
||||
self.user_callback = f
|
||||
|
||||
def user_to_id(self, f):
|
||||
"""Decorator to configure the user ID callback.
|
||||
|
||||
The decorated functon receives the user object as an argument and must
|
||||
return the corresponding user ID. By default, the ``id`` attribute of
|
||||
the user object is used.
|
||||
"""
|
||||
self.user_id_callback = f
|
||||
|
||||
def _get_session(self, request):
|
||||
return request.app._session.get(request)
|
||||
|
||||
def _update_remember_cookie(self, request, days, user_id=None):
|
||||
remember_payload = request.app._session.encode({
|
||||
'user_id': user_id,
|
||||
'days': days,
|
||||
'exp': time() + days * 24 * 60 * 60
|
||||
})
|
||||
|
||||
@request.after_request
|
||||
async def _set_remember_cookie(request, response):
|
||||
response.set_cookie('_remember', remember_payload,
|
||||
max_age=days * 24 * 60 * 60)
|
||||
return response
|
||||
|
||||
def _get_user_id_from_session(self, request):
|
||||
session = self._get_session(request)
|
||||
if session and '_user_id' in session:
|
||||
return session['_user_id']
|
||||
if '_remember' in request.cookies:
|
||||
remember_payload = request.app._session.decode(
|
||||
request.cookies['_remember'])
|
||||
user_id = remember_payload.get('user_id')
|
||||
if user_id: # pragma: no branch
|
||||
self._update_remember_cookie(
|
||||
request, remember_payload.get('_days', 30), user_id)
|
||||
session['_user_id'] = user_id
|
||||
session['_fresh'] = False
|
||||
session.save()
|
||||
return user_id
|
||||
|
||||
async def _redirect_to_login(self, request):
|
||||
return '', 302, {'Location': self.login_url + '?next=' + urlencode(
|
||||
request.url)}
|
||||
|
||||
async def login_user(self, request, user, remember=False,
|
||||
redirect_url='/'):
|
||||
"""Log a user in.
|
||||
|
||||
:param request: the request object
|
||||
:param user: the user object
|
||||
:param remember: if the user's logged in state should be remembered
|
||||
with a cookie after the session ends. Set to the
|
||||
number of days the remember cookie should last, or to
|
||||
``True`` to use a default duration of 30 days.
|
||||
:param redirect_url: the URL to redirect to after login
|
||||
|
||||
This call marks the user as logged in by storing their user ID in the
|
||||
user session. The application must call this method to log a user in
|
||||
after their credentials have been validated.
|
||||
|
||||
The method returns a redirect response, either to the URL the user
|
||||
originally intended to visit, or if there is no original URL to the URL
|
||||
specified by the `redirect_url`.
|
||||
"""
|
||||
session = self._get_session(request)
|
||||
session['_user_id'] = await invoke_handler(self.user_id_callback, user)
|
||||
session['_fresh'] = True
|
||||
session.save()
|
||||
|
||||
if remember:
|
||||
days = 30 if remember is True else int(remember)
|
||||
self._update_remember_cookie(request, days, session['_user_id'])
|
||||
|
||||
next_url = request.args.get('next', redirect_url)
|
||||
if not next_url.startswith('/'):
|
||||
next_url = redirect_url
|
||||
return redirect(next_url)
|
||||
|
||||
async def logout_user(self, request):
|
||||
"""Log a user out.
|
||||
|
||||
:param request: the request object
|
||||
|
||||
This call removes information about the user's log in from the user
|
||||
session. If a remember cookie exists, it is removed as well.
|
||||
"""
|
||||
session = self._get_session(request)
|
||||
session.pop('_user_id', None)
|
||||
session.pop('_fresh', None)
|
||||
session.save()
|
||||
if '_remember' in request.cookies:
|
||||
self._update_remember_cookie(request, 0)
|
||||
|
||||
def __call__(self, f):
|
||||
"""Decorator to protect a route with authentication.
|
||||
|
||||
If the user is not logged in, Microdot will redirect to the login page
|
||||
first. The decorated route will only run after successful login by the
|
||||
user. If the user is already logged in, the route will run immediately.
|
||||
"""
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
user_id = self._get_user_id_from_session(request)
|
||||
if not user_id:
|
||||
return await self._redirect_to_login(request)
|
||||
request.g.current_user = await invoke_handler(
|
||||
self.user_callback, user_id)
|
||||
if not request.g.current_user:
|
||||
return await self._redirect_to_login(request)
|
||||
return await invoke_handler(f, request, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
def fresh(self, f):
|
||||
"""Decorator to protect a route with "fresh" authentication.
|
||||
|
||||
This decorator prevents the route from running when the login session
|
||||
is not fresh. A fresh session is a session that has been created from
|
||||
direct user interaction with the login page, as opposite to a session
|
||||
that was restored from a "remember me" cookie.
|
||||
"""
|
||||
base_wrapper = self.__call__(f)
|
||||
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
session = self._get_session(request)
|
||||
if session.get('_fresh'):
|
||||
return await base_wrapper(request, *args, **kwargs)
|
||||
return await self._redirect_to_login(request)
|
||||
|
||||
return wrapper
|
||||
@@ -598,7 +598,7 @@ class Response:
|
||||
else: # pragma: no cover
|
||||
http_cookie += '; Expires=' + time.strftime(
|
||||
'%a, %d %b %Y %H:%M:%S GMT', expires.timetuple())
|
||||
if max_age:
|
||||
if max_age is not None:
|
||||
http_cookie += '; Max-Age=' + str(max_age)
|
||||
if secure:
|
||||
http_cookie += '; Secure'
|
||||
@@ -616,10 +616,10 @@ class Response:
|
||||
|
||||
:param cookie: The cookie's name.
|
||||
:param kwargs: Any cookie opens and flags supported by
|
||||
``set_cookie()`` except ``expires``.
|
||||
``set_cookie()`` except ``expires`` and ``max_age``.
|
||||
"""
|
||||
self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT',
|
||||
**kwargs)
|
||||
max_age=0, **kwargs)
|
||||
|
||||
def complete(self):
|
||||
if isinstance(self.body, bytes) and \
|
||||
@@ -862,8 +862,6 @@ class URLPattern():
|
||||
if arg is None:
|
||||
return
|
||||
if 'name' in segment:
|
||||
if not arg:
|
||||
return
|
||||
args[segment['name']] = arg
|
||||
if path is not None:
|
||||
return
|
||||
@@ -879,6 +877,8 @@ class URLPattern():
|
||||
|
||||
def _string_segment(self, value):
|
||||
s = value.split('/', 1)
|
||||
if len(s[0]) == 0:
|
||||
return None, None
|
||||
return s[0], s[1] if len(s) > 1 else None
|
||||
|
||||
def _int_segment(self, value):
|
||||
@@ -1191,7 +1191,7 @@ class Microdot:
|
||||
Example::
|
||||
|
||||
import asyncio
|
||||
from microdot_asyncio import Microdot
|
||||
from microdot import Microdot
|
||||
|
||||
app = Microdot()
|
||||
|
||||
@@ -1268,7 +1268,7 @@ class Microdot:
|
||||
|
||||
Example::
|
||||
|
||||
from microdot_asyncio import Microdot
|
||||
from microdot import Microdot
|
||||
|
||||
app = Microdot()
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import jwt
|
||||
from microdot.microdot import invoke_handler
|
||||
|
||||
secret_key = None
|
||||
from microdot.helpers import wraps
|
||||
|
||||
|
||||
class SessionDict(dict):
|
||||
@@ -57,13 +56,7 @@ class Session:
|
||||
if session is None:
|
||||
request.g._session = SessionDict(request, {})
|
||||
return request.g._session
|
||||
try:
|
||||
session = jwt.decode(session, self.secret_key,
|
||||
algorithms=['HS256'])
|
||||
except jwt.exceptions.PyJWTError: # pragma: no cover
|
||||
request.g._session = SessionDict(request, {})
|
||||
else:
|
||||
request.g._session = SessionDict(request, session)
|
||||
request.g._session = SessionDict(request, self.decode(session))
|
||||
return request.g._session
|
||||
|
||||
def update(self, request, session):
|
||||
@@ -89,8 +82,7 @@ class Session:
|
||||
if not self.secret_key:
|
||||
raise ValueError('The session secret key is not configured')
|
||||
|
||||
encoded_session = jwt.encode(session, self.secret_key,
|
||||
algorithm='HS256')
|
||||
encoded_session = self.encode(session)
|
||||
|
||||
@request.after_request
|
||||
def _update_session(request, response):
|
||||
@@ -121,6 +113,18 @@ class Session:
|
||||
expires='Thu, 01 Jan 1970 00:00:01 GMT')
|
||||
return response
|
||||
|
||||
def encode(self, payload, secret_key=None):
|
||||
return jwt.encode(payload, secret_key or self.secret_key,
|
||||
algorithm='HS256')
|
||||
|
||||
def decode(self, session, secret_key=None):
|
||||
try:
|
||||
payload = jwt.decode(session, secret_key or self.secret_key,
|
||||
algorithms=['HS256'])
|
||||
except jwt.exceptions.PyJWTError: # pragma: no cover
|
||||
return {}
|
||||
return payload
|
||||
|
||||
|
||||
def with_session(f):
|
||||
"""Decorator that passes the user session to the route handler.
|
||||
@@ -136,13 +140,9 @@ def with_session(f):
|
||||
Note that the decorator does not save the session. To update the session,
|
||||
call the :func:`session.save() <microdot.session.SessionDict.save>` method.
|
||||
"""
|
||||
@wraps(f)
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
return await invoke_handler(
|
||||
f, request, request.app._session.get(request), *args, **kwargs)
|
||||
|
||||
for attr in ['__name__', '__doc__', '__module__', '__qualname__']:
|
||||
try:
|
||||
setattr(wrapper, attr, getattr(f, attr))
|
||||
except AttributeError: # pragma: no cover
|
||||
pass
|
||||
return wrapper
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
from microdot.helpers import wraps
|
||||
|
||||
|
||||
class SSE:
|
||||
@@ -12,7 +13,7 @@ class SSE:
|
||||
self.event = asyncio.Event()
|
||||
self.queue = []
|
||||
|
||||
async def send(self, data, event=None):
|
||||
async def send(self, data, event=None, event_id=None):
|
||||
"""Send an event to the client.
|
||||
|
||||
:param data: the data to send. It can be given as a string, bytes, dict
|
||||
@@ -20,6 +21,8 @@ class SSE:
|
||||
Any other types are converted to string before sending.
|
||||
:param event: an optional event name, to send along with the data. If
|
||||
given, it must be a string.
|
||||
:param event_id: an optional event id, to send along with the data. If
|
||||
given, it must be a string.
|
||||
"""
|
||||
if isinstance(data, (dict, list)):
|
||||
data = json.dumps(data).encode()
|
||||
@@ -28,6 +31,8 @@ class SSE:
|
||||
elif not isinstance(data, bytes):
|
||||
data = str(data).encode()
|
||||
data = b'data: ' + data + b'\n\n'
|
||||
if event_id:
|
||||
data = b'id: ' + event_id.encode() + b'\n' + data
|
||||
if event:
|
||||
data = b'event: ' + event.encode() + b'\n' + data
|
||||
self.queue.append(data)
|
||||
@@ -99,6 +104,7 @@ def with_sse(f):
|
||||
# send a named event
|
||||
await sse.send('hello', event='greeting')
|
||||
"""
|
||||
@wraps(f)
|
||||
async def sse_handler(request, *args, **kwargs):
|
||||
return sse_response(request, f, *args, **kwargs)
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ class TestClient:
|
||||
The following example shows how to create a test client for an application
|
||||
and send a test request::
|
||||
|
||||
from microdot_asyncio import Microdot
|
||||
from microdot import Microdot
|
||||
|
||||
app = Microdot()
|
||||
|
||||
@@ -141,7 +141,17 @@ class TestClient:
|
||||
cookie_options = cookie_value.split(';')
|
||||
delete = False
|
||||
for option in cookie_options[1:]:
|
||||
if option.strip().lower().startswith('expires='):
|
||||
if option.strip().lower().startswith(
|
||||
'max-age='): # pragma: no cover
|
||||
_, age = option.strip().split('=', 1)
|
||||
try:
|
||||
age = int(age)
|
||||
except ValueError: # pragma: no cover
|
||||
age = 0
|
||||
if age <= 0:
|
||||
delete = True
|
||||
break
|
||||
elif option.strip().lower().startswith('expires='):
|
||||
_, e = option.strip().split('=', 1)
|
||||
# this is a very limited parser for cookie expiry
|
||||
# that only detects a cookie deletion request when
|
||||
|
||||
@@ -2,6 +2,7 @@ import binascii
|
||||
import hashlib
|
||||
from microdot import Request, Response
|
||||
from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception
|
||||
from microdot.helpers import wraps
|
||||
|
||||
|
||||
class WebSocketError(Exception):
|
||||
@@ -192,6 +193,7 @@ async def websocket_upgrade(request):
|
||||
|
||||
|
||||
def websocket_wrapper(f, upgrade_function):
|
||||
@wraps(f)
|
||||
async def wrapper(request, *args, **kwargs):
|
||||
ws = await upgrade_function(request)
|
||||
try:
|
||||
|
||||
@@ -9,3 +9,4 @@ from tests.test_sse import * # noqa: F401, F403
|
||||
from tests.test_cors import * # noqa: F401, F403
|
||||
from tests.test_utemplate import * # noqa: F401, F403
|
||||
from tests.test_session import * # noqa: F401, F403
|
||||
from tests.test_auth import * # noqa: F401, F403
|
||||
|
||||
125
tests/test_auth.py
Normal file
125
tests/test_auth.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import asyncio
|
||||
import binascii
|
||||
import unittest
|
||||
from microdot import Microdot
|
||||
from microdot.auth import BasicAuth, TokenAuth
|
||||
from microdot.test_client import TestClient
|
||||
|
||||
|
||||
class TestAuth(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if hasattr(asyncio, 'set_event_loop'):
|
||||
asyncio.set_event_loop(asyncio.new_event_loop())
|
||||
cls.loop = asyncio.get_event_loop()
|
||||
|
||||
def _run(self, coro):
|
||||
return self.loop.run_until_complete(coro)
|
||||
|
||||
def test_basic_auth(self):
|
||||
app = Microdot()
|
||||
basic_auth = BasicAuth()
|
||||
|
||||
@basic_auth.authenticate
|
||||
def authenticate(request, username, password):
|
||||
if username == 'foo' and password == 'bar':
|
||||
return {'username': username}
|
||||
|
||||
@app.route('/')
|
||||
@basic_auth
|
||||
def index(request):
|
||||
return request.g.current_user['username']
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic ' + binascii.b2a_base64(
|
||||
b'foo:bar').decode()}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'foo')
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic ' + binascii.b2a_base64(
|
||||
b'foo:baz').decode()}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
def test_token_auth(self):
|
||||
app = Microdot()
|
||||
token_auth = TokenAuth()
|
||||
|
||||
@token_auth.authenticate
|
||||
def authenticate(request, token):
|
||||
if token == 'foo':
|
||||
return 'user'
|
||||
|
||||
@app.route('/')
|
||||
@token_auth
|
||||
def index(request):
|
||||
return request.g.current_user
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Bearer foo'}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
|
||||
def test_token_auth_custom_header(self):
|
||||
app = Microdot()
|
||||
token_auth = TokenAuth(header='X-Auth-Token')
|
||||
|
||||
@token_auth.authenticate
|
||||
def authenticate(request, token):
|
||||
if token == 'foo':
|
||||
return 'user'
|
||||
|
||||
@app.route('/')
|
||||
@token_auth
|
||||
def index(request):
|
||||
return request.g.current_user
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Basic foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={'Authorization': 'foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'Authorization': 'Bearer foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={
|
||||
'X-Token-Auth': 'Bearer foo'}))
|
||||
self.assertEqual(res.status_code, 401)
|
||||
|
||||
res = self._run(client.get('/', headers={'X-Auth-Token': 'foo'}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
|
||||
res = self._run(client.get('/', headers={'x-auth-token': 'foo'}))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
|
||||
@token_auth.errorhandler
|
||||
def error_handler(request):
|
||||
return {'status_code': 403}, 403
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 403)
|
||||
self.assertEqual(res.json, {'status_code': 403})
|
||||
185
tests/test_login.py
Normal file
185
tests/test_login.py
Normal file
@@ -0,0 +1,185 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
from microdot import Microdot
|
||||
from microdot.login import Login
|
||||
from microdot.session import Session
|
||||
from microdot.test_client import TestClient
|
||||
|
||||
|
||||
class TestLogin(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if hasattr(asyncio, 'set_event_loop'):
|
||||
asyncio.set_event_loop(asyncio.new_event_loop())
|
||||
cls.loop = asyncio.get_event_loop()
|
||||
|
||||
def _run(self, coro):
|
||||
return self.loop.run_until_complete(coro)
|
||||
|
||||
def test_login(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
class User:
|
||||
def __init__(self, id, name):
|
||||
self.id = id
|
||||
self.name = name
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return User(user_id, f'user{user_id}')
|
||||
|
||||
@app.get('/')
|
||||
@login
|
||||
def index(request):
|
||||
return request.g.current_user.name
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, User(123, 'user123'))
|
||||
|
||||
@app.post('/logout')
|
||||
async def logout_route(request):
|
||||
await login.logout_user(request)
|
||||
return 'ok'
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.get('/?foo=bar'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/login?next=/%3Ffoo%3Dbar')
|
||||
|
||||
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/?foo=bar')
|
||||
self.assertEqual(len(res.headers['Set-Cookie']), 1)
|
||||
self.assertIn('session', client.cookies)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user123')
|
||||
|
||||
res = self._run(client.post('/logout'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
|
||||
def test_login_bad_user_id(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return None
|
||||
|
||||
@login.user_to_id
|
||||
def user_to_id(user):
|
||||
return user
|
||||
|
||||
@app.get('/foo')
|
||||
@login
|
||||
async def index(request):
|
||||
return 'ok'
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, 'user')
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.post('/login?next=/'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/')
|
||||
res = self._run(client.get('/foo'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/login?next=/foo')
|
||||
|
||||
def test_login_bad_redirect(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return user_id
|
||||
|
||||
@login.user_to_id
|
||||
def user_to_id(user):
|
||||
return user
|
||||
|
||||
@app.get('/')
|
||||
@login
|
||||
async def index(request):
|
||||
return 'ok'
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, 'user')
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.post('/login?next=http://example.com'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/')
|
||||
|
||||
def test_login_remember(self):
|
||||
app = Microdot()
|
||||
Session(app, secret_key='secret')
|
||||
login = Login()
|
||||
|
||||
@login.id_to_user
|
||||
def id_to_user(user_id):
|
||||
return user_id
|
||||
|
||||
@login.user_to_id
|
||||
def user_to_id(user):
|
||||
return user
|
||||
|
||||
@app.get('/')
|
||||
@login
|
||||
def index(request):
|
||||
return request.g.current_user
|
||||
|
||||
@app.post('/login')
|
||||
async def login_route(request):
|
||||
return await login.login_user(request, 'user', remember=True)
|
||||
|
||||
@app.post('/logout')
|
||||
async def logout(request):
|
||||
await login.logout_user(request)
|
||||
return 'ok'
|
||||
|
||||
@app.get('/fresh')
|
||||
@login.fresh
|
||||
async def fresh(request):
|
||||
return f'fresh {request.g.current_user}'
|
||||
|
||||
client = TestClient(app)
|
||||
res = self._run(client.post('/login?next=/%3Ffoo=bar'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/?foo=bar')
|
||||
self.assertEqual(len(res.headers['Set-Cookie']), 2)
|
||||
self.assertIn('session', client.cookies)
|
||||
self.assertIn('_remember', client.cookies)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'user')
|
||||
res = self._run(client.get('/fresh'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.text, 'fresh user')
|
||||
|
||||
del client.cookies['session']
|
||||
print(client.cookies)
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
res = self._run(client.get('/fresh'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
self.assertEqual(res.headers['Location'], '/login?next=/fresh')
|
||||
|
||||
res = self._run(client.post('/logout'))
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertFalse('_remember' in client.cookies)
|
||||
|
||||
res = self._run(client.get('/'))
|
||||
self.assertEqual(res.status_code, 302)
|
||||
@@ -203,7 +203,8 @@ class TestResponse(unittest.TestCase):
|
||||
'foo7=bar7; Path=/foo; Domain=example.com:1234; '
|
||||
'Expires=Tue, 05 Nov 2019 02:23:54 GMT; Max-Age=123; Secure; '
|
||||
'HttpOnly',
|
||||
'foo8=; Expires=Thu, 01 Jan 1970 00:00:01 GMT; HttpOnly',
|
||||
('foo8=; Expires=Thu, 01 Jan 1970 00:00:01 GMT; Max-Age=0; '
|
||||
'HttpOnly'),
|
||||
]})
|
||||
|
||||
def test_redirect(self):
|
||||
|
||||
@@ -37,7 +37,7 @@ class TestSession(unittest.TestCase):
|
||||
|
||||
@app.post('/set')
|
||||
@with_session
|
||||
async def save_session(req, session):
|
||||
def save_session(req, session):
|
||||
session['name'] = 'joe'
|
||||
session.save()
|
||||
return 'OK'
|
||||
|
||||
@@ -23,6 +23,8 @@ class TestWebSocket(unittest.TestCase):
|
||||
async def handle_sse(request, sse):
|
||||
await sse.send('foo')
|
||||
await sse.send('bar', event='test')
|
||||
await sse.send('bar', event='test', event_id='id42')
|
||||
await sse.send('bar', event_id='id42')
|
||||
await sse.send({'foo': 'bar'})
|
||||
await sse.send([42, 'foo', 'bar'])
|
||||
await sse.send(ValueError('foo'))
|
||||
@@ -34,6 +36,8 @@ class TestWebSocket(unittest.TestCase):
|
||||
self.assertEqual(response.headers['Content-Type'], 'text/event-stream')
|
||||
self.assertEqual(response.text, ('data: foo\n\n'
|
||||
'event: test\ndata: bar\n\n'
|
||||
'event: test\nid: id42\ndata: bar\n\n'
|
||||
'id: id42\ndata: bar\n\n'
|
||||
'data: {"foo": "bar"}\n\n'
|
||||
'data: [42, "foo", "bar"]\n\n'
|
||||
'data: foo\n\n'
|
||||
|
||||
@@ -26,10 +26,14 @@ class TestURLPattern(unittest.TestCase):
|
||||
p = URLPattern('/<arg>')
|
||||
self.assertEqual(p.match('/foo'), {'arg': 'foo'})
|
||||
self.assertIsNone(p.match('/'))
|
||||
self.assertIsNone(p.match('//'))
|
||||
self.assertIsNone(p.match(''))
|
||||
self.assertIsNone(p.match('foo/'))
|
||||
self.assertIsNone(p.match('/foo/'))
|
||||
self.assertIsNone(p.match('//foo/'))
|
||||
self.assertIsNone(p.match('/foo//'))
|
||||
self.assertIsNone(p.match('/foo/bar'))
|
||||
self.assertIsNone(p.match('/foo//bar'))
|
||||
|
||||
p = URLPattern('/<arg>/')
|
||||
self.assertEqual(p.match('/foo/'), {'arg': 'foo'})
|
||||
@@ -64,6 +68,8 @@ class TestURLPattern(unittest.TestCase):
|
||||
def test_int_argument(self):
|
||||
p = URLPattern('/users/<int:id>')
|
||||
self.assertEqual(p.match('/users/123'), {'id': 123})
|
||||
self.assertEqual(p.match('/users/-123'), {'id': -123})
|
||||
self.assertEqual(p.match('/users/0'), {'id': 0})
|
||||
self.assertIsNone(p.match('/users/'))
|
||||
self.assertIsNone(p.match('/users/abc'))
|
||||
self.assertIsNone(p.match('/users/123abc'))
|
||||
|
||||
Reference in New Issue
Block a user