-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy path__init__.py
167 lines (134 loc) · 5.13 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import json
import functools
import asyncio
from collections import defaultdict
from aiohttp import web
from aiohttp.abc import AbstractView
from jsonschema.validators import validator_for
__author__ = """Dmitry Chaplinsky"""
__email__ = '[email protected]'
__version__ = '0.1.1'
def _raise_exception(cls, reason, data=None):
"""
Raise aiohttp exception and pass payload/reason into it.
"""
text_dict = {
"error": reason
}
if data is not None:
text_dict["errors"] = data
raise cls(
text=json.dumps(text_dict),
content_type="application/json"
)
def _validate_data(data, schema, validator_cls):
"""
Validate the dict against given schema (using given validator class).
"""
validator = validator_cls(schema)
_errors = defaultdict(list)
def set_nested_item(dataDict, mapList, key, val):
for _key in mapList:
dataDict.setdefault(_key, {})
dataDict = dataDict[_key]
dataDict.setdefault(key, list())
dataDict[key].append(val)
for err in validator.iter_errors(data):
path = err.schema_path
# Code courtesy: Ruslan Karalkin
# Looking in error schema path for
# property that failed validation
# Schema example:
# {
# "type": "object",
# "properties": {
# "foo": {"type": "number"},
# "bar": {"type": "string"}
# }
# "required": ["foo", "bar"]
# }
#
# Related err.schema_path examples:
# ['required'],
# ['properties', 'foo', 'type']
if "properties" in path:
path.remove("properties")
key = path.popleft()
# If validation failed by missing property,
# then parse err.message to find property name
# as it always first word enclosed in quotes
if "required" in path or key == "required":
key = err.message.split("'")[1]
elif err.relative_path:
key = err.relative_path.pop()
set_nested_item(_errors, err.relative_path, key, err.message)
if _errors:
_raise_exception(
web.HTTPBadRequest,
"Request is invalid; There are validation errors.",
_errors)
def validate(request_schema=None, response_schema=None):
"""
Decorate request handler to make it automagically validate it's request
and response.
"""
def wrapper(func):
# Validating the schemas itself.
# Die with exception if they aren't valid
if request_schema is not None:
_request_schema_validator = validator_for(request_schema)
_request_schema_validator.check_schema(request_schema)
if response_schema is not None:
_response_schema_validator = validator_for(response_schema)
_response_schema_validator.check_schema(response_schema)
@asyncio.coroutine
@functools.wraps(func)
def wrapped(*args):
if asyncio.iscoroutinefunction(func):
coro = func
else:
coro = asyncio.coroutine(func)
# Supports class based views see web.View
if isinstance(args[0], AbstractView):
class_based = True
request = args[0].request
else:
if func.__name__ != func.__qualname__:
class_based = True
else:
class_based = False
request = args[-1]
# Strictly expect json object here
try:
req_body = yield from request.json()
except (json.decoder.JSONDecodeError, TypeError):
_raise_exception(
web.HTTPBadRequest,
"Request is malformed; could not decode JSON object.")
# Validate request data against request schema (if given)
if request_schema is not None:
_validate_data(req_body, request_schema,
_request_schema_validator)
coro_args = req_body, request
if class_based:
coro_args = (args[0],) + coro_args
context = yield from coro(*coro_args)
# No validation of response for websockets stream
if isinstance(context, web.StreamResponse):
return context
# Validate response data against response schema (if given)
if response_schema is not None:
_validate_data(context, response_schema,
_response_schema_validator)
try:
return web.json_response(context)
except (TypeError,):
_raise_exception(
web.HTTPInternalServerError,
"Response is malformed; could not encode JSON object.")
# Store schemas in wrapped handlers, so it later can be reused
setattr(wrapped, "_request_schema", request_schema)
setattr(wrapped, "_response_schema", response_schema)
return wrapped
return wrapper
__ALL__ = ["validate", "__author__", "__email__", "__version__"],