more unit tests
This commit is contained in:
11
microdot.py
11
microdot.py
@@ -283,7 +283,7 @@ class Microdot():
|
||||
ai = socket.getaddrinfo(host, port)
|
||||
addr = ai[0][-1]
|
||||
|
||||
if debug:
|
||||
if debug: # pragma: no cover
|
||||
print('Listening on {host}:{port}...'.format(host=host, port=port))
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind(addr)
|
||||
@@ -312,15 +312,18 @@ class Microdot():
|
||||
if exc.__class__ in self.error_handlers:
|
||||
try:
|
||||
resp = self.error_handlers[exc.__class__](req, exc)
|
||||
except Exception as exc2:
|
||||
except Exception as exc2: # pragma: no cover
|
||||
print_exception(exc2)
|
||||
if resp is None:
|
||||
resp = 'Internal server error', 500
|
||||
if 500 in self.error_handlers:
|
||||
resp = self.error_handlers[500](req)
|
||||
else:
|
||||
resp = 'Internal server error', 500
|
||||
if isinstance(resp, tuple):
|
||||
resp = Response(*resp)
|
||||
elif not isinstance(resp, Response):
|
||||
resp = Response(resp)
|
||||
if debug:
|
||||
if debug: # pragma: no cover
|
||||
print('{method} {path} {status_code}'.format(
|
||||
method=req.method, path=req.path,
|
||||
status_code=resp.status_code))
|
||||
|
||||
71
tests/mock_socket.py
Normal file
71
tests/mock_socket.py
Normal file
@@ -0,0 +1,71 @@
|
||||
try:
|
||||
import uio as io
|
||||
except ImportError:
|
||||
import io
|
||||
|
||||
SOL_SOCKET = 'SOL_SOCKET'
|
||||
SO_REUSEADDR = 'SO_REUSEADDR'
|
||||
|
||||
_calls = []
|
||||
_requests = []
|
||||
|
||||
|
||||
def getaddrinfo(host, port):
|
||||
_calls.append(('getaddrinfo', host, port))
|
||||
return (('family', 'addr'), 'socktype', 'proto', 'canonname', 'sockaddr')
|
||||
|
||||
|
||||
class socket:
|
||||
def __init__(self):
|
||||
self.request_index = 0
|
||||
|
||||
def setsockopt(self, level, optname, value):
|
||||
_calls.append(('setsockopt', level, optname, value))
|
||||
|
||||
def bind(self, addr):
|
||||
_calls.append(('bind', addr))
|
||||
|
||||
def listen(self, backlog):
|
||||
_calls.append(('listen', backlog))
|
||||
|
||||
def accept(self):
|
||||
_calls.append(('accept',))
|
||||
self.request_index += 1
|
||||
return _requests[self.request_index - 1], 'addr'
|
||||
|
||||
|
||||
class FakeStream(io.BytesIO):
|
||||
def __init__(self, input_data):
|
||||
super().__init__(input_data)
|
||||
self.response = b''
|
||||
|
||||
def write(self, data):
|
||||
self.response += data
|
||||
|
||||
|
||||
def get_request_fd(method, path, headers=None, body=None):
|
||||
if headers is None:
|
||||
headers = {}
|
||||
if body is None:
|
||||
body = ''
|
||||
elif 'Content-Length' not in headers:
|
||||
headers['Content-Length'] = str(len(body))
|
||||
request_bytes = '{method} {path} HTTP/1.0\n'.format(
|
||||
method=method, path=path)
|
||||
if 'Host' not in headers:
|
||||
headers['Host'] = 'example.com:1234'
|
||||
for header, value in headers.items():
|
||||
request_bytes += '{header}: {value}\n'.format(
|
||||
header=header, value=value)
|
||||
request_bytes += '\n' + body
|
||||
return FakeStream(request_bytes.encode())
|
||||
|
||||
|
||||
def clear_requests():
|
||||
_requests.clear()
|
||||
|
||||
|
||||
def add_request(method, path, headers=None, body=None):
|
||||
fd = get_request_fd(method, path, headers=headers, body=body)
|
||||
_requests.append(fd)
|
||||
return fd
|
||||
@@ -1,6 +1,143 @@
|
||||
import sys
|
||||
import unittest
|
||||
from microdot import Microdot
|
||||
from tests import mock_socket
|
||||
|
||||
|
||||
class TestMicrodot(unittest.TestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
def setUp(self):
|
||||
# mock socket module
|
||||
self.original_socket = sys.modules['microdot'].socket
|
||||
sys.modules['microdot'].socket = mock_socket
|
||||
|
||||
def tearDown(self):
|
||||
# restore original socket module
|
||||
sys.modules['microdot'].socket = self.original_socket
|
||||
|
||||
def test_get_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertEqual(fd.response, b'HTTP/1.0 200 OK\r\n'
|
||||
b'Content-Length: 3\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n'
|
||||
b'foo')
|
||||
|
||||
def test_post_request(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.route('/', methods=['POST'])
|
||||
def index_post(req):
|
||||
return 'bar'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('POST', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertEqual(fd.response, b'HTTP/1.0 200 OK\r\n'
|
||||
b'Content-Length: 3\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n'
|
||||
b'bar')
|
||||
|
||||
def test_404(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/foo')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertEqual(fd.response, b'HTTP/1.0 404 N/A\r\n'
|
||||
b'Content-Length: 9\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n'
|
||||
b'Not found')
|
||||
|
||||
def test_404_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 'foo'
|
||||
|
||||
@app.errorhandler(404)
|
||||
def handle_404(req):
|
||||
return '404'
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/foo')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertEqual(fd.response, b'HTTP/1.0 200 OK\r\n'
|
||||
b'Content-Length: 3\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n'
|
||||
b'404')
|
||||
|
||||
def test_500(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertEqual(fd.response, b'HTTP/1.0 500 N/A\r\n'
|
||||
b'Content-Length: 21\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n'
|
||||
b'Internal server error')
|
||||
|
||||
def test_500_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
@app.errorhandler(500)
|
||||
def handle_500(req):
|
||||
return '501', 501
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertEqual(fd.response, b'HTTP/1.0 501 N/A\r\n'
|
||||
b'Content-Length: 3\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n'
|
||||
b'501')
|
||||
|
||||
def test_exception_handler(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
def index(req):
|
||||
return 1 / 0
|
||||
|
||||
@app.errorhandler(ZeroDivisionError)
|
||||
def handle_div_zero(req, exc):
|
||||
return '501', 501
|
||||
|
||||
mock_socket.clear_requests()
|
||||
fd = mock_socket.add_request('GET', '/')
|
||||
self.assertRaises(IndexError, app.run)
|
||||
self.assertEqual(fd.response, b'HTTP/1.0 501 N/A\r\n'
|
||||
b'Content-Length: 3\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n'
|
||||
b'501')
|
||||
|
||||
@@ -1,32 +1,11 @@
|
||||
try:
|
||||
import uio as io
|
||||
except ImportError:
|
||||
import io
|
||||
|
||||
import unittest
|
||||
from microdot import Request
|
||||
from tests.mock_socket import get_request_fd
|
||||
|
||||
|
||||
class TestRequest(unittest.TestCase):
|
||||
def _get_request_fd(self, method, path, headers=None, body=None):
|
||||
if headers is None:
|
||||
headers = {}
|
||||
if body is None:
|
||||
body = ''
|
||||
elif 'Content-Length' not in headers:
|
||||
headers['Content-Length'] = str(len(body))
|
||||
request_bytes = '{method} {path} HTTP/1.0\n'.format(
|
||||
method=method, path=path)
|
||||
if 'Host' not in headers:
|
||||
headers['Host'] = 'example.com:1234'
|
||||
for header, value in headers.items():
|
||||
request_bytes += '{header}: {value}\n'.format(
|
||||
header=header, value=value)
|
||||
request_bytes += '\n' + body
|
||||
return io.BytesIO(request_bytes.encode())
|
||||
|
||||
def test_create_request(self):
|
||||
fd = self._get_request_fd('GET', '/foo')
|
||||
fd = get_request_fd('GET', '/foo')
|
||||
req = Request(fd, 'addr')
|
||||
req.close()
|
||||
self.assertEqual(req.client_sock, fd)
|
||||
@@ -45,7 +24,7 @@ class TestRequest(unittest.TestCase):
|
||||
self.assertEqual(req.form, None)
|
||||
|
||||
def test_headers(self):
|
||||
fd = self._get_request_fd('GET', '/foo', headers={
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json',
|
||||
'Cookie': 'foo=bar;abc=def',
|
||||
'Content-Length': '3'}, body='aaa')
|
||||
@@ -61,35 +40,39 @@ class TestRequest(unittest.TestCase):
|
||||
self.assertEqual(req.body, b'aaa')
|
||||
|
||||
def test_args(self):
|
||||
fd = self._get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
|
||||
fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
|
||||
req = Request(fd, 'addr')
|
||||
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
|
||||
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
|
||||
def test_json(self):
|
||||
fd = self._get_request_fd('GET', '/foo', headers={
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
|
||||
req = Request(fd, 'addr')
|
||||
self.assertEqual(req.json, {'foo': 'bar'})
|
||||
json = req.json
|
||||
self.assertEqual(json, {'foo': 'bar'})
|
||||
self.assertTrue(req.json is json)
|
||||
|
||||
fd = self._get_request_fd('GET', '/foo', headers={
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'}, body='[1, "2"]')
|
||||
req = Request(fd, 'addr')
|
||||
self.assertEqual(req.json, [1, '2'])
|
||||
|
||||
fd = self._get_request_fd('GET', '/foo', headers={
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/xml'}, body='[1, "2"]')
|
||||
req = Request(fd, 'addr')
|
||||
self.assertIsNone(req.json)
|
||||
|
||||
def test_form(self):
|
||||
fd = self._get_request_fd('GET', '/foo', headers={
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded'},
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = Request(fd, 'addr')
|
||||
self.assertEqual(req.form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
form = req.form
|
||||
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
self.assertTrue(req.form is form)
|
||||
|
||||
fd = self._get_request_fd('GET', '/foo', headers={
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
'Content-Type': 'application/json'},
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = Request(fd, 'addr')
|
||||
|
||||
@@ -55,6 +55,21 @@ class TestResponse(unittest.TestCase):
|
||||
b'\r\n'
|
||||
b'foo')
|
||||
|
||||
def test_create_empty(self):
|
||||
res = Response(headers={'X-Foo': 'Bar'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers, {'X-Foo': 'Bar'})
|
||||
self.assertEqual(res.body, b'')
|
||||
fd = io.BytesIO()
|
||||
res.write(fd)
|
||||
self.assertEqual(
|
||||
fd.getvalue(),
|
||||
b'HTTP/1.0 200 OK\r\n'
|
||||
b'X-Foo: Bar\r\n'
|
||||
b'Content-Length: 0\r\n'
|
||||
b'Content-Type: text/plain\r\n'
|
||||
b'\r\n')
|
||||
|
||||
def test_create_json(self):
|
||||
res = Response({'foo': 'bar'})
|
||||
self.assertEqual(res.status_code, 200)
|
||||
@@ -159,3 +174,9 @@ class TestResponse(unittest.TestCase):
|
||||
self.assertEqual(res.headers['Content-Type'], content_type)
|
||||
self.assertEqual(res.headers['Content-Length'], '4')
|
||||
self.assertEqual(res.body, b'foo\n')
|
||||
res = Response.send_file('tests/files/test.txt',
|
||||
content_type='text/html')
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertEqual(res.headers['Content-Type'], 'text/html')
|
||||
self.assertEqual(res.headers['Content-Length'], '4')
|
||||
self.assertEqual(res.body, b'foo\n')
|
||||
|
||||
@@ -78,7 +78,6 @@ class TestURLPattern(unittest.TestCase):
|
||||
|
||||
def test_regex_argument(self):
|
||||
p = URLPattern('/users/<re:[a-c]+:id>')
|
||||
print(p.pattern)
|
||||
self.assertEqual(p.match('/users/ab'), {'id': 'ab'})
|
||||
self.assertEqual(p.match('/users/bca'), {'id': 'bca'})
|
||||
self.assertIsNone(p.match('/users/abcd'))
|
||||
|
||||
Reference in New Issue
Block a user