-
Notifications
You must be signed in to change notification settings - Fork 8
/
helpers.py
64 lines (49 loc) · 1.89 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import bz2
import gzip
import os
import json
from io import BytesIO
# File opening. This is based on the example on SO here:
# http://stackoverflow.com/a/26986344
fmagic = {b'\x1f\x8b\x08': gzip.open,
b'\x42\x5a\x68': bz2.BZ2File}
def auto_open(fn, *args, **kwargs):
"""function to open regular or compressed files for read / write.
This function opens files based on their "magic bytes". Supports bz2
and gzip. If it finds neither of these, presumption is it is a
standard, uncompressed file.
Example::
with auto_open("/path/to/file/maybe/compressed", mode="rb") as fh:
fh.read()
with auto_open("/tmp/test.txt.gz", mode="wb") as fh:
fh.write("my big testfile")
:param fn: either a string of an existing or new file path, or
a BytesIO handle
:param **kwargs: additional arguments that are understood by the
underlying open handler
:returns: a file handler
"""
if isinstance(fn, BytesIO):
return fn
if os.path.isfile(fn) and os.stat(fn).st_size > 0:
with open(fn, 'rb') as fp:
fs = fp.read(max([len(x) for x in fmagic]))
for (magic, _open) in fmagic.items():
if fs.startswith(magic):
return _open(fn, *args, **kwargs)
else:
if fn.endswith('gz'):
return gzip.open(fn, *args, **kwargs)
elif fn.endswith('bz2'):
return bz2.BZ2File(fn, *args, **kwargs)
return open(fn, *args, **kwargs)
def unique(seq):
"""Return the elements of a list uniquely while preserving the order
:param list seq: a list of hashable elements
:returns: new list with first occurence of elements of seq"""
seen = set()
return [x for x in seq if x not in seen and not seen.add(x)]
def load_json_file(path):
with auto_open(path, 'rb') as fh:
data = json.loads(fh.read().decode('utf-8'))
return data