-
Notifications
You must be signed in to change notification settings - Fork 363
/
utils.py
437 lines (348 loc) · 12.8 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
from hashlib import sha256
import math
import os
import pathlib
import re
import sys
from urllib.parse import urlsplit
# Default block size: 5 * 2**20 = 5,242,880 (i.e. 5 MiB, presumably counted
# in bytes -- confirm against the code that consumes this constant).
DEFAULT_BLOCK_SIZE = 5 * 2 ** 20
def infer_storage_options(urlpath, inherit_storage_options=None):
    """Derive a storage-options dict from a path or URL.

    Parameters
    ----------
    urlpath: str
        Either a local file path or a URL such as
        ``hdfs://namenode:8020/file.csv``.
    inherit_storage_options: dict (optional)
        Extra options merged into the inferred ones via
        ``update_storage_options``. Only applied when the URL is actually
        parsed, i.e. not for plain local paths and not for http(s) URLs,
        which are returned early.

    Returns
    -------
    dict
        Always contains ``"protocol"`` and ``"path"``. Depending on the URL
        it may also contain ``"host"``, ``"port"``, ``"username"``,
        ``"password"``, ``"url_query"`` and ``"url_fragment"``.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'})  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # A Windows drive-letter path ("C:\\...") or anything without a
    # "scheme://" prefix is a local file and is returned verbatim.
    has_drive_letter = re.match(r"^[a-zA-Z]:[\\/]", urlpath)
    has_scheme = re.match(r"^[a-zA-Z0-9]+://", urlpath) is not None
    if has_drive_letter or not has_scheme:
        return {"protocol": "file", "path": urlpath}

    parts = urlsplit(urlpath)
    protocol = parts.scheme or "file"

    if protocol in ("http", "https"):
        # HTTP URLs are handed on untouched; the HTTP layer parses them.
        return {"protocol": protocol, "path": urlpath}

    # The fragment is kept as part of the path (and also reported separately).
    path = parts.path
    if parts.fragment:
        path = "#".join([path, parts.fragment])

    if protocol == "file":
        # file:///C:/dir or file:///C|/dir -> "C:/dir" (Windows file URLs,
        # see https://msdn.microsoft.com/en-us/library/jj710207.aspx)
        drive = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if drive:
            path = "%s:%s" % drive.groups()

    options = {"protocol": protocol, "path": path}

    netloc = parts.netloc
    if netloc:
        # Extract the host by hand: `SplitResult.hostname` lowercases it,
        # which is not always desirable (e.g. S3 bucket names); see
        # dask/dask issue #1417.
        host = netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]
        options["host"] = host
        if protocol in ("s3", "gcs", "gs"):
            # For object stores the "host" is the bucket and belongs to the path.
            options["path"] = host + path
        for key in ("port", "username", "password"):
            value = getattr(parts, key)
            if value:
                options[key] = value

    if parts.query:
        options["url_query"] = parts.query
    if parts.fragment:
        options["url_fragment"] = parts.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options
def update_storage_options(options, inherited=None):
    """Merge ``inherited`` into ``options`` in place, refusing overlaps.

    Parameters
    ----------
    options: dict
        Storage options to be extended; mutated in place.
    inherited: dict (optional)
        Additional options. A falsy value is treated as an empty dict.

    Raises
    ------
    KeyError
        If any key is present in both dicts. ``options`` is left untouched
        in that case.
    """
    extra = inherited if inherited else {}
    clashing = set(options) & set(extra)
    if clashing:
        listing = "\n".join(["- %r" % key for key in clashing])
        message = (
            "Collision between inferred and specified storage options:\n%s"
            % listing
        )
        raise KeyError(message)
    options.update(extra)
# Registry mapping file extension -> compression name; filled in through
# fsspec.compression.register_compression.
compressions = {}


def infer_compression(filename):
    """Look up the compression registered for a filename's extension.

    The final extension of ``filename`` (without dots) is looked up in the
    module-level ``compressions`` registry. This covers builtin (gz, bz2,
    zip) as well as optional compressions; see
    fsspec.compression.register_compression.

    Returns
    -------
    The registered compression name, or None if the extension is unknown.
    """
    _, suffix = os.path.splitext(filename)
    key = suffix.strip(".")
    return compressions[key] if key in compressions else None
def build_name_function(max_int):
    """Returns a function that receives a single integer
    and returns it as a string padded by enough zero characters
    to align with maximum possible integer

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'

    Parameters
    ----------
    max_int: int
        Largest integer that will be passed to the returned function.

    Raises
    ------
    ValueError
        If ``max_int`` is negative.
    """
    if max_int < 0:
        # Negative input has always been rejected with ValueError (it used
        # to surface from math.log10); keep that contract explicit.
        raise ValueError("max_int must be non-negative")

    # Count decimal digits exactly. A floating-point log10 is unreliable for
    # large values: the small epsilon once used to handle exact powers of
    # ten is lost to rounding (10**17 + 1e-8 == 1e17), which made the
    # padding one digit too short.
    pad_length = len(str(int(max_int)))

    def name_function(i):
        return str(i).zfill(pad_length)

    return name_function
def seek_delimiter(file, delimiter, blocksize):
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Advances ``file`` to the next chunk boundary, where boundaries are the
    file start, the position just after a delimiter sequence, and the file
    end. Call ``file.tell()`` afterwards to learn where it stopped.

    The file start is itself a valid boundary, so nothing is searched when
    the file is at offset 0.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.
    """
    if file.tell() == 0:
        # Already on a boundary (start of file); leave the position alone.
        return False

    # `carry` holds the tail of the data seen so far so that a delimiter
    # straddling two reads is still detected. It starts as None rather than
    # b"" so the function also works when read() returns text.
    carry = None
    while True:
        chunk = file.read(blocksize)
        if not chunk:
            # Reached end of file without seeing the delimiter.
            return False
        window = carry + chunk if carry else chunk
        try:
            hit = window.find(delimiter)
            if hit != -1:
                # Rewind from the current position to just past the match.
                file.seek(file.tell() - (len(window) - hit) + len(delimiter))
                return True
            if len(chunk) < blocksize:
                # Short read: end of file without the delimiter.
                return False
        except (OSError, ValueError):
            pass
        carry = window[-len(delimiter):]
def read_block(f, offset, length, delimiter=None, split_before=False):
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.

    When ``delimiter`` is given, the read is aligned so that it starts and
    stops at the delimiter boundaries that follow ``offset`` and
    ``offset + length``. An ``offset`` of zero always starts at zero,
    regardless of delimiter. The bytestring returned WILL include the
    terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if not delimiter:
        # Plain positional read, no alignment needed.
        f.seek(offset)
        return f.read(length)

    # Align the start to the first boundary at or after `offset`.
    f.seek(offset)
    start_hit = seek_delimiter(f, delimiter, 2 ** 16)
    if length is None:
        return f.read()
    begin = f.tell()

    # Align the end to the first boundary at or after the requested end.
    f.seek(offset + length)
    end_hit = seek_delimiter(f, delimiter, 2 ** 16)
    finish = f.tell()

    # Move a boundary to *before* the delimiter only if the seek really
    # stopped on a delimiter, not on the start or end of the file.
    if split_before:
        shift = len(delimiter)
        if start_hit:
            begin -= shift
        if end_hit:
            finish -= shift

    f.seek(begin)
    return f.read(finish - begin)
def tokenize(*args, **kwargs):
    """Deterministic token

    (modified from dask.base)

    The token is the hexadecimal SHA-256 digest of ``str()`` applied to the
    tuple of positional arguments, with the keyword-argument dict appended
    as a final element when any keywords are given.

    >>> tokenize('Hello') == tokenize('Hello')
    True
    >>> len(tokenize([1, 2, '3']))
    64
    """
    payload = args + (kwargs,) if kwargs else args
    return sha256(str(payload).encode()).hexdigest()
def stringify_path(filepath):
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    An object exposing ``__fspath__`` (the fspath protocol, Python 3.6+) is
    converted by calling that method.

    ``pathlib.Path`` instances without ``__fspath__`` (older Python
    versions) are converted with ``str()``.

    Everything else -- bytes, strings, buffers, or objects that are not
    path-like at all -- is returned unchanged.
    """
    if hasattr(filepath, "__fspath__"):
        converted = filepath.__fspath__()
    elif isinstance(filepath, pathlib.Path):
        converted = str(filepath)
    else:
        converted = filepath
    return converted
def make_instance(cls, args, kwargs):
    """Build ``cls(*args, **kwargs)``, call its ``_determine_worker()`` and
    return the new instance."""
    instance = cls(*args, **kwargs)
    instance._determine_worker()
    return instance
def common_prefix(paths):
    """For a list of paths, find the longest prefix common to all

    Paths are compared one "/"-separated component at a time, so the result
    always ends on a component boundary (``["a/bc", "a/bd"]`` gives ``"a"``).

    Parameters
    ----------
    paths: list of str

    Returns
    -------
    str
        The leading components shared by every path, joined with "/".
        Empty string if nothing is shared or if ``paths`` is empty.
    """
    shared = []
    # zip() stops at the shortest path and yields nothing for an empty list,
    # so neither case needs special handling.
    for level in zip(*(p.split("/") for p in paths)):
        first = level[0]
        if any(part != first for part in level):
            break
        shared.append(first)
    return "/".join(shared)
def other_paths(paths, path2, is_dir=None):
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str,
        it is returned as-is after asserting that it has the same number of
        elements as ``paths``.
    is_dir: bool (optional)
        Only relevant when ``paths`` has a single element: whether ``path2``
        is the target path itself, or a directory to place the file in. If
        None, a directory is inferred when ``path2`` ends in '/'.

    Returns
    -------
    list of str
    """
    if not isinstance(path2, str):
        # Caller supplied explicit targets; only sanity-check the count.
        assert len(paths) == len(path2)
        return path2

    treat_as_dir = is_dir or path2.endswith("/")
    root = path2.rstrip("/")

    if len(paths) > 1:
        # Re-root every path: swap the shared leading part for `root`.
        shared = common_prefix(paths)
        return [p.replace(shared, root, 1) for p in paths]

    if treat_as_dir:
        # Single file copied into a directory keeps its own basename.
        return [root + "/" + paths[0].rsplit("/")[-1]]
    return [root]
def is_exception(obj):
    """Return True if ``obj`` is an exception instance (any ``BaseException``)."""
    result = isinstance(obj, BaseException)
    return result
def get_protocol(url):
    """Return the protocol prefix of ``url``.

    The protocol is whatever precedes the first ``::`` or ``://`` in the
    string (e.g. ``"s3"`` for ``"s3://bucket/key"``, ``"simplecache"`` for
    ``"simplecache::s3://bucket/key"``). If neither separator occurs,
    ``"file"`` is returned.
    """
    # maxsplit is passed by keyword: passing it positionally to re.split is
    # deprecated since Python 3.13.
    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
    if len(parts) > 1:
        return parts[0]
    return "file"
def can_be_local(path):
    """Can the given URL be used with open_local?

    Resolves the filesystem class for the URL's protocol and returns its
    ``local_file`` attribute (False if the class does not define one).
    Returns False as well when the lookup raises ValueError or ImportError.
    """
    from fsspec import get_filesystem_class

    try:
        fs_class = get_filesystem_class(get_protocol(path))
        return getattr(fs_class, "local_file", False)
    except (ValueError, ImportError):
        # Protocol is not in the registry, or its implementation could not
        # be imported.
        return False
def setup_logger(logname, level="DEBUG", clear=True):
    """Add standard logging handler to logger of given name

    Parameters
    ----------
    logname: str
        Name of the logger to configure.
    level: str or int
        Level set on the logger.
    clear: bool
        If True, remove the logger's existing handlers first.

    Returns
    -------
    The configured ``logging.Logger``.
    """
    import logging

    target = logging.getLogger(logname)
    if clear:
        target.handlers.clear()

    stream = logging.StreamHandler()
    stream.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    target.addHandler(stream)
    target.setLevel(level)
    return target
def get_package_version_without_import(name):
    """For given package name, try to find the version without importing it

    Lookup order:

    1. ``__version__`` of the module if it is already in ``sys.modules``;
    2. the installed distribution's metadata (``importlib.metadata`` on
       Python 3.8+, the ``importlib_metadata`` backport otherwise);
    3. as a last resort, importing the package and reading
       ``__version__`` -- so an import *might* still happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        loaded = sys.modules[name]
        if hasattr(loaded, "__version__"):
            return loaded.__version__

    try:
        if sys.version_info >= (3, 8):
            import importlib.metadata as metadata
        else:
            import importlib_metadata as metadata
        return metadata.distribution(name).version
    except ImportError:
        # Metadata module unavailable, or no such distribution installed.
        pass

    try:
        import importlib

        return importlib.import_module(name).__version__
    except (ImportError, AttributeError):
        return None