-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
plugins.py
177 lines (150 loc) · 6.44 KB
/
plugins.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import functools
import inspect
import itertools
import sys
import warnings
from .common import BACKEND_ENTRYPOINTS, BackendEntrypoint
if sys.version_info >= (3, 8):
from importlib.metadata import entry_points
else:
# if the fallback library is missing, we are doomed.
from importlib_metadata import entry_points
STANDARD_BACKENDS_ORDER = ["netcdf4", "h5netcdf", "scipy"]
def remove_duplicates(entrypoints):
# sort and group entrypoints by name
entrypoints = sorted(entrypoints, key=lambda ep: ep.name)
entrypoints_grouped = itertools.groupby(entrypoints, key=lambda ep: ep.name)
# check if there are multiple entrypoints for the same name
unique_entrypoints = []
for name, matches in entrypoints_grouped:
# remove equal entrypoints
matches = list(set(matches))
unique_entrypoints.append(matches[0])
matches_len = len(matches)
if matches_len > 1:
all_module_names = [e.value.split(":")[0] for e in matches]
selected_module_name = all_module_names[0]
warnings.warn(
f"Found {matches_len} entrypoints for the engine name {name}:"
f"\n {all_module_names}.\n "
f"The entrypoint {selected_module_name} will be used.",
RuntimeWarning,
)
return unique_entrypoints
def detect_parameters(open_dataset):
signature = inspect.signature(open_dataset)
parameters = signature.parameters
parameters_list = []
for name, param in parameters.items():
if param.kind in (
inspect.Parameter.VAR_KEYWORD,
inspect.Parameter.VAR_POSITIONAL,
):
raise TypeError(
f"All the parameters in {open_dataset!r} signature should be explicit. "
"*args and **kwargs is not supported"
)
if name != "self":
parameters_list.append(name)
return tuple(parameters_list)
def backends_dict_from_pkg(entrypoints):
backend_entrypoints = {}
for entrypoint in entrypoints:
name = entrypoint.name
try:
backend = entrypoint.load()
backend_entrypoints[name] = backend
except Exception as ex:
warnings.warn(f"Engine {name!r} loading failed:\n{ex}", RuntimeWarning)
return backend_entrypoints
def set_missing_parameters(backend_entrypoints):
for name, backend in backend_entrypoints.items():
if backend.open_dataset_parameters is None:
open_dataset = backend.open_dataset
backend.open_dataset_parameters = detect_parameters(open_dataset)
def sort_backends(backend_entrypoints):
ordered_backends_entrypoints = {}
for be_name in STANDARD_BACKENDS_ORDER:
if backend_entrypoints.get(be_name, None) is not None:
ordered_backends_entrypoints[be_name] = backend_entrypoints.pop(be_name)
ordered_backends_entrypoints.update(
{name: backend_entrypoints[name] for name in sorted(backend_entrypoints)}
)
return ordered_backends_entrypoints
def build_engines(entrypoints):
backend_entrypoints = {}
for backend_name, backend in BACKEND_ENTRYPOINTS.items():
if backend.available:
backend_entrypoints[backend_name] = backend
entrypoints = remove_duplicates(entrypoints)
external_backend_entrypoints = backends_dict_from_pkg(entrypoints)
backend_entrypoints.update(external_backend_entrypoints)
backend_entrypoints = sort_backends(backend_entrypoints)
set_missing_parameters(backend_entrypoints)
return {name: backend() for name, backend in backend_entrypoints.items()}
@functools.lru_cache(maxsize=1)
def list_engines():
entrypoints = entry_points().get("xarray.backends", ())
return build_engines(entrypoints)
def guess_engine(store_spec):
engines = list_engines()
for engine, backend in engines.items():
try:
if backend.guess_can_open(store_spec):
return engine
except Exception:
warnings.warn(f"{engine!r} fails while guessing", RuntimeWarning)
compatible_engines = []
for engine, backend_cls in BACKEND_ENTRYPOINTS.items():
try:
backend = backend_cls()
if backend.guess_can_open(store_spec):
compatible_engines.append(engine)
except Exception:
warnings.warn(f"{engine!r} fails while guessing", RuntimeWarning)
installed_engines = [k for k in engines if k != "store"]
if not compatible_engines:
if installed_engines:
error_msg = (
"did not find a match in any of xarray's currently installed IO "
f"backends {installed_engines}. Consider explicitly selecting one of the "
"installed engines via the ``engine`` parameter, or installing "
"additional IO dependencies, see:\n"
"http://xarray.pydata.org/en/stable/getting-started-guide/installing.html\n"
"http://xarray.pydata.org/en/stable/user-guide/io.html"
)
else:
error_msg = (
"xarray is unable to open this file because it has no currently "
"installed IO backends. Xarray's read/write support requires "
"installing optional IO dependencies, see:\n"
"http://xarray.pydata.org/en/stable/getting-started-guide/installing.html\n"
"http://xarray.pydata.org/en/stable/user-guide/io"
)
else:
error_msg = (
"found the following matches with the input file in xarray's IO "
f"backends: {compatible_engines}. But their dependencies may not be installed, see:\n"
"http://xarray.pydata.org/en/stable/user-guide/io.html \n"
"http://xarray.pydata.org/en/stable/getting-started-guide/installing.html"
)
raise ValueError(error_msg)
def get_backend(engine):
"""Select open_dataset method based on current engine."""
if isinstance(engine, str):
engines = list_engines()
if engine not in engines:
raise ValueError(
f"unrecognized engine {engine} must be one of: {list(engines)}"
)
backend = engines[engine]
elif isinstance(engine, type) and issubclass(engine, BackendEntrypoint):
backend = engine()
else:
raise TypeError(
(
"engine must be a string or a subclass of "
f"xarray.backends.BackendEntrypoint: {engine}"
)
)
return backend