Support duplicate arguments in query string and form submissions
Fixes #21
This commit is contained in:
@@ -77,6 +77,38 @@ def urldecode(string):
|
||||
return ''.join(result)
|
||||
|
||||
|
||||
class MultiDict(dict):
|
||||
def __init__(self, initial_dict=None):
|
||||
super().__init__()
|
||||
if initial_dict:
|
||||
for key, value in initial_dict.items():
|
||||
self[key] = value
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key not in self:
|
||||
super().__setitem__(key, [])
|
||||
super().__getitem__(key).append(value)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return super().__getitem__(key)[0]
|
||||
|
||||
def get(self, key, default=None, type=None):
|
||||
if key not in self:
|
||||
return default
|
||||
value = self[key]
|
||||
if type is not None:
|
||||
value = type(value)
|
||||
return value
|
||||
|
||||
def getlist(self, key, type=None):
|
||||
if key not in self:
|
||||
return []
|
||||
values = super().__getitem__(key)
|
||||
if type is not None:
|
||||
values = [type(value) for value in values]
|
||||
return values
|
||||
|
||||
|
||||
class Request():
|
||||
"""An HTTP request class."""
|
||||
|
||||
@@ -152,10 +184,10 @@ class Request():
|
||||
body)
|
||||
|
||||
def _parse_urlencoded(self, urlencoded):
|
||||
return {
|
||||
urldecode(key): urldecode(value) for key, value in [
|
||||
pair.split('=', 1) for pair in
|
||||
urlencoded.split('&')]}
|
||||
data = MultiDict()
|
||||
for k, v in [pair.split('=', 1) for pair in urlencoded.split('&')]:
|
||||
data[urldecode(k)] = urldecode(v)
|
||||
return data
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
@@ -277,7 +309,7 @@ class Response():
|
||||
stream.write(buf)
|
||||
if len(buf) < self.send_file_buffer_size:
|
||||
break
|
||||
if hasattr(self.body, 'close'): # pragma: no close
|
||||
if hasattr(self.body, 'close'): # pragma: no cover
|
||||
self.body.close()
|
||||
else:
|
||||
stream.write(self.body)
|
||||
@@ -616,7 +648,7 @@ class Microdot():
|
||||
while not self.shutdown_requested:
|
||||
try:
|
||||
sock, addr = self.server.accept()
|
||||
except OSError as exc:
|
||||
except OSError as exc: # pragma: no cover
|
||||
if exc.args[0] == errno.ECONNABORTED:
|
||||
break
|
||||
else:
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from tests.microdot.test_multidict import TestMultiDict
|
||||
from tests.microdot.test_request import TestRequest
|
||||
from tests.microdot.test_response import TestResponse
|
||||
from tests.microdot.test_url_pattern import TestURLPattern
|
||||
|
||||
@@ -65,6 +65,39 @@ class TestMicrodot(unittest.TestCase):
|
||||
self.assertIn(b'Content-Type: text/plain\r\n', fd.response)
|
||||
self.assertTrue(fd.response.endswith(b'\r\n\r\nbar'))
|
||||
|
||||
def test_method_decorators(self):
|
||||
app = Microdot()
|
||||
|
||||
@app.get('/get')
|
||||
def get(req):
|
||||
return 'GET'
|
||||
|
||||
@app.post('/post')
|
||||
def post(req):
|
||||
return 'POST'
|
||||
|
||||
@app.put('/put')
|
||||
def put(req):
|
||||
return 'PUT'
|
||||
|
||||
@app.patch('/patch')
|
||||
def patch(req):
|
||||
return 'PATCH'
|
||||
|
||||
@app.delete('/delete')
|
||||
def delete(req):
|
||||
return 'DELETE'
|
||||
|
||||
methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
|
||||
mock_socket.clear_requests()
|
||||
fds = [mock_socket.add_request(method, '/' + method.lower())
|
||||
for method in methods]
|
||||
self._add_shutdown(app)
|
||||
app.run()
|
||||
for fd, method in zip(fds, methods):
|
||||
self.assertTrue(fd.response.endswith(
|
||||
b'\r\n\r\n' + method.encode()))
|
||||
|
||||
def test_before_after_request(self):
|
||||
app = Microdot()
|
||||
|
||||
|
||||
31
tests/microdot/test_multidict.py
Normal file
31
tests/microdot/test_multidict.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import unittest
|
||||
from microdot import MultiDict
|
||||
|
||||
|
||||
class TestMultiDict(unittest.TestCase):
|
||||
def test_multidict(self):
|
||||
d = MultiDict()
|
||||
|
||||
assert dict(d) == {}
|
||||
assert d.get('zero') is None
|
||||
assert d.get('zero', default=0) == 0
|
||||
assert d.getlist('zero') == []
|
||||
assert d.getlist('zero', type=int) == []
|
||||
|
||||
d['one'] = 1
|
||||
assert d['one'] == 1
|
||||
assert d.get('one') == 1
|
||||
assert d.get('one', default=2) == 1
|
||||
assert d.get('one', type=int) == 1
|
||||
assert d.get('one', type=str) == '1'
|
||||
|
||||
d['two'] = 1
|
||||
d['two'] = 2
|
||||
assert d['two'] == 1
|
||||
assert d.get('two') == 1
|
||||
assert d.get('two', default=2) == 1
|
||||
assert d.get('two', type=int) == 1
|
||||
assert d.get('two', type=str) == '1'
|
||||
assert d.getlist('two') == [1, 2]
|
||||
assert d.getlist('two', type=int) == [1, 2]
|
||||
assert d.getlist('two', type=str) == ['1', '2']
|
||||
@@ -1,5 +1,5 @@
|
||||
import unittest
|
||||
from microdot import Request
|
||||
from microdot import Request, MultiDict
|
||||
from tests.mock_socket import get_request_fd
|
||||
|
||||
|
||||
@@ -42,7 +42,8 @@ class TestRequest(unittest.TestCase):
|
||||
fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
|
||||
req = Request.create('app', fd, 'addr')
|
||||
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
|
||||
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
self.assertEqual(req.args, MultiDict(
|
||||
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
|
||||
|
||||
def test_json(self):
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
@@ -68,7 +69,8 @@ class TestRequest(unittest.TestCase):
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = Request.create('app', fd, 'addr')
|
||||
form = req.form
|
||||
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
self.assertEqual(form, MultiDict(
|
||||
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
|
||||
self.assertTrue(req.form is form)
|
||||
|
||||
fd = get_request_fd('GET', '/foo', headers={
|
||||
|
||||
@@ -4,6 +4,7 @@ except ImportError:
|
||||
import asyncio
|
||||
|
||||
import unittest
|
||||
from microdot import MultiDict
|
||||
from microdot_asyncio import Request
|
||||
from tests.mock_socket import get_async_request_fd
|
||||
|
||||
@@ -51,7 +52,8 @@ class TestRequestAsync(unittest.TestCase):
|
||||
fd = get_async_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
|
||||
req = _run(Request.create('app', fd, 'addr'))
|
||||
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
|
||||
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
self.assertEqual(req.args, MultiDict(
|
||||
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
|
||||
|
||||
def test_json(self):
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
@@ -77,7 +79,8 @@ class TestRequestAsync(unittest.TestCase):
|
||||
body='foo=bar&abc=def&x=%2f%%')
|
||||
req = _run(Request.create('app', fd, 'addr'))
|
||||
form = req.form
|
||||
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
|
||||
self.assertEqual(form, MultiDict(
|
||||
{'foo': 'bar', 'abc': 'def', 'x': '/%%'}))
|
||||
self.assertTrue(req.form is form)
|
||||
|
||||
fd = get_async_request_fd('GET', '/foo', headers={
|
||||
|
||||
Reference in New Issue
Block a user