-
Notifications
You must be signed in to change notification settings - Fork 0
/
aquery_header_graph.py
executable file
·276 lines (231 loc) · 10.1 KB
/
aquery_header_graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/env python3
"""
This script takes a bazel action graph and produces a graph of build targets based on their
header inputs. The output is a list of targets, annotated with the header inputs used by each
target, and which targets the header originate from.
Usage: ./aquery_header_graph.py <aquery_input_file> [<target_prefix>]
<aquery_input_file> A file containing the output from a bazel aquery with
`--output=jsonproto`
<target_prefix> The script will only output targets which match this prefix.
Optional, defaults to "//".
"""
import argparse
import functools
import sys
import json
from typing import Callable, Dict, Iterable, NoReturn, Set, Union, List
from functools import lru_cache
from pathlib import Path
class Aquery:
    """
    In-memory index of a `bazel aquery --output=jsonproto` dump.

    Lookup tables keyed by the numeric ids bazel assigns in the dump are built
    once at construction, so artifact, depset, path-fragment, and target
    records can all be resolved by id in O(1).
    """

    # Id-keyed lookup tables built from the raw jsonproto dump.
    path_fragments: dict
    artifacts: dict
    depsets: dict
    targets: dict
    # BUG FIX: this holds the raw list of action records; the original
    # annotation incorrectly said `dict`.
    actions: list

    def __init__(self, file):
        """Parse the jsonproto aquery dump at *file* and build the id indexes."""
        with open(file, "r") as f:
            print(f"Parsing aquery from {file}", file=sys.stderr)
            raw_aquery = json.load(f)
        self.path_fragments = {item["id"]: item for item in raw_aquery["pathFragments"]}
        self.artifacts = {item["id"]: item for item in raw_aquery["artifacts"]}
        self.depsets = {item["id"]: item for item in raw_aquery["depSetOfFiles"]}
        self.targets = {item["id"]: item for item in raw_aquery["targets"]}
        self.actions = list(raw_aquery["actions"])

    # NOTE: lru_cache on an instance method keys on `self` and keeps the
    # instance alive for the cache's lifetime; acceptable in this short-lived
    # script, where one Aquery instance spans the whole run.
    @lru_cache(maxsize=None)
    def _flatten_depset(self, depset_id: int) -> Set[int]:
        """
        Flatten a depset into a set of artifact ids including all direct and
        transitive dependencies.
        """
        depset = self.depsets[depset_id]
        artifact_ids = set(depset.get("directArtifactIds", []))
        for transitive_id in depset.get("transitiveDepSetIds", []):
            artifact_ids.update(self._flatten_depset(transitive_id))
        return artifact_ids

    def get_all_header_deps(self, action: dict) -> Set[int]:
        """
        Determine all .h inputs into an action, returning a set of artifact ids.
        """
        return {
            artifact_id
            for artifact_id in self.get_all_inputs(action)
            if self.get_filename(artifact_id).endswith(".h")
        }

    def get_direct_inputs(self, action: dict) -> Set[int]:
        """Artifact ids listed directly (non-transitively) in the action's input depsets."""
        inputs: Set[int] = set()
        for depset_id in action.get("inputDepSetIds", []):
            # Use .get for consistency with _flatten_depset: a depset may
            # legitimately omit directArtifactIds (the original raised KeyError).
            inputs.update(self.depsets[depset_id].get("directArtifactIds", []))
        return inputs

    def get_transitive_inputs(self, action: dict) -> Set[int]:
        """Artifact ids reachable only through the transitive depsets of an action."""
        transitive_depset_ids: Set[int] = set()
        for depset_id in action.get("inputDepSetIds", []):
            transitive_depset_ids.update(
                self.depsets[depset_id].get("transitiveDepSetIds", [])
            )
        transitive_inputs: Set[int] = set()
        for depset_id in transitive_depset_ids:
            transitive_inputs.update(self._flatten_depset(depset_id))
        return transitive_inputs

    def get_all_inputs(self, action: dict) -> Set[int]:
        """
        Determine the set of all direct and transitive inputs for an action.
        """
        all_deps: Set[int] = set()
        for depset_id in action.get("inputDepSetIds", []):
            all_deps.update(self._flatten_depset(depset_id))
        return all_deps

    def get_filename(self, artifact_id: int) -> str:
        """Base filename (final path fragment label) of an artifact."""
        artifact_path_fragment_id = self.artifacts[artifact_id]["pathFragmentId"]
        return self.path_fragments[artifact_path_fragment_id]["label"]

    def get_full_filename(self, artifact_id: int) -> str:
        """Full slash-joined path of an artifact, following parent fragments to the root."""
        artifact = self.artifacts[artifact_id]
        fragment = self.path_fragments[artifact["pathFragmentId"]]
        name_fragments = [fragment["label"]]
        parent_id = fragment.get("parentId", None)
        while parent_id:
            parent = self.path_fragments[parent_id]
            name_fragments = [parent["label"], *name_fragments]
            parent_id = parent.get("parentId", None)
        # BUG FIX: the original built the fragment list but never returned it,
        # so every call yielded None.
        return "/".join(name_fragments)

    def get_filenames(self, artifact_ids: Iterable[int]) -> List[str]:
        """Base filenames for a collection of artifact ids."""
        return [self.get_filename(artifact_id) for artifact_id in artifact_ids]

    def get_target_label(self, targetId: int) -> str:
        """Label for a target id; stringifies targetId (e.g. "None") when it is None."""
        if targetId is not None:
            return self.targets[targetId]["label"]
        return str(targetId)

    def get_action_outputs(self, action: Dict) -> Set[int]:
        """Ids of the artifacts produced by an action."""
        return set(action["outputIds"])

    def get_action_target(self, action: dict) -> Union[str, None]:
        """
        Label of the target that owns *action*, or None if the action carries
        no targetId.

        BUG FIX: this method was called by get_artifact_source but was never
        defined, so get_artifact_source raised AttributeError when invoked.
        """
        target_id = action.get("targetId")
        if target_id is None:
            return None
        return self.get_target_label(target_id)

    @functools.lru_cache(maxsize=None)
    def get_artifact_source(self, artifact_id: int) -> Union[str, None]:
        """
        Determine which bazel target outputs a given artifact.

        Returns the producing target's label, or None when no producing action
        is found (the original annotated this as NoReturn, but the loop can
        fall through).
        """
        for action in self.actions:
            if artifact_id in self.get_action_outputs(action):
                target = self.get_action_target(action)
                if target:
                    return target
        return None
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("input_file", type=Path, nargs=1)
    # BUG FIX: the original combined nargs=1 with the scalar default "//", so
    # omitting the flag made args.target_prefix the *string* "//" and
    # args.target_prefix[0] evaluated to "/". A scalar option needs no nargs.
    parser.add_argument("--target_prefix", type=str, required=False, default="//")
    parser.add_argument(
        "--anonymous",
        required=False,
        default=False,
        action="store_true",
        help="Emit raw numeric ids instead of resolving labels and filenames.",
    )
    args = parser.parse_args()
    target_prefix: str = args.target_prefix
    input_file: Path = args.input_file[0]
    aquery = Aquery(input_file)

    # Group together actions by the bazel targets they are part of. There's
    # some stuff going on with "middleman" actions which makes considering
    # them as individual actions awkward, so we aggregate per target.
    targets: Dict[int, Dict] = {}
    for action in aquery.actions:
        entry = targets.setdefault(
            action["targetId"],
            {
                "all_inputs": set(),
                "direct_inputs": set(),
                "transitive_inputs": set(),
                "outputs": set(),
                "action_counter": 0,
            },
        )
        entry["all_inputs"].update(aquery.get_all_inputs(action))
        entry["direct_inputs"].update(aquery.get_direct_inputs(action))
        entry["transitive_inputs"].update(aquery.get_transitive_inputs(action))
        entry["outputs"].update(aquery.get_action_outputs(action))
        entry["action_counter"] += 1

    def _only_headers(artifact_ids: Iterable[int]) -> List[int]:
        """Keep only the artifact ids whose filename ends in .h."""
        return [i for i in artifact_ids if aquery.get_filename(i).endswith(".h")]

    # Munge the grouped data into a per-target summary of header usage.
    # (The original re-created the filter lambda on every iteration.)
    processed_targets: List[Dict] = []
    for target_id, target in targets.items():
        processed_targets.append(
            {
                "targetId": target_id,
                "header_output_ids": _only_headers(target["outputs"]),
                "header_input_ids": _only_headers(target["all_inputs"]),
                "transitive_header_inputs": _only_headers(target["transitive_inputs"]),
                "direct_header_inputs": _only_headers(target["direct_inputs"]),
                "action_count": target["action_counter"],
            }
        )

    # Provenance index: header artifact id -> the first target (in processed
    # order) listing it as a direct header input. setdefault preserves the
    # first-match semantics of the original linear scan while replacing its
    # quadratic per-header rescan of every target.
    header_source_by_id: Dict[int, int] = {}
    for candidate in processed_targets:
        for header_id in candidate["direct_header_inputs"]:
            header_source_by_id.setdefault(header_id, candidate["targetId"])

    def should_output_target(target: Dict) -> bool:
        """True when the target touches headers and matches the prefix filter."""
        involves_headers = target.get("header_input_ids") or target.get(
            "header_output_ids"
        )
        matches_target_filter = args.anonymous or aquery.get_target_label(
            target["targetId"]
        ).startswith(target_prefix)
        return bool(involves_headers and matches_target_filter)

    output = []
    for target in processed_targets:
        # Which dependencies provide this target's transitive header inputs.
        providing_deps = {
            header_source_by_id.get(header_id)
            for header_id in target["transitive_header_inputs"]
        }
        providing_deps.discard(None)  # headers with no known producing target
        target["header_providing_deps"] = list(providing_deps)

        if not should_output_target(target):
            continue

        target_label = target["targetId"]
        # Bucket this target's header inputs by the target they originate from.
        header_inputs_by_source_target: Dict = {}
        for header_id in target["header_input_ids"]:
            header_inputs_by_source_target.setdefault(
                header_source_by_id.get(header_id), []
            ).append(header_id)

        # If we're not using anonymous data, resolve numeric ids to
        # human-readable labels and filenames at the last minute. (The
        # original also converted header_input_ids to filenames here, but
        # that value was never used in the output — dropped.)
        if not args.anonymous:
            target_label = aquery.get_target_label(target_label)
            header_inputs_by_source_target = {
                aquery.get_target_label(source): aquery.get_filenames(header_ids)
                for source, header_ids in header_inputs_by_source_target.items()
            }

        output.append(
            {
                "label": target_label,
                "header_inputs": header_inputs_by_source_target,
            }
        )

    json.dump(output, sys.stdout, indent=2)