From 76ab1fa6d72dd9deaa24aeaf4895a0c6fc883bcb Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sat, 27 Apr 2019 16:34:19 +0100 Subject: [PATCH] more unit tests --- microdot.py | 11 +-- tests/mock_socket.py | 71 +++++++++++++++++++ tests/test_microdot.py | 141 +++++++++++++++++++++++++++++++++++++- tests/test_request.py | 47 ++++--------- tests/test_response.py | 21 ++++++ tests/test_url_pattern.py | 1 - tox.ini | 1 + 7 files changed, 254 insertions(+), 39 deletions(-) create mode 100644 tests/mock_socket.py diff --git a/microdot.py b/microdot.py index bfd06c9..44554ae 100644 --- a/microdot.py +++ b/microdot.py @@ -283,7 +283,7 @@ class Microdot(): ai = socket.getaddrinfo(host, port) addr = ai[0][-1] - if debug: + if debug: # pragma: no cover print('Listening on {host}:{port}...'.format(host=host, port=port)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(addr) @@ -312,15 +312,18 @@ class Microdot(): if exc.__class__ in self.error_handlers: try: resp = self.error_handlers[exc.__class__](req, exc) - except Exception as exc2: + except Exception as exc2: # pragma: no cover print_exception(exc2) if resp is None: - resp = 'Internal server error', 500 + if 500 in self.error_handlers: + resp = self.error_handlers[500](req) + else: + resp = 'Internal server error', 500 if isinstance(resp, tuple): resp = Response(*resp) elif not isinstance(resp, Response): resp = Response(resp) - if debug: + if debug: # pragma: no cover print('{method} {path} {status_code}'.format( method=req.method, path=req.path, status_code=resp.status_code)) diff --git a/tests/mock_socket.py b/tests/mock_socket.py new file mode 100644 index 0000000..d0597a6 --- /dev/null +++ b/tests/mock_socket.py @@ -0,0 +1,71 @@ +try: + import uio as io +except ImportError: + import io + +SOL_SOCKET = 'SOL_SOCKET' +SO_REUSEADDR = 'SO_REUSEADDR' + +_calls = [] +_requests = [] + + +def getaddrinfo(host, port): + _calls.append(('getaddrinfo', host, port)) + return (('family', 'addr'), 'socktype', 'proto', 'canonname', 'sockaddr') + + +class socket: + def __init__(self): + self.request_index = 0 + + def setsockopt(self, level, optname, value): + _calls.append(('setsockopt', level, optname, value)) + + def bind(self, addr): + _calls.append(('bind', addr)) + + def listen(self, backlog): + _calls.append(('listen', backlog)) + + def accept(self): + _calls.append(('accept',)) + self.request_index += 1 + return _requests[self.request_index - 1], 'addr' + + +class FakeStream(io.BytesIO): + def __init__(self, input_data): + super().__init__(input_data) + self.response = b'' + + def write(self, data): + self.response += data + + +def get_request_fd(method, path, headers=None, body=None): + if headers is None: + headers = {} + if body is None: + body = '' + elif 'Content-Length' not in headers: + headers['Content-Length'] = str(len(body)) + request_bytes = '{method} {path} HTTP/1.0\n'.format( + method=method, path=path) + if 'Host' not in headers: + headers['Host'] = 'example.com:1234' + for header, value in headers.items(): + request_bytes += '{header}: {value}\n'.format( + header=header, value=value) + request_bytes += '\n' + body + return FakeStream(request_bytes.encode()) + + +def clear_requests(): + _requests.clear() + + +def add_request(method, path, headers=None, body=None): + fd = get_request_fd(method, path, headers=headers, body=body) + _requests.append(fd) + return fd diff --git a/tests/test_microdot.py b/tests/test_microdot.py index 8d7a0aa..ae5504d 100644 --- a/tests/test_microdot.py +++ b/tests/test_microdot.py @@ -1,6 +1,143 @@ +import sys import unittest +from microdot import Microdot +from tests import mock_socket class TestMicrodot(unittest.TestCase): - def test_foo(self): - pass + def setUp(self): + # mock socket module + self.original_socket = sys.modules['microdot'].socket + sys.modules['microdot'].socket = mock_socket + + def tearDown(self): + # restore original socket module + sys.modules['microdot'].socket = self.original_socket + + def test_get_request(self): + app = Microdot() + + @app.route('/') + def index(req): + return 'foo' + + mock_socket.clear_requests() + fd = mock_socket.add_request('GET', '/') + self.assertRaises(IndexError, app.run) + self.assertEqual(fd.response, b'HTTP/1.0 200 OK\r\n' + b'Content-Length: 3\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'foo') + + def test_post_request(self): + app = Microdot() + + @app.route('/') + def index(req): + return 'foo' + + @app.route('/', methods=['POST']) + def index_post(req): + return 'bar' + + mock_socket.clear_requests() + fd = mock_socket.add_request('POST', '/') + self.assertRaises(IndexError, app.run) + self.assertEqual(fd.response, b'HTTP/1.0 200 OK\r\n' + b'Content-Length: 3\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'bar') + + def test_404(self): + app = Microdot() + + @app.route('/') + def index(req): + return 'foo' + + mock_socket.clear_requests() + fd = mock_socket.add_request('GET', '/foo') + self.assertRaises(IndexError, app.run) + self.assertEqual(fd.response, b'HTTP/1.0 404 N/A\r\n' + b'Content-Length: 9\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'Not found') + + def test_404_handler(self): + app = Microdot() + + @app.route('/') + def index(req): + return 'foo' + + @app.errorhandler(404) + def handle_404(req): + return '404' + + mock_socket.clear_requests() + fd = mock_socket.add_request('GET', '/foo') + self.assertRaises(IndexError, app.run) + self.assertEqual(fd.response, b'HTTP/1.0 200 OK\r\n' + b'Content-Length: 3\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'404') + + def test_500(self): + app = Microdot() + + @app.route('/') + def index(req): + return 1 / 0 + + mock_socket.clear_requests() + fd = mock_socket.add_request('GET', '/') + self.assertRaises(IndexError, app.run) + self.assertEqual(fd.response, b'HTTP/1.0 500 N/A\r\n' + b'Content-Length: 21\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'Internal server error') + + def test_500_handler(self): + app = Microdot() + + @app.route('/') + def index(req): + return 1 / 0 + + @app.errorhandler(500) + def handle_500(req): + return '501', 501 + + mock_socket.clear_requests() + fd = mock_socket.add_request('GET', '/') + self.assertRaises(IndexError, app.run) + self.assertEqual(fd.response, b'HTTP/1.0 501 N/A\r\n' + b'Content-Length: 3\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'501') + + def test_exception_handler(self): + app = Microdot() + + @app.route('/') + def index(req): + return 1 / 0 + + @app.errorhandler(ZeroDivisionError) + def handle_div_zero(req, exc): + return '501', 501 + + mock_socket.clear_requests() + fd = mock_socket.add_request('GET', '/') + self.assertRaises(IndexError, app.run) + self.assertEqual(fd.response, b'HTTP/1.0 501 N/A\r\n' + b'Content-Length: 3\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'501') diff --git a/tests/test_request.py b/tests/test_request.py index 7e62734..27fa61c 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -1,32 +1,11 @@ -try: - import uio as io -except ImportError: - import io - import unittest from microdot import Request +from tests.mock_socket import get_request_fd class TestRequest(unittest.TestCase): - def _get_request_fd(self, method, path, headers=None, body=None): - if headers is None: - headers = {} - if body is None: - body = '' - elif 'Content-Length' not in headers: - headers['Content-Length'] = str(len(body)) - request_bytes = '{method} {path} HTTP/1.0\n'.format( - method=method, path=path) - if 'Host' not in headers: - headers['Host'] = 'example.com:1234' - for header, value in headers.items(): - request_bytes += '{header}: {value}\n'.format( - header=header, value=value) - request_bytes += '\n' + body - return io.BytesIO(request_bytes.encode()) - def test_create_request(self): - fd = self._get_request_fd('GET', '/foo') + fd = get_request_fd('GET', '/foo') req = Request(fd, 'addr') req.close() self.assertEqual(req.client_sock, fd) @@ -45,7 +24,7 @@ class TestRequest(unittest.TestCase): self.assertEqual(req.form, None) def test_headers(self): - fd = self._get_request_fd('GET', '/foo', headers={ + fd = get_request_fd('GET', '/foo', headers={ 'Content-Type': 'application/json', 'Cookie': 'foo=bar;abc=def', 'Content-Length': '3'}, body='aaa') @@ -61,35 +40,39 @@ class TestRequest(unittest.TestCase): self.assertEqual(req.body, b'aaa') def test_args(self): - fd = self._get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%') + fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%') req = Request(fd, 'addr') self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%') self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'}) def test_json(self): - fd = self._get_request_fd('GET', '/foo', headers={ + fd = get_request_fd('GET', '/foo', headers={ 'Content-Type': 'application/json'}, body='{"foo":"bar"}') req = Request(fd, 'addr') - self.assertEqual(req.json, {'foo': 'bar'}) + json = req.json + self.assertEqual(json, {'foo': 'bar'}) + self.assertTrue(req.json is json) - fd = self._get_request_fd('GET', '/foo', headers={ + fd = get_request_fd('GET', '/foo', headers={ 'Content-Type': 'application/json'}, body='[1, "2"]') req = Request(fd, 'addr') self.assertEqual(req.json, [1, '2']) - fd = self._get_request_fd('GET', '/foo', headers={ + fd = get_request_fd('GET', '/foo', headers={ 'Content-Type': 'application/xml'}, body='[1, "2"]') req = Request(fd, 'addr') self.assertIsNone(req.json) def test_form(self): - fd = self._get_request_fd('GET', '/foo', headers={ + fd = get_request_fd('GET', '/foo', headers={ 'Content-Type': 'application/x-www-form-urlencoded'}, body='foo=bar&abc=def&x=%2f%%') req = Request(fd, 'addr') - self.assertEqual(req.form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'}) + form = req.form + self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'}) + self.assertTrue(req.form is form) - fd = self._get_request_fd('GET', '/foo', headers={ + fd = get_request_fd('GET', '/foo', headers={ 'Content-Type': 'application/json'}, body='foo=bar&abc=def&x=%2f%%') req = Request(fd, 'addr') diff --git a/tests/test_response.py b/tests/test_response.py index 89adc6e..112d2fb 100644 --- a/tests/test_response.py +++ b/tests/test_response.py @@ -55,6 +55,21 @@ class TestResponse(unittest.TestCase): b'\r\n' b'foo') + def test_create_empty(self): + res = Response(headers={'X-Foo': 'Bar'}) + self.assertEqual(res.status_code, 200) + self.assertEqual(res.headers, {'X-Foo': 'Bar'}) + self.assertEqual(res.body, b'') + fd = io.BytesIO() + res.write(fd) + self.assertEqual( + fd.getvalue(), + b'HTTP/1.0 200 OK\r\n' + b'X-Foo: Bar\r\n' + b'Content-Length: 0\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n') + def test_create_json(self): res = Response({'foo': 'bar'}) self.assertEqual(res.status_code, 200) @@ -159,3 +174,9 @@ class TestResponse(unittest.TestCase): self.assertEqual(res.headers['Content-Type'], content_type) self.assertEqual(res.headers['Content-Length'], '4') self.assertEqual(res.body, b'foo\n') + res = Response.send_file('tests/files/test.txt', + content_type='text/html') + self.assertEqual(res.status_code, 200) + self.assertEqual(res.headers['Content-Type'], 'text/html') + self.assertEqual(res.headers['Content-Length'], '4') + self.assertEqual(res.body, b'foo\n') diff --git a/tests/test_url_pattern.py b/tests/test_url_pattern.py index 411ba77..d6520b5 100644 --- a/tests/test_url_pattern.py +++ b/tests/test_url_pattern.py @@ -78,7 +78,6 @@ class TestURLPattern(unittest.TestCase): def test_regex_argument(self): p = URLPattern('/users/') - print(p.pattern) self.assertEqual(p.match('/users/ab'), {'id': 'ab'}) self.assertEqual(p.match('/users/bca'), {'id': 'bca'}) self.assertIsNone(p.match('/users/abcd')) diff --git a/tox.ini b/tox.ini index 9e893ce..ec30624 100644 --- a/tox.ini +++ b/tox.ini @@ -28,3 +28,4 @@ commands=sh -c "bin/micropython run_tests.py" [testenv:upy-mac] whitelist_externals=micropython commands=micropython run_tests.py +deps=