request unit tests

This commit is contained in:
Miguel Grinberg
2019-04-27 11:58:03 +01:00
parent 9b32292f21
commit 0b95feafc9
8 changed files with 137 additions and 12 deletions

View File

@@ -10,7 +10,7 @@ except ImportError:
try:
from sys import print_exception
except ImportError:
except ImportError: # pragma: no cover
import traceback
def print_exception(exc):
@@ -43,25 +43,29 @@ class Request():
self.client_sock = client_sock
self.client_addr = client_addr
if not hasattr(client_sock, 'readline'):
if not hasattr(client_sock, 'readline'): # pragma: no cover
self.client_stream = client_sock.makefile("rwb")
else:
self.client_stream = client_sock
# request line
line = self.client_stream.readline().strip().decode('utf-8')
line = self.client_stream.readline().strip().decode()
self.method, self.path, self.http_version = line.split()
self.http_version = self.http_version.split('/', 1)[1]
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
# headers
self.headers = {}
self.cookies = {}
self.content_length = 0
self.content_type = None
while True:
line = self.client_stream.readline().strip().decode('utf-8')
line = self.client_stream.readline().strip().decode()
if line == '':
break
header, value = line.split(':', 1)
@@ -81,12 +85,18 @@ class Request():
self._json = None
self._form = None
def _parse_urlencoded(self, urlencoded):
return {
urldecode(key): urldecode(value) for key, value in [
pair.split('=', 1) for pair in
urlencoded.split('&')]}
@property
def json(self):
if self.content_type != 'application/json':
return None
if self._json is None:
self._json = json.loads(self.body)
self._json = json.loads(self.body.decode())
return self._json
@property
@@ -94,15 +104,12 @@ class Request():
if self.content_type != 'application/x-www-form-urlencoded':
return None
if self._form is None:
self._form = {
urldecode(key): urldecode(value) for key, value in [
pair.split('=', 1) for pair in
self.body.decode().split('&')]}
self._form = self._parse_urlencoded(self.body.decode())
return self._form
def close(self):
self.client_stream.close()
if self.client_stream != self.client_sock:
if self.client_stream != self.client_sock: # pragma: no cover
self.client_sock.close()

View File

@@ -1,3 +1,6 @@
import sys
sys.path.append('tests')
import unittest
unittest.main('tests')

View File

@@ -1 +1,4 @@
from tests.test_request import TestRequest
from tests.test_response import TestResponse
from tests.test_url_pattern import TestURLPattern
from tests.test_microdot import TestMicrodot

96
tests/test_request.py Normal file
View File

@@ -0,0 +1,96 @@
try:
import uio as io
except ImportError:
import io
import unittest
from microdot import Request
class TestRequest(unittest.TestCase):
def _get_request_fd(self, method, path, headers=None, body=None):
if headers is None:
headers = {}
if body is None:
body = ''
elif 'Content-Length' not in headers:
headers['Content-Length'] = str(len(body))
request_bytes = '{method} {path} HTTP/1.0\n'.format(
method=method, path=path)
if 'Host' not in headers:
headers['Host'] = 'example.com:1234'
for header, value in headers.items():
request_bytes += '{header}: {value}\n'.format(
header=header, value=value)
request_bytes += '\n' + body
return io.BytesIO(request_bytes.encode())
def test_create_request(self):
fd = self._get_request_fd('GET', '/foo')
req = Request(fd, 'addr')
req.close()
self.assertEqual(req.client_sock, fd)
self.assertEqual(req.client_addr, 'addr')
self.assertEqual(req.method, 'GET')
self.assertEqual(req.path, '/foo')
self.assertEqual(req.http_version, '1.0')
self.assertIsNone(req.query_string)
self.assertEqual(req.args, {})
self.assertEqual(req.headers, {'Host': 'example.com:1234'})
self.assertEqual(req.cookies, {})
self.assertEqual(req.content_length, 0)
self.assertEqual(req.content_type, None)
self.assertEqual(req.body, b'')
self.assertEqual(req.json, None)
self.assertEqual(req.form, None)
def test_headers(self):
fd = self._get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'}, body='aaa')
req = Request(fd, 'addr')
self.assertEqual(req.headers, {
'Host': 'example.com:1234',
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'})
self.assertEqual(req.content_type, 'application/json')
self.assertEqual(req.cookies, {'foo': 'bar', 'abc': 'def'})
self.assertEqual(req.content_length, 3)
self.assertEqual(req.body, b'aaa')
def test_args(self):
fd = self._get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = Request(fd, 'addr')
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
def test_json(self):
fd = self._get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
req = Request(fd, 'addr')
self.assertEqual(req.json, {'foo': 'bar'})
fd = self._get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='[1, "2"]')
req = Request(fd, 'addr')
self.assertEqual(req.json, [1, '2'])
fd = self._get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/xml'}, body='[1, "2"]')
req = Request(fd, 'addr')
self.assertIsNone(req.json)
def test_form(self):
fd = self._get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=%2f%%')
req = Request(fd, 'addr')
self.assertEqual(req.form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
fd = self._get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'},
body='foo=bar&abc=def&x=%2f%%')
req = Request(fd, 'addr')
self.assertIsNone(req.form)

6
tests/test_response.py Normal file
View File

@@ -0,0 +1,6 @@
import unittest
class TestResponse(unittest.TestCase):
def test_foo(self):
pass

View File

@@ -0,0 +1,6 @@
import unittest
class TestURLPattern(unittest.TestCase):
def test_foo(self):
pass

View File

@@ -3,7 +3,11 @@ envlist=flake8,py35,py36,py37,upy
skip_missing_interpreters=True
[testenv]
commands=python run_tests.py
commands=
coverage run --branch --include="microdot.py" -m unittest tests
coverage report --show-missing
coverage erase
deps=coverage
basepython=
flake8: python3.7
py35: python3.5
@@ -15,7 +19,7 @@ basepython=
deps=
flake8
commands=
flake8 microdot.py tests
flake8 --exclude=tests/unittest.py microdot.py tests
[testenv:upy]
whitelist_externals=sh