-
Notifications
You must be signed in to change notification settings - Fork 2
/
cock.py
142 lines (113 loc) · 4.8 KB
/
cock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from collections import ChainMap
from functools import reduce
from pathlib import Path
from typing import Any, Callable, Iterator, List, Tuple, Union
import click
import yaml
from sortedcontainers import SortedDict
# Public names exported by a star-import of this module.
__all__ = ("build_entrypoint", "build_options_from_dict", "get_options_defaults", "Config", "Option")
# Version of the module as a dotted string.
__version__ = "0.11.0"
# The same version split on "." and converted to a tuple of ints, handy for comparisons.
version = tuple(map(int, __version__.split(".")))
class _MissingConfigKey(AttributeError, KeyError):
    """Raised when a `Config` attribute lookup names a key that is not stored.

    It derives from both `AttributeError` (what the attribute protocol
    expects, so `hasattr`, `getattr(obj, name, default)` and `copy` work) and
    `KeyError` (what this lookup raised historically, so existing handlers
    keep working).
    """


class Config(SortedDict):
    """Sorted mapping whose keys can also be read as attributes.

    `config.some_key` is equivalent to `config["some_key"]` for every key
    that is not shadowed by a real attribute or method of the mapping.
    """

    def __getattr__(self, name: str) -> Any:
        """Return the value stored under key `name`.

        Python calls this only after normal attribute lookup has failed.

        Raises:
            AttributeError: (also a `KeyError`) if `name` is not a key.
        """
        try:
            return self[name]
        except KeyError:
            # A bare KeyError escaping from here is not swallowed by
            # `hasattr` / `getattr(..., default)` and makes protocol probes
            # such as `copy.deepcopy` looking up `__deepcopy__` crash.
            raise _MissingConfigKey(name) from None
class Option:
    """Description of a single command line option, rendered to `click.option` on demand.

    The name may be given at construction time or assigned later (for
    example when the option is a leaf of a nested options dictionary). Dashes
    in the name are normalized to underscores; `key` turns them back into
    dashes and prepends `--`.
    """

    def __init__(self, name: Union[str, None] = None, **attributes):
        """Create an option.

        Args:
            name: Optional option name; may be set later through `name`.
            **attributes: Keyword arguments forwarded to `click.option` by
                `render`.

        Raises:
            ValueError: if the `required` attribute is passed.
        """
        if "required" in attributes:
            raise ValueError("`required` attribute is not allowed")
        self._name = None
        if name is not None:
            self.name = name
        self.attributes = attributes

    def __repr__(self) -> str:
        return f"{type(self).__name__}(name={self._name!r}, attributes={self.attributes!r})"

    @property
    def name(self) -> str:
        """Normalized (underscore separated) option name.

        Raises:
            RuntimeError: if the name has not been set yet.
        """
        if self._name is None:
            raise RuntimeError("Want to get `name`, but it is not set")
        return self._name

    @name.setter
    def name(self, value: str):
        """Set the name once; assigning the same name again is a no-op.

        Raises:
            RuntimeError: if a different name has already been set.
        """
        if self._name is None:
            self._name = value.replace("-", "_")
            return
        # Re-assigning an equal name must be harmless: the same options
        # dictionary can be flattened more than once (e.g. once to collect
        # defaults and once to build the entrypoint).
        if not isinstance(value, str) or value.replace("-", "_") != self._name:
            raise RuntimeError(f"Want to set `name` to {value!r}, but `name` already set to {self._name!r}")

    @property
    def key(self) -> str:
        """Command line spelling of the option: `--` plus the dash separated name."""
        return "--" + self.name.replace("_", "-")

    def render(self) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Return the `click.option` decorator built from `key` and the stored attributes."""
        return click.option(self.key, **self.attributes)
def build_entrypoint(main: Callable[[Config], Any],
                     *options_stack: Union[dict, List[Union[Option, click.option]]],
                     **context_settings) -> Callable[..., Any]:
    """Wrap `main` into a click command fed from CLI options and an optional config file.

    Args:
        main: Callable invoked with the resulting `Config`.
        *options_stack: Each item is either a (possibly nested) dictionary of
            `Option` objects or an iterable of `Option` objects and ready
            made click option decorators.
        **context_settings: Passed to `click.command` as `context_settings`
            and to the `main` call of the internal command used to parse the
            configuration file.

    Returns:
        The decorated click command. It accepts an optional
        `configuration-file` argument; when one is given, the file is turned
        into command line arguments and parsed with the same decorators.
    """
    rendered = []
    for group in options_stack:
        members = build_options_from_dict(group) if isinstance(group, dict) else group
        rendered.extend(
            member.render() if isinstance(member, Option) else member
            for member in members
        )

    file_argument_type = click.Path(exists=True, dir_okay=False, readable=True, resolve_path=True)
    decorators = [
        click.command(context_settings=context_settings),
        click.argument("configuration-file", default=None, required=False, type=file_argument_type),
        *rendered,
    ]

    def entrypoint(**cli_options):
        configuration_file = cli_options["configuration_file"]
        if configuration_file:
            # Parse the file through an identical command whose callback
            # simply hands back the parsed keyword arguments.
            collector = _decorate(decorators, lambda **collected: collected)
            file_options = collector.main(
                args=_build_file_args(Path(configuration_file)),
                standalone_mode=False,
                **context_settings,
            )
            file_options["configuration_file"] = configuration_file
        else:
            file_options = {}
        # ChainMap searches `file_options` first, so its keys shadow `cli_options`.
        merged = ChainMap(file_options, cli_options)
        return main(Config(**merged))

    return _decorate(decorators, entrypoint)
def get_options_defaults(*options_stack: Union[dict, List[Option]]) -> Config:
    """Collect the `default` attribute of every option into a `Config`.

    Args:
        *options_stack: Each item is either a (possibly nested) dictionary of
            `Option` objects or an iterable of `Option` objects.

    Returns:
        A `Config` mapping option names to their defaults; options without a
        `default` attribute are left out.

    Raises:
        TypeError: if an item contains something that is not an `Option`.
        RuntimeError: if two options with defaults share a name.
    """
    defaults = Config()
    for group in options_stack:
        members = build_options_from_dict(group) if isinstance(group, dict) else group
        for member in members:
            if not isinstance(member, Option):
                raise TypeError(f"Expect `Option`, got {member!r} of type {type(member)!r}")
            if "default" not in member.attributes:
                continue
            key = member.name
            if key in defaults:
                raise RuntimeError(f"Key {key!r} already exist")
            defaults[key] = member.attributes["default"]
    return defaults
def build_options_from_dict(options: dict) -> List[Option]:
    """Flatten a (possibly nested) dictionary of `Option` objects into a list.

    Each returned option has been given a name built from its path of keys
    in the dictionary.
    """
    return [*_gen_dict_options(options)]
def _gen_dict_options(options: dict, *, name_parts=()) -> Iterator[Option]:
    """Walk a nested options dictionary depth first, yielding named options.

    Args:
        options: Mapping whose values are `Option` objects or nested dicts.
        name_parts: Tuple of keys leading to `options`; used as name prefix.

    Yields:
        Every `Option` leaf, after assigning it the path of keys joined
        with `_` as its name.

    Raises:
        TypeError: if a value is neither a `dict` nor an `Option`.
    """
    for key, node in options.items():
        if isinstance(node, Option):
            node.name = "_".join(name_parts + (key,))
            yield node
            continue
        if not isinstance(node, dict):
            raise TypeError(f"Expect `dict` or `Option`, got {node!r} of type {type(node)!r}")
        yield from _gen_dict_options(node, name_parts=name_parts + (key,))
def _decorate(decorators, f):
    """Apply `decorators` to `f` one after another and return the result.

    The first decorator wraps `f` itself, the next one wraps that result,
    and so on; an empty sequence returns `f` unchanged.
    """
    wrapped = f
    for decorator in decorators:
        wrapped = decorator(wrapped)
    return wrapped
def _build_file_args(configuration_file: Path) -> List[str]:
    """Translate a YAML configuration file into a flat list of CLI arguments.

    The parsed document is flattened with `_gen_flat`; every resulting
    `key, value` pair becomes `--key value`. A list value produces one
    `--key item` pair per item.

    Args:
        configuration_file: Path of the YAML file to read.

    Returns:
        The argument list; empty when the file holds no document (an empty
        or comment-only file). Values are passed through as parsed, they are
        not converted to strings here.

    Raises:
        RuntimeError: if two entries flatten to the same key.
    """
    raw = yaml.safe_load(configuration_file.read_text())
    if raw is None:
        # `safe_load` yields None for an empty document; treat it as
        # "no options" instead of failing on `None.items()` further down.
        return []
    file_options = []
    viewed = set()
    for k, v in _gen_flat(raw):
        if k in viewed:
            raise RuntimeError(f"Key {k!r} already exist")
        viewed.add(k)
        if not isinstance(v, list):
            v = [v]
        for lv in v:
            file_options.extend([f"--{k}", lv])
    return file_options
def _gen_flat(d: dict, *, parts=()) -> Iterator[Tuple[str, Any]]:
    """Flatten a nested mapping into `(dashed-key, value)` pairs.

    Args:
        d: Mapping with string keys whose values are nested dicts or leaves.
        parts: Tuple of keys leading to `d`; used as key prefix.

    Yields:
        For every non-dict leaf, the path of keys joined with `-` (with any
        `_` replaced by `-`) together with the leaf value unchanged.

    Raises:
        TypeError: if a key is not a string.
    """
    for k, v in d.items():
        if not isinstance(k, str):
            # Non-string keys (numbers, booleans, ...) cannot be joined into
            # an option name; say so instead of failing inside `str.join`.
            raise TypeError(f"Expect `str` key, got {k!r} of type {type(k)!r}")
        current_parts = parts + (k,)
        if isinstance(v, dict):
            yield from _gen_flat(v, parts=current_parts)
        else:
            yield "-".join(current_parts).replace("_", "-"), v