improve code structure

This commit is contained in:
Miguel Grinberg
2019-05-04 20:17:04 +00:00
parent 8aa50f171d
commit b16466f1a9
3 changed files with 125 additions and 115 deletions

View File

@@ -42,53 +42,59 @@ class Request():
class G:
pass
def __init__(self, client_sock, client_addr):
self.client_sock = client_sock
def __init__(self, client_addr, method, url, http_version, headers, body):
self.client_addr = client_addr
self.url_args = None
self.g = Request.G()
if not hasattr(client_sock, 'readline'): # pragma: no cover
self.client_stream = client_sock.makefile("rwb")
else:
self.client_stream = client_sock
# request line
line = self.client_stream.readline().strip().decode()
self.method, self.path, self.http_version = line.split()
self.http_version = self.http_version.split('/', 1)[1]
self.method = method
self.path = url
self.http_version = http_version
if '?' in self.path:
self.path, self.query_string = self.path.split('?', 1)
self.args = self._parse_urlencoded(self.query_string)
else:
self.query_string = None
self.args = {}
# headers
self.headers = {}
self.headers = headers
self.cookies = {}
self.content_length = 0
self.content_type = None
while True:
line = self.client_stream.readline().strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
self.headers[header] = value
for header, value in self.headers.items():
if header == 'Content-Length':
self.content_length = int(value)
elif header == 'Content-Type':
self.content_type = value
elif header == 'Cookie':
for cookie in self.headers['Cookie'].split(';'):
for cookie in value.split(';'):
name, value = cookie.strip().split('=', 1)
self.cookies[name] = value
# body
self.body = self.client_stream.read(self.content_length)
self.body = body
self._json = None
self._form = None
self.g = Request.G()
@staticmethod
def create(client_stream, client_addr):
# request line
line = client_stream.readline().strip().decode()
method, url, http_version = line.split()
http_version = http_version.split('/', 1)[1]
# headers
headers = {}
content_length = 0
while True:
line = client_stream.readline().strip().decode()
if line == '':
break
header, value = line.split(':', 1)
value = value.strip()
headers[header] = value
if header == 'Content-Length':
content_length = int(value)
# body
body = client_stream.read(content_length) if content_length else b''
return Request(client_addr, method, url, http_version, headers, body)
def _parse_urlencoded(self, urlencoded):
return {
@@ -112,11 +118,6 @@ class Request():
self._form = self._parse_urlencoded(self.body.decode())
return self._form
def close(self):
self.client_stream.close()
if self.client_stream != self.client_sock: # pragma: no cover
self.client_sock.close()
class Response():
types_map = {
@@ -164,42 +165,38 @@ class Response():
else:
self.headers['Set-Cookie'] = [http_cookie]
def write(self, client_stream):
def complete(self):
if not 'Content-Length' in self.headers:
self.headers['Content-Length'] = str(len(self.body))
if not 'Content-Type' in self.headers:
self.headers['Content-Type'] = 'text/plain'
def write(self, stream):
self.complete()
# status code
client_stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
status_code=self.status_code,
reason='OK' if self.status_code == 200 else 'N/A').encode())
# headers
content_length_found = False
content_type_found = False
for header, value in self.headers.items():
values = value if isinstance(value, list) else [value]
for value in values:
client_stream.write('{header}: {value}\r\n'.format(
stream.write('{header}: {value}\r\n'.format(
header=header, value=value).encode())
if header == 'Content-Length':
content_length_found = True
elif header == 'Content-Type':
content_type_found = True
if not content_length_found:
client_stream.write('Content-Length: {length}\r\n'.format(
length=len(self.body)).encode())
if not content_type_found:
client_stream.write(b'Content-Type: text/plain\r\n')
client_stream.write(b'\r\n')
stream.write(b'\r\n')
# body
if self.body:
client_stream.write(self.body)
stream.write(self.body)
@staticmethod
def redirect(location, status_code=302):
return Response(status_code=status_code,
headers={'Location': location})
@classmethod
def redirect(cls, location, status_code=302):
return cls(status_code=status_code, headers={'Location': location})
@staticmethod
def send_file(filename, status_code=200, content_type=None):
@classmethod
def send_file(cls, filename, status_code=200, content_type=None):
if content_type is None:
ext = filename.split('.')[-1]
if ext in Response.types_map:
@@ -208,9 +205,9 @@ class Response():
content_type = 'application/octet-stream'
with open(filename) as f:
body = f.read()
return Response(body=body, status_code=status_code,
headers={'Content-Type': content_type,
'Content-Length': str(len(body))})
return cls(body=body, status_code=status_code,
headers={'Content-Type': content_type,
'Content-Length': str(len(body))})
class URLPattern():
@@ -305,56 +302,72 @@ class Microdot():
s.listen(5)
while True:
req = Request(*s.accept())
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
sock, addr = s.accept()
if not hasattr(sock, 'readline'): # pragma: no cover
stream = sock.makefile("rwb")
else:
stream = sock
req = Request.create(stream, addr)
res = self.dispatch_request(req)
if debug: # pragma: no cover
print('{method} {path} {status_code}'.format(
method=req.method, path=req.path,
status_code=res.status_code))
res.write(req.client_stream)
req.close()
res.write(stream)
stream.close()
if stream != sock: # pragma: no cover
sock.close()
def find_route(self, req):
f = None
for route_methods, route_pattern, route_handler in self.url_map:
if req.method in route_methods:
req.url_args = route_pattern.match(req.path)
if req.url_args is not None:
f = route_handler
break
return f
def dispatch_request(self, req):
f = self.find_route(req)
try:
res = None
if f:
for handler in self.before_request_handlers:
res = handler(req)
if res:
break
if res is None:
res = f(req, **req.url_args)
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
for handler in self.after_request_handlers:
res = handler(req, res) or res
elif 404 in self.error_handlers:
res = self.error_handlers[404](req)
else:
res = 'Not found', 404
except Exception as exc:
print_exception(exc)
res = None
if exc.__class__ in self.error_handlers:
try:
res = self.error_handlers[exc.__class__](req, exc)
except Exception as exc2: # pragma: no cover
print_exception(exc2)
if res is None:
if 500 in self.error_handlers:
res = self.error_handlers[500](req)
else:
res = 'Internal server error', 500
if isinstance(res, tuple):
res = Response(*res)
elif not isinstance(res, Response):
res = Response(res)
return res
redirect = Response.redirect

View File

@@ -63,7 +63,6 @@ class TestMicrodot(unittest.TestCase):
@app.after_request
def after_request_two(req, res):
print('two')
res.set_cookie('foo', 'bar')
return res

View File

@@ -6,9 +6,7 @@ from tests.mock_socket import get_request_fd
class TestRequest(unittest.TestCase):
def test_create_request(self):
fd = get_request_fd('GET', '/foo')
req = Request(fd, 'addr')
req.close()
self.assertEqual(req.client_sock, fd)
req = Request.create(fd, 'addr')
self.assertEqual(req.client_addr, 'addr')
self.assertEqual(req.method, 'GET')
self.assertEqual(req.path, '/foo')
@@ -28,7 +26,7 @@ class TestRequest(unittest.TestCase):
'Content-Type': 'application/json',
'Cookie': 'foo=bar;abc=def',
'Content-Length': '3'}, body='aaa')
req = Request(fd, 'addr')
req = Request.create(fd, 'addr')
self.assertEqual(req.headers, {
'Host': 'example.com:1234',
'Content-Type': 'application/json',
@@ -41,33 +39,33 @@ class TestRequest(unittest.TestCase):
def test_args(self):
fd = get_request_fd('GET', '/?foo=bar&abc=def&x=%2f%%')
req = Request(fd, 'addr')
req = Request.create(fd, 'addr')
self.assertEqual(req.query_string, 'foo=bar&abc=def&x=%2f%%')
self.assertEqual(req.args, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
def test_json(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='{"foo":"bar"}')
req = Request(fd, 'addr')
req = Request.create(fd, 'addr')
json = req.json
self.assertEqual(json, {'foo': 'bar'})
self.assertTrue(req.json is json)
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'}, body='[1, "2"]')
req = Request(fd, 'addr')
req = Request.create(fd, 'addr')
self.assertEqual(req.json, [1, '2'])
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/xml'}, body='[1, "2"]')
req = Request(fd, 'addr')
req = Request.create(fd, 'addr')
self.assertIsNone(req.json)
def test_form(self):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/x-www-form-urlencoded'},
body='foo=bar&abc=def&x=%2f%%')
req = Request(fd, 'addr')
req = Request.create(fd, 'addr')
form = req.form
self.assertEqual(form, {'foo': 'bar', 'abc': 'def', 'x': '/%%'})
self.assertTrue(req.form is form)
@@ -75,5 +73,5 @@ class TestRequest(unittest.TestCase):
fd = get_request_fd('GET', '/foo', headers={
'Content-Type': 'application/json'},
body='foo=bar&abc=def&x=%2f%%')
req = Request(fd, 'addr')
req = Request.create(fd, 'addr')
self.assertIsNone(req.form)