diff --git a/docs/api.rst b/docs/api.rst index e4d09ff..0b71162 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -74,6 +74,12 @@ Cross-Origin Resource Sharing (CORS) .. automodule:: microdot.cors :members: +Cross-Site Request Forgery (CSRF) Protection +-------------------------------------------- + +.. automodule:: microdot.csrf + :members: + Test Client ----------- diff --git a/docs/extensions.rst b/docs/extensions.rst index 1efe678..53fb57a 100644 --- a/docs/extensions.rst +++ b/docs/extensions.rst @@ -631,6 +631,101 @@ Example:: cors = CORS(app, allowed_origins=['https://example.com'], allow_credentials=True) +Cross-Site Request Forgery (CSRF) Protection +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. list-table:: + :align: left + + * - Compatibility + - | CPython & MicroPython + + * - Required Microdot source files + - | `csrf.py `_ + + * - Required external dependencies + - | None + + * - Examples + - | `app.py `_ + +The CSRF extension provides protection against `Cross-Site Request Forgery +(CSRF) `_ attacks. This +protection defends against attackers attempting to submit forms or other +state-changing requests from their own site on behalf of unsuspecting victims, +while taking advantage of the victims previously established sessions or +cookies to impersonate them. + +This extension checks the ``Sec-Fetch-Site`` header sent by all modern web +browsers to achieve this protection. As a fallback mechanism for older browsers +that do not support this header, this extension can be linked to the CORS +extension to validate the ``Origin`` header. If you are interested in the +details of this protection mechanism, it is described in the +`OWASP CSRF Prevention Cheat Sheet `_ +page. + +.. note:: + As of December 2025, OWASP considers the use of Fetch Metadata Headers for + CSRF protection a + `defense in depth `_ + technique that is insufficient on its own. + + There is an interesting + `discussion `_ on + this topic in the OWASP GitHub repository where it appears to be agreement + that this technique provides complete protection for the vast majority of + use cases. If you are unsure if this method works for your use case, please + read this discussion to have more context and make the right decision. + +To enable CSRF protection, create an instance of the +:class:`CSRF ` class and configure the desired options. +Example:: + + from microdot import Microdot + from microdot.cors import CORS + from microdot.csrf import CSRF + + app = Microdot() + cors = CORS(app, allowed_origins=['https://example.com']) + csrf = CSRF(app, cors) + +This will protect all routes that use a state-changing method (``POST``, +``PUT``, ``PATCH`` or ``DELETE``) and will return a 403 status code response to +any requests that fail the CSRF check. + +If there are routes that need to be exempted from the CSRF check, they can be +decorated with the :meth:`csrf.exempt ` decorator:: + + @app.post('/webhook') + @csrf.exempt + async def webhook(request): + # ... + +For some applications it may be more convenient to have CSRF checks turned off +by default, and only apply them to explicitly selected routes. In this case, +pass ``protect_all=False`` when you construct the ``CSRF`` instance and use the +:meth:`csrf.protect ` decorator:: + + csrf = CSRF(app, cors, protect_all=False) + + @app.post('/submit-form') + @csrf.protect + async def submit_form(request): + # ... + +By default, requests coming from different subdomains are considered to be +cross-site, and as such they will not pass the CSRF check. If you'd like +subdomain requests to be considered safe, then set the +``allow_subdomains=True`` option when you create the ``CSRF`` class. + +.. note:: + This extension is designed to block requests issued by web browsers when + they are found to be unsafe or unauthorized by the application owner. The + method used to determine if a request should be allowed or not is based on + the value of headers that are only sent by web browsers. Clients other than + web browsers are not affected by this extension and can send requests + freely. + Test Client ~~~~~~~~~~~ diff --git a/examples/csrf/README.md b/examples/csrf/README.md new file mode 100644 index 0000000..c212611 --- /dev/null +++ b/examples/csrf/README.md @@ -0,0 +1,41 @@ +# CSRF Example + +This is a small example that demonstrates how the CSRF protection in Microdot +works. + +## Running the example + +Start by cloning the repostory or copying the two example files *app.py* and +*evil.py* to your computer. The only dependency these examples need to run is `microdot`, so create a virtual environment and run: + + pip install microdot + +You need two terminals. On the first one, run: + + python app.py + +To see the application open *http://localhost:5000* on your web browser. The +application allows you to make payments through a web form. Each payment that +you make reduces the balance in your account. Type an amount in the form field and press the "Issue Payment" button to see how the balance decreases. + +Leave the application running. On the second terminal run: + + python evil.py + +Open a second browser tab and navigate to *http://localhost:5001*. This +application simulates a malicious web site that tries to steal money from your +account. It does this by sending a cross-site form submission to the above +application. + +The application presents a form that fools you into thinking you can win some +money. Clicking the button triggers the cross-site request to the form in the +first application, with the payment amount set to $100. + +Because the application has CSRF protection enabled, the cross-site request +fails. + +If you want to see how the attack can succeed, open *app.py* in your editor and +comment out the line that creates the ``csrf`` object. Restart *app.py* in your +first terminal, then go back to the second browser tab and click the +"Win $100!" button again. You will now see that the form is submitted +successfully and your balance in the first application is decremented by $100. diff --git a/examples/csrf/app.py b/examples/csrf/app.py new file mode 100644 index 0000000..36f2e10 --- /dev/null +++ b/examples/csrf/app.py @@ -0,0 +1,40 @@ +from microdot import Microdot, redirect +from microdot.cors import CORS +from microdot.csrf import CSRF + +app = Microdot() +cors = CORS(app, allowed_origins=['http://localhost:5000']) +csrf = CSRF(app, cors) + +balance = 1000 + + +@app.route('/', methods=['GET', 'POST']) +def index(request): + global balance + if request.method == 'POST': + try: + balance -= float(request.form['amount']) + except ValueError: + pass + return redirect('/') + + page = f''' + + + CSRF Example + + +

CSRF Example

+

You have ${balance:.02f}

+
+ Pay $ + +
+ +''' + return page, {'Content-Type': 'text/html'} + + +if __name__ == '__main__': + app.run(debug=True) diff --git a/examples/csrf/evil.py b/examples/csrf/evil.py new file mode 100644 index 0000000..7911fe6 --- /dev/null +++ b/examples/csrf/evil.py @@ -0,0 +1,25 @@ +from microdot import Microdot + +app = Microdot() + + +@app.route('/', methods=['GET', 'POST']) +def index(request): + page = ''' + + + CSRF Example + + +

Evil Site

+
+ + +
+ +''' + return page, {'Content-Type': 'text/html'} + + +if __name__ == '__main__': + app.run(port=5001, debug=True) diff --git a/src/microdot/csrf.py b/src/microdot/csrf.py new file mode 100644 index 0000000..dcd2ece --- /dev/null +++ b/src/microdot/csrf.py @@ -0,0 +1,122 @@ +from microdot import abort + + +class CSRF: + """CSRF protection for Microdot routes. + + :param app: The application instance. + :param cors: The ``CORS`` instance that defines the origins that are + trusted by the application. This is used to validate requests + from older browsers that do not send the ``Sec-Fetch-Site`` + header. + :param protect_all: If ``True``, all state changing routes are protected by + default, with the exception of routes that are + decorated with the :meth:`exempt ` decorator. + If ``False``, only routes decorated with the + :meth:`protect ` decorator are protected. The + default is ``True``. + :param allow_subdomains: If ``True``, requests from subdomains of the + application domain are trusted. The default is + ``False``. + + CSRF protection is implemented by checking the ``Sec-Fetch-Site`` sent by + browsers. When the ``cors`` argument is provided, requests from older + browsers that do not support the ``Sec-Fetch-Site`` header are validated + by checking the ``Origin`` header. + """ + SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS'] + + def __init__(self, app=None, cors=None, protect_all=True, + allow_subdomains=False): + self.cors = None + self.protect_all = protect_all + self.allow_subdomains = allow_subdomains + self.exempt_routes = [] + self.protected_routes = [] + if app is not None: + self.initialize(app, cors) + + def initialize(self, app, cors=None): + """Initialize the CSRF class. + + :param app: The application instance. + :param cors: The ``CORS`` instance that defines the origins that are + trusted by the application. This is used to validate + requests from older browsers that do not send the + ``Sec-Fetch-Site`` header. + """ + self.cors = cors + + @app.before_request + async def csrf_before_request(request): + if ( + self.protect_all + and request.method not in self.SAFE_METHODS + and request.route not in self.exempt_routes + ) or request.route in self.protected_routes: + allow = False + sfs = request.headers.get('Sec-Fetch-Site') + if sfs: + # if the Sec-Fetch-Site header was given, ensure it is not + # cross-site + if sfs in ['same-origin', 'none']: + allow = True + elif sfs == 'same-site' and self.allow_subdomains: + allow = True + elif self.cors and self.cors.allowed_origins != '*': + # if there is no Sec-Fetch-Site header but we have a list + # of allowed origins, then we can validate the origin + origin = request.headers.get('Origin') + if origin is None: + # origin wasn't given so this isn't a browser + allow = True + elif not self.allow_subdomains: + allow = origin in self.cors.allowed_origins + else: + origin_scheme, origin_host = origin.split('://', 1) + for allowed_origin in self.cors.allowed_origins: + allowed_scheme, allowed_host = \ + allowed_origin.split('://', 1) + if origin == allowed_origin or ( + origin_host.endswith('.' + allowed_host) + and origin_scheme == allowed_scheme + ): + allow = True + break + else: + allow = True # no headers to check + + if not allow: + abort(403, 'Forbidden') + + def exempt(self, f): + """Decorator to exempt a route from CSRF protection. + + This decorator must be added immediately after the route decorator to + disable CSRF protection on the route. Example:: + + @app.post('/submit') + @csrf.exempt + # add additional decorators here + def submit(request): + # ... + """ + self.exempt_routes.append(f) + return f + + def protect(self, f): + """Decorator to protect a route against CSRF attacks. + + This is useful when it is necessary to protect a request that uses one + of the safe methods that are not supposed to make state changes. The + decorator must be added immediately after the route decorator to + disable CSRF protection on the route. Example:: + + @app.get('/data') + @csrf.force + # add additional decorators here + def get_data(request): + # ... + """ + self.protected_routes.append(f) + return f diff --git a/src/microdot/microdot.py b/src/microdot/microdot.py index 727fecc..1ffcff6 100644 --- a/src/microdot/microdot.py +++ b/src/microdot/microdot.py @@ -321,7 +321,7 @@ class Request: def __init__(self, app, client_addr, method, url, http_version, headers, body=None, stream=None, sock=None, url_prefix='', - subapp=None, scheme=None): + subapp=None, scheme=None, route=None): #: The application instance to which this request belongs. self.app = app #: The address of the client, as a tuple (host, port). @@ -339,6 +339,8 @@ class Request: #: The sub-application instance, or `None` if this isn't a mounted #: endpoint. self.subapp = subapp + #: The route function that handles this request. + self.route = route #: The path portion of the URL. self.path = url #: The query string portion of the URL. @@ -1429,6 +1431,8 @@ class Microdot: try: res = None if callable(f): + req.route = f + # invoke the before request handlers for handler in self.get_request_handlers( req, 'before_request', False): diff --git a/src/microdot/session.py b/src/microdot/session.py index 07de334..041b290 100644 --- a/src/microdot/session.py +++ b/src/microdot/session.py @@ -24,6 +24,7 @@ class SessionDict(dict): class Session: """Session handling + :param app: The application instance. :param secret_key: The secret key, as a string or bytes object. :param cookie_options: A dictionary with cookie options to pass as diff --git a/tests/__init__.py b/tests/__init__.py index f1ead3f..2fde6bf 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,3 +12,4 @@ from tests.test_utemplate import * # noqa: F401, F403 from tests.test_session import * # noqa: F401, F403 from tests.test_auth import * # noqa: F401, F403 from tests.test_login import * # noqa: F401, F403 +from tests.test_csrf import * # noqa: F401, F403 diff --git a/tests/test_csrf.py b/tests/test_csrf.py new file mode 100644 index 0000000..d3f6238 --- /dev/null +++ b/tests/test_csrf.py @@ -0,0 +1,298 @@ +import asyncio +import unittest +from microdot import Microdot +from microdot.cors import CORS +from microdot.csrf import CSRF +from microdot.test_client import TestClient + + +class TestCSRF(unittest.TestCase): + @classmethod + def setUpClass(cls): + if hasattr(asyncio, 'set_event_loop'): + asyncio.set_event_loop(asyncio.new_event_loop()) + cls.loop = asyncio.get_event_loop() + + def _run(self, coro): + return self.loop.run_until_complete(coro) + + def test_protect_all_true(self): + app = Microdot() + csrf = CSRF(app) + + @app.get('/') + def index(request): + return 204 + + @app.post('/submit') + def submit(request): + return 204 + + @app.post('/submit-exempt') + @csrf.exempt + def submit_exempt(request): + return 204 + + @app.get('/get-protected') + @csrf.protect + def get_protected(request): + return 204 + + client = TestClient(app) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'https://evil.com'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'same-site'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'same-origin'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit-exempt')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit-exempt', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get('/get-protected')) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/get-protected', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 403) + + def test_protect_all_false(self): + app = Microdot() + csrf = CSRF(protect_all=False) + csrf.initialize(app) + + @app.get('/') + def index(request): + return 204 + + @app.post('/submit') + @csrf.protect + def submit(request): + return 204 + + @app.post('/submit-exempt') + def submit_exempt(request): + return 204 + + client = TestClient(app) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'https://evil.com'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'same-site'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'same-origin'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit-exempt')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit-exempt', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 204) + + def test_allow_subdomains(self): + app = Microdot() + csrf = CSRF(allow_subdomains=True) + csrf.initialize(app) + + @app.get('/') + def index(request): + return 204 + + @app.post('/submit') + def submit(request): + return 204 + + @app.post('/submit-exempt') + @csrf.exempt + def submit_exempt(request): + return 204 + + client = TestClient(app) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'https://evil.com'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'same-site'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Sec-Fetch-Site': 'same-origin'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit-exempt')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit-exempt', headers={'Sec-Fetch-Site': 'cross-site'} + )) + self.assertEqual(res.status_code, 204) + + def test_allowed_origins(self): + app = Microdot() + cors = CORS(allowed_origins=['http://foo.com', 'https://bar.com:8888']) + csrf = CSRF() + csrf.initialize(app, cors) + + @app.get('/') + def index(request): + return 204 + + @app.post('/submit') + def submit(request): + return 204 + + client = TestClient(app) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'foo.com'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'http://foo.com'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'https://baz.com'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'http://x.baz.com'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Origin': 'https://bar.com:8888'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Origin': 'http://bar.com:8888'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Origin': 'https://x.y.bar.com:8888'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Origin': 'http://baz.com'} + )) + self.assertEqual(res.status_code, 403) + + def test_allowed_origins_with_subdomains(self): + app = Microdot() + cors = CORS(allowed_origins=['http://foo.com', 'https://bar.com:8888']) + csrf = CSRF(allow_subdomains=True) + csrf.initialize(app, cors) + + @app.get('/') + def index(request): + return 204 + + @app.post('/submit') + def submit(request): + return 204 + + client = TestClient(app) + + res = self._run(client.get('/')) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'foo.com'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'http://foo.com'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'https://baz.com'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.get( + '/', headers={'Origin': 'http://x.baz.com'} + )) + self.assertEqual(res.status_code, 204) + + res = self._run(client.post('/submit')) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Origin': 'https://bar.com:8888'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Origin': 'http://bar.com:8888'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Origin': 'https://x.y.bar.com:8888'} + )) + self.assertEqual(res.status_code, 204) + res = self._run(client.post( + '/submit', headers={'Origin': 'http://x.y.bar.com:8888'} + )) + self.assertEqual(res.status_code, 403) + res = self._run(client.post( + '/submit', headers={'Origin': 'http://baz.com'} + )) + self.assertEqual(res.status_code, 403)