diff --git a/microdot.py b/microdot.py index bdb8245..cf05284 100644 --- a/microdot.py +++ b/microdot.py @@ -10,7 +10,7 @@ except ImportError: try: from sys import print_exception -except ImportError: +except ImportError: # pragma: no cover import traceback def print_exception(exc): @@ -43,25 +43,29 @@ class Request(): self.client_sock = client_sock self.client_addr = client_addr - if not hasattr(client_sock, 'readline'): + if not hasattr(client_sock, 'readline'): # pragma: no cover self.client_stream = client_sock.makefile("rwb") else: self.client_stream = client_sock # request line - line = self.client_stream.readline().strip().decode('utf-8') + line = self.client_stream.readline().strip().decode() self.method, self.path, self.http_version = line.split() + self.http_version = self.http_version.split('/', 1)[1] if '?' in self.path: self.path, self.query_string = self.path.split('?', 1) + self.args = self._parse_urlencoded(self.query_string) else: self.query_string = None + self.args = {} # headers self.headers = {} self.cookies = {} self.content_length = 0 + self.content_type = None while True: - line = self.client_stream.readline().strip().decode('utf-8') + line = self.client_stream.readline().strip().decode() if line == '': break header, value = line.split(':', 1) @@ -81,12 +85,18 @@ class Request(): self._json = None self._form = None + def _parse_urlencoded(self, urlencoded): + return { + urldecode(key): urldecode(value) for key, value in [ + pair.split('=', 1) for pair in + urlencoded.split('&')]} + @property def json(self): if self.content_type != 'application/json': return None if self._json is None: - self._json = json.loads(self.body) + self._json = json.loads(self.body.decode()) return self._json @property @@ -94,15 +104,12 @@ class Request(): if self.content_type != 'application/x-www-form-urlencoded': return None if self._form is None: - self._form = { - urldecode(key): urldecode(value) for key, value in [ - pair.split('=', 1) for pair in - self.body.decode().split('&')]} + self._form = self._parse_urlencoded(self.body.decode()) return self._form def close(self): self.client_stream.close() - if self.client_stream != self.client_sock: + if self.client_stream != self.client_sock: # pragma: no cover self.client_sock.close() diff --git a/run_tests.py b/run_tests.py index b8b8e5b..f1e21ae 100644 --- a/run_tests.py +++ b/run_tests.py @@ -1,3 +1,6 @@ +import sys +sys.path.append('tests') + import unittest unittest.main('tests') diff --git a/tests/__init__.py b/tests/__init__.py index 74d5e5a..f953a6f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1 +1,4 @@ +from tests.test_request import TestRequest +from tests.test_response import TestResponse +from tests.test_url_pattern import TestURLPattern from tests.test_microdot import TestMicrodot diff --git a/tests/test_request.py b/tests/test_request.py new file mode 100644 index 0000000..7e62734 --- /dev/null +++ b/tests/test_request.py @@ -0,0 +1,96 @@ +try: + import uio as io +except ImportError: + import io + +import unittest +from microdot import Request + + +class TestRequest(unittest.TestCase): + def _get_request_fd(self, method, path, headers=None, body=None): + if headers is None: + headers = {} + if body is None: + body = '' + elif 'Content-Length' not in headers: + headers['Content-Length'] = str(len(body)) + request_bytes = '{method} {path} HTTP/1.0\n'.format( + method=method, path=path) + if 'Host' not in headers: + headers['Host'] = 'example.com:1234' + for header, value in headers.items(): + request_bytes += '{header}: {value}\n'.format( + header=header, value=value) + request_bytes += '\n' + body + return io.BytesIO(request_bytes.encode()) + + def test_create_request(self): + fd = self._get_request_fd('GET', '/foo') + req = Request(fd, 'addr') + req.close() + self.assertEqual(req.client_sock, fd) + self.assertEqual(req.client_addr, 'addr') + self.assertEqual(req.method, 'GET') + self.assertEqual(req.path, '/foo') + self.assertEqual(req.http_version, '1.0') + self.assertIsNone(req.query_string) + self.assertEqual(req.args, {}) + self.assertEqual(req.headers, {'Host': 'example.com:1234'}) + self.assertEqual(req.cookies, {}) + self.assertEqual(req.content_length, 0) + self.assertEqual(req.content_type, None) + self.assertEqual(req.body, b'') + self.assertEqual(req.json, None) + self.assertEqual(req.form, None) + + def test_headers(self): + fd = self._get_request_fd('GET', '/foo', headers={ + 'Content-Type': 'application/json', + 'Cookie': 'foo=bar;abc=def', + 'Content-Length': '3'}, body='aaa') + req = Request(fd, 'addr') + self.assertEqual(req.headers, { + 'Host': 'example.com:1234', + 'Content-Type': 'application/json', + 'Cookie': 'foo=bar;abc=def', + 'Content-Length': '3'}) + self.assertEqual(req.content_type, 'application/json') + self.assertEqual(req.cookies, {'foo': 'bar', 'abc': 'def'}) + self.assertEqual(req.content_length, 3) + self.assertEqual(req.body, b'aaa') + + def test_args(self): + fd = self._get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%') + req = Request(fd, 'addr') + self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%') + self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'}) + + def test_json(self): + fd = self._get_request_fd('GET', '/foo', headers={ + 'Content-Type': 'application/json'}, body='{"foo":"bar"}') + req = Request(fd, 'addr') + self.assertEqual(req.json, {'foo': 'bar'}) + + fd = self._get_request_fd('GET', '/foo', headers={ + 'Content-Type': 'application/json'}, body='[1, "2"]') + req = Request(fd, 'addr') + self.assertEqual(req.json, [1, '2']) + + fd = self._get_request_fd('GET', '/foo', headers={ + 'Content-Type': 'application/xml'}, body='[1, "2"]') + req = Request(fd, 'addr') + self.assertIsNone(req.json) + + def test_form(self): + fd = self._get_request_fd('GET', '/foo', headers={ + 'Content-Type': 'application/x-www-form-urlencoded'}, + body='foo=bar&abc=def&x=%2f%%') + req = Request(fd, 'addr') + self.assertEqual(req.form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'}) + + fd = self._get_request_fd('GET', '/foo', headers={ + 'Content-Type': 'application/json'}, + body='foo=bar&abc=def&x=%2f%%') + req = Request(fd, 'addr') + self.assertIsNone(req.form) diff --git a/tests/test_response.py b/tests/test_response.py new file mode 100644 index 0000000..16b8780 --- /dev/null +++ b/tests/test_response.py @@ -0,0 +1,6 @@ +import unittest + + +class TestResponse(unittest.TestCase): + def test_foo(self): + pass diff --git a/tests/test_url_pattern.py b/tests/test_url_pattern.py new file mode 100644 index 0000000..564d992 --- /dev/null +++ b/tests/test_url_pattern.py @@ -0,0 +1,6 @@ +import unittest + + +class TestURLPattern(unittest.TestCase): + def test_foo(self): + pass diff --git a/unittest.py b/tests/unittest.py similarity index 100% rename from unittest.py rename to tests/unittest.py diff --git a/tox.ini b/tox.ini index 22dff1b..ed4b879 100644 --- a/tox.ini +++ b/tox.ini @@ -3,7 +3,11 @@ envlist=flake8,py35,py36,py37,upy skip_missing_interpreters=True [testenv] -commands=python run_tests.py +commands= + coverage run --branch --include="microdot.py" -m unittest tests + coverage report --show-missing + coverage erase +deps=coverage basepython= flake8: python3.7 py35: python3.5 @@ -15,7 +19,7 @@ basepython= deps= flake8 commands= - flake8 microdot.py tests + flake8 --exclude=tests/unittest.py microdot.py tests [testenv:upy] whitelist_externals=sh