This repository has been archived by the owner on Mar 28, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
utils.py
372 lines (291 loc) · 10.7 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
import ast
import hashlib
import hmac
import os
import re
import six
import time
import warnings
from base64 import b64decode, b64encode
from binascii import hexlify
from six.moves.urllib import parse as urlparse
from enum import Enum
# ujson is not installable with pypy
try: # pragma: no cover
import ujson as json # NOQA
def json_serializer(v, **kw):
return json.dumps(v, escape_forward_slashes=False)
except ImportError: # pragma: no cover
import json # NOQA
json_serializer = json.dumps
try:
# Register psycopg2cffi as psycopg2
from psycopg2cffi import compat
except ImportError: # pragma: no cover
pass
else: # pragma: no cover
compat.register()
try:
import sqlalchemy
except ImportError: # pragma: no cover
sqlalchemy = None
from pyramid import httpexceptions
from pyramid.request import Request, apply_request_extensions
from pyramid.settings import aslist
from pyramid.view import render_view_to_response
from cornice import cors
from colander import null
def strip_whitespace(v):
"""Remove whitespace, newlines, and tabs from the beginning/end
of a string.
:param str v: the string to strip.
:rtype: str
"""
return v.strip(' \t\n\r') if v is not null else v
def msec_time():
"""Return current epoch time in milliseconds.
:rtype: int
"""
return int(time.time() * 1000.0) # floor
def classname(obj):
"""Get a classname from an object.
:rtype: str
"""
return obj.__class__.__name__.lower()
def merge_dicts(a, b):
"""Merge b into a recursively, without overwriting values.
:param dict a: the dict that will be altered with values of `b`.
:rtype: None
"""
for k, v in b.items():
if isinstance(v, dict):
merge_dicts(a.setdefault(k, {}), v)
else:
a.setdefault(k, v)
def random_bytes_hex(bytes_length):
"""Return a hexstring of bytes_length cryptographic-friendly random bytes.
:param integer bytes_length: number of random bytes.
:rtype: str
"""
return hexlify(os.urandom(bytes_length)).decode('utf-8')
def native_value(value):
"""Convert string value to native python values.
:param str value: value to interprete.
:returns: the value coerced to python type
"""
if isinstance(value, six.string_types):
if value.lower() in ['on', 'true', 'yes']:
value = True
elif value.lower() in ['off', 'false', 'no']:
value = False
try:
return ast.literal_eval(value)
except (ValueError, SyntaxError):
pass
return value
def read_env(key, value):
"""Read the setting key from environment variables.
:param key: the setting name
:param value: default value if undefined in environment
:returns: the value from environment, coerced to python type
"""
envkey = key.replace('.', '_').replace('-', '_').upper()
return native_value(os.getenv(envkey, value))
def encode64(content, encoding='utf-8'):
"""Encode some content in base64.
:rtype: str
"""
return b64encode(content.encode(encoding)).decode(encoding)
def decode64(encoded_content, encoding='utf-8'):
"""Decode some base64 encoded content.
:rtype: str
"""
return b64decode(encoded_content.encode(encoding)).decode(encoding)
def hmac_digest(secret, message, encoding='utf-8'):
"""Return hex digest of a message HMAC using secret"""
if isinstance(secret, six.text_type):
secret = secret.encode(encoding)
return hmac.new(secret,
message.encode(encoding),
hashlib.sha256).hexdigest()
def dict_subset(d, keys):
"""Return a dict with the specified keys"""
result = {}
for key in keys:
if '.' in key:
field, subfield = key.split('.', 1)
if isinstance(d.get(field), dict):
subvalue = dict_subset(d[field], [subfield])
result.setdefault(field, {}).update(subvalue)
elif field in d:
result[field] = d[field]
else:
if key in d:
result[key] = d[key]
return result
class COMPARISON(Enum):
LT = '<'
MIN = '>='
MAX = '<='
NOT = '!='
EQ = '=='
GT = '>'
IN = 'in'
EXCLUDE = 'exclude'
def reapply_cors(request, response):
"""Reapply cors headers to the new response with regards to the request.
We need to re-apply the CORS checks done by Cornice, in case we're
recreating the response from scratch.
"""
service = request.current_service
if service:
request.info['cors_checked'] = False
cors.apply_cors_post_request(service, request, response)
response = cors.ensure_origin(service, request, response)
else:
# No existing service is concerned, and Cornice is not implied.
origin = request.headers.get('Origin')
if origin:
settings = request.registry.settings
allowed_origins = set(aslist(settings['cors_origins']))
required_origins = {'*', decode_header(origin)}
if allowed_origins.intersection(required_origins):
origin = encode_header(origin)
response.headers['Access-Control-Allow-Origin'] = origin
# Import service here because cliquet import utils
from cliquet import Service
if Service.default_cors_headers:
headers = ','.join(Service.default_cors_headers)
response.headers['Access-Control-Expose-Headers'] = headers
return response
def current_service(request):
"""Return the Cornice service matching the specified request.
:returns: the service or None if unmatched.
:rtype: cornice.Service
"""
if request.matched_route:
services = request.registry.cornice_services
pattern = request.matched_route.pattern
try:
service = services[pattern]
except KeyError:
return None
else:
return service
def current_resource_name(request):
"""Return the name used when the Cliquet resource was registered along its
viewset.
:returns: the resource identifier.
:rtype: str
"""
service = current_service(request)
resource_name = service.viewset.get_name(service.resource)
return resource_name
def build_request(original, dict_obj):
"""
Transform a dict object into a ``pyramid.request.Request`` object.
It sets a ``parent`` attribute on the resulting request assigned with
the `original` request specified.
:param original: the original request.
:param dict_obj: a dict object with the sub-request specifications.
"""
api_prefix = '/%s' % original.upath_info.split('/')[1]
path = dict_obj['path']
if not path.startswith(api_prefix):
path = api_prefix + path
path = path.encode('utf-8')
method = dict_obj.get('method') or 'GET'
headers = dict(original.headers)
headers.update(**dict_obj.get('headers') or {})
payload = dict_obj.get('body') or ''
# Payload is always a dict (from ``BatchRequestSchema.body``).
# Send it as JSON for subrequests.
if isinstance(payload, dict):
headers['Content-Type'] = encode_header(
'application/json; charset=utf-8')
payload = json.dumps(payload)
if six.PY3: # pragma: no cover
path = path.decode('latin-1')
request = Request.blank(path=path,
headers=headers,
POST=payload,
method=method)
request.registry = original.registry
apply_request_extensions(request)
# This is used to distinguish subrequests from direct incoming requests.
# See :func:`cliquet.initialization.setup_logging()`
request.parent = original
return request
def build_response(response, request):
"""
Transform a ``pyramid.response.Response`` object into a serializable dict.
:param response: a response object, returned by Pyramid.
:param request: the request that was used to get the response.
"""
dict_obj = {}
dict_obj['path'] = urlparse.unquote(request.path)
dict_obj['status'] = response.status_code
dict_obj['headers'] = dict(response.headers)
body = ''
if request.method != 'HEAD':
# XXX : Pyramid should not have built response body for HEAD!
try:
body = response.json
except ValueError:
body = response.body
dict_obj['body'] = body
return dict_obj
def follow_subrequest(request, subrequest, **kwargs):
"""Run a subrequest (e.g. batch), and follow the redirection if any.
:rtype: tuple
:returns: the reponse and the redirection request (or `subrequest`
if no redirection happened.)
"""
try:
try:
return request.invoke_subrequest(subrequest, **kwargs), subrequest
except Exception as e:
resp = render_view_to_response(e, subrequest)
if not resp or resp.status_code >= 500:
raise e
raise resp
except httpexceptions.HTTPRedirection as e:
new_location = e.headers['Location']
new_request = Request.blank(path=new_location,
headers=subrequest.headers,
POST=subrequest.body,
method=subrequest.method)
new_request.bound_data = subrequest.bound_data
new_request.parent = getattr(subrequest, 'parent', None)
return request.invoke_subrequest(new_request, **kwargs), new_request
def encode_header(value, encoding='utf-8'):
"""Make sure the value is of type ``str`` in both PY2 and PY3."""
value_type = type(value)
if value_type != str:
# Test for Python3
if value_type == six.binary_type: # pragma: no cover
value = value.decode(encoding)
# Test for Python2
elif value_type == six.text_type: # pragma: no cover
value = value.encode(encoding)
return value
def decode_header(value, encoding='utf-8'):
"""Make sure the header is an unicode string."""
if type(value) == six.binary_type:
value = value.decode(encoding)
return value
def strip_uri_prefix(path):
"""
Remove potential version prefix in URI.
"""
return re.sub(r'^(/v\d+)?', '', six.text_type(path))
class DeprecatedMeta(type):
"""A metaclass to be set on deprecated classes.
Warning will happen when class is inherited.
"""
def __new__(meta, name, bases, attrs):
for b in bases:
if isinstance(b, DeprecatedMeta):
error_msg = b.__deprecation_warning__
warnings.warn(error_msg, DeprecationWarning, stacklevel=2)
return super(DeprecatedMeta, meta).__new__(meta, name, bases, attrs)